# from src.plugins.CreditImportExport.CustomsImportExport.CustomsImportExportDetail import *
from urllib import parse
import logging
import ddddocr
import requests
import re,json,random,time
logging.basicConfig(level=logging.INFO)
class Ctu_cookie:
    """Scraper for the zxgk.court.gov.cn "zhixing" search endpoint.

    The value of the ``lqWVdQzgOVyaT`` cookie is not computed in this class.
    Page fragments are posted to an HTTP service on ``127.0.0.1:8006``
    (``/rs_202`` for the first cookie, ``/rs_update_cookie`` for refreshes)
    and the ``cookie`` field of its JSON reply is installed on the session.
    """

    # Name of the cookie installed on the session after every signer call.
    COOKIE_NAME = 'lqWVdQzgOVyaT'
    # Captcha image endpoint of the target site.
    CAPTCHA_URL = "http://zxgk.court.gov.cn/zhixing/captcha.do"
    # Timeout (seconds) for requests to the target site; same value the
    # search POST has always used.
    REQUEST_TIMEOUT = 10
    # Alphabet used for captcha ids: digits, upper-case, lower-case.
    _CAPTCHA_ID_CHARS = (
        "0123456789"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "abcdefghijklmnopqrstuvwxyz"
    )

    def __init__(self):
        """Set up headers, endpoint URLs, a fresh session and empty cookie state."""
        # Browser-like headers sent to the target site.
        self.headers = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Pragma": "no-cache",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36"
        }
        # Headers sent to the local cookie service.
        self.headers_api = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
        self.index_url = "http://zxgk.court.gov.cn/zhixing/"
        self.url_search = "http://zxgk.court.gov.cn/zhixing/searchBzxr.do"
        self.session = requests.session()
        # Current cookie value and the page fragments needed to refresh it;
        # all are filled in by get_init_cookie().
        self.cookie = None
        self.second_content = None
        self.second_innerjs = None
        self.second_win_ts = None
        # OCR engine, created on first use and then reused (see _ocr_engine).
        self._ocr = None

    def initial(self, options, proxy_queue, index):
        """Store ``options`` on the instance.

        ``proxy_queue`` and ``index`` are only written to the debug log.
        """
        logging.debug(f"{self.__class__.__name__}_{proxy_queue}_{index}")
        self._options = options

    def get_html_202(self, url):
        """Fetch ``url`` with a brand-new session and extract the signer inputs.

        Returns:
            A ``(content, innerjs, win_ts)`` tuple: a fragment of the page,
            the text of the script the page references, and a second
            fragment of the page.

        Raises:
            IndexError: if a pattern yields fewer matches than indexed.
            requests.RequestException: on network failure or timeout.
        """
        # Start from a clean session so no stale cookies are sent.
        self.session = requests.session()
        html = self.session.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        # NOTE(review): this pattern is empty, so `content` is always ''.
        # It looks truncated (markup possibly stripped) - confirm the
        # intended expression before relying on this value.
        content = re.findall(r'', html.text)[0].replace('"', '')
        win_ts_url = re.findall('" src="(.*?)" r=\'m\'>', html.text)[0]
        # The script src may be relative; resolve it against the page URL.
        win_ts_url = parse.urljoin(url, win_ts_url)
        # The script is fetched outside the session, with TLS verification off.
        innerjs = requests.get(
            win_ts_url, headers=self.headers, verify=False, timeout=self.REQUEST_TIMEOUT
        ).text
        # NOTE(review): the lazy group is the last thing in the pattern, so
        # every match captures ''. Probably a lost closing delimiter - confirm.
        win_ts = re.findall(r"r='m'>(.*?)", html.text, re.S)[1]
        return content, innerjs, win_ts

    def get_html_200(self, response):
        """Extract the refresh inputs from an already-fetched page.

        Args:
            response: response object whose ``.text`` is the page body.

        Returns:
            A ``(second_content, second_innerjs, second_win_ts)`` tuple.

        Raises:
            IndexError: if a pattern yields fewer matches than indexed.
            requests.RequestException: on network failure or timeout.
        """
        # NOTE(review): empty pattern - always yields ''. See get_html_202.
        second_content = re.findall(r'', response.text)[0].replace('"', '')
        # Unlike get_html_202, the script URL here is fixed, not parsed from the page.
        second_win_ts_url = 'http://zxgk.court.gov.cn/U52nf4AkCaDm/fYlbxzjRpgxD.11afee1.js'
        second_innerjs = self.session.get(
            second_win_ts_url, headers=self.headers, verify=False, timeout=self.REQUEST_TIMEOUT
        ).text
        # NOTE(review): trailing lazy group captures ''; also no re.S here,
        # unlike the equivalent line in get_html_202 - confirm both.
        second_win_ts = re.findall(r"r='m'>(.*?)", response.text)[1]
        return second_content, second_innerjs, second_win_ts

    def update(self):
        """Refresh the cookie through the local ``/rs_update_cookie`` service.

        Sends the stored second-stage fragments and the current cookie. On a
        200 reply the returned cookie replaces ``self.cookie`` and is set on
        the session; any other status is logged and the cookie is left as is.
        """
        data_update = {
            'content': self.second_content,
            'innerjs': self.second_innerjs,
            'win_ts': self.second_win_ts,
            'cookie': self.cookie,
        }
        result = requests.post('http://127.0.0.1:8006/rs_update_cookie', json=data_update, headers=self.headers_api)
        if result.status_code != 200:
            logging.error(f"更新cookie失败,状态码:{result.status_code}")
            return
        self.cookie = json.loads(result.text)['cookie']
        self.session.cookies.update({self.COOKIE_NAME: self.cookie})
        logging.info(f"更新cookie_length:{len(self.cookie)},cookie:{self.cookie}")

    def get_init_cookie(self, index_url):
        """Obtain the first cookie and the fragments needed for later refreshes.

        Tries up to five times. Each attempt fetches ``index_url``, asks the
        local ``/rs_202`` service for a cookie, installs it, re-fetches
        ``index_url`` and stores what get_html_200() extracts from that
        second response. Any exception is logged and the attempt is retried;
        if every attempt fails the method returns without raising.
        """
        for _attempt in range(5):
            try:
                # First stage: page fetched without a cookie.
                content, innerjs, win_ts = self.get_html_202(index_url)
                data = {
                    'content': content,
                    'innerjs': innerjs,
                    'win_ts': win_ts,
                }
                result = requests.post('http://127.0.0.1:8006/rs_202', json=data, headers=self.headers_api)
                self.cookie = json.loads(result.text)['cookie']
                self.session.cookies.update({self.COOKIE_NAME: self.cookie})
                # Second stage: same page, now with the cookie installed.
                response = self.session.get(index_url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
                logging.info(f"412-200状态码:{response.status_code},第一次cookie:{len(self.cookie)},{self.cookie}")
                self.second_content, self.second_innerjs, self.second_win_ts = self.get_html_200(response)
                break
            except Exception as e:
                logging.error(f"请求首页报错,{e}")

    def get_captchaId(self):
        """Return a random 32-character alphanumeric captcha id."""
        alphabet = self._CAPTCHA_ID_CHARS
        # The 61 multiplier is kept from the original: indices span 0..60,
        # so the last character of the 62-char alphabet is never chosen.
        return "".join(alphabet[int(random.random() * 61)] for _ in range(32))

    def _ocr_engine(self):
        """Return the shared OCR engine, creating it on first use."""
        if getattr(self, "_ocr", None) is None:
            self._ocr = ddddocr.DdddOcr()
        return self._ocr

    def get_img(self):
        """Download a captcha image and recognise it.

        Loops until the captcha endpoint answers 200; the cookie is refreshed
        and a new captcha id is generated before every attempt. Request
        errors are logged and retried.

        Returns:
            ``(yzm_code, captchaId)``: the recognised text and the id the
            image was requested with.
        """
        while True:
            self.update()
            captchaId = self.get_captchaId()
            params = {
                "captchaId": captchaId,
                "random": str(random.random())
            }
            try:
                response = self.session.get(
                    self.CAPTCHA_URL, headers=self.headers, params=params, timeout=self.REQUEST_TIMEOUT
                )
                print("验证码状态码:", response.status_code)
                if response.status_code == 200:
                    yzm_code = self._ocr_engine().classification(response.content)
                    return yzm_code, captchaId
            except Exception as e:
                logging.error(f"请求验证码失败,{e}")

    def request_(self):
        """Submit one search request, retrying up to six times.

        Every attempt solves a new captcha and refreshes the cookie first.
        A 502 is reported and retried, a 500 is retried after a 3 second
        pause, and any other non-200 status re-runs the initial cookie flow.

        Returns:
            The response with status 200, or ``None`` when all six attempts
            fail.
        """
        for _attempt in range(6):
            yzm_code, captchaId = self.get_img()
            self.update()
            try:
                data = {
                    "pName": "张三",
                    "pCardNum": "",
                    "selectCourtId": "0",
                    "pCode": yzm_code,
                    "captchaId": captchaId,
                    "searchCourtName": "全国法院(包含地方各级法院)",
                    "selectCourtArrange": "1",
                    "currentPage": "1"
                }
                response = self.session.post(
                    self.url_search, headers=self.headers, data=data, timeout=self.REQUEST_TIMEOUT
                )
                if response.status_code == 200:
                    return response
                elif response.status_code == 502:
                    print(f"请求搜索页状态码不正确,{response.status_code}")
                elif response.status_code == 500:
                    time.sleep(3)
                    print(f"对方服务器出错,延时3s,{response.status_code}")
                else:
                    # Any other status is treated as an invalid cookie:
                    # start over from the initial cookie flow.
                    self.get_init_cookie(self.index_url)
                    print(f"该cookie已失效,重新走一遍412-200流程,{response.status_code}")
            except Exception as e:
                logging.error(f"请求搜索页报错,{e}")
        # All attempts failed; callers must handle the missing response.
        return None

    def get_html(self):
        """Initialise the cookie, then issue 19 search requests and print each result.

        A request that fails after all of its retries is logged and skipped
        instead of aborting the run.
        """
        self.get_init_cookie(self.index_url)
        for page in range(1, 20):
            res = self.request_()
            if res is None:
                # request_() ran out of retries; there is nothing to print.
                logging.error("request %s failed after all retries, skipping", page)
                continue
            print(f"请求数据{page}页状态码:{res.status_code}")
            print(res.text, "\n")

    def process(self, condition):
        """Run the full fetch flow; ``condition`` is accepted but not used."""
        self.get_html()
if __name__ == '__main__':
    # Script entry point: build the scraper and run one full pass with an
    # empty condition mapping.
    crawler = Ctu_cookie()
    crawler.process({})