# from src.plugins.CreditImportExport.CustomsImportExport.CustomsImportExportDetail import *
"""Scraper for the zxgk.court.gov.cn search page.

The cookie ``lqWVdQzgOVyaT`` is not computed here: page fragments are posted
to a helper HTTP service on 127.0.0.1:8006, and the cookie value it returns is
installed on the ``requests`` session before each request to the site.
"""
from urllib import parse
import logging
import json
import random
import re
import time

import ddddocr
import requests

logging.basicConfig(level=logging.INFO)


class Ctu_cookie:
    """Drive the cookie bootstrap, captcha solving and search requests."""

    # Timeout (seconds) applied to every HTTP call so a stalled peer cannot
    # block the scraper forever; the search POST already used 10 seconds.
    REQUEST_TIMEOUT = 10

    # Alphabet used to build captcha ids (digits, upper case, lower case).
    _CAPTCHA_CHARS = (
        "0123456789"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "abcdefghijklmnopqrstuvwxyz"
    )

    def __init__(self):
        # Browser-like headers sent to the target site.
        self.headers = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Pragma": "no-cache",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36"
        }
        # Headers sent to the local cookie helper service.
        self.headers_api = {
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
        self.index_url = "http://zxgk.court.gov.cn/zhixing/"
        self.url_search = "http://zxgk.court.gov.cn/zhixing/searchBzxr.do"
        self.session = requests.session()
        self.cookie = None
        # Fragments captured from the index page; update() posts them to the
        # helper service on every cookie refresh.
        self.second_content = None
        self.second_innerjs = None
        self.second_win_ts = None
        # OCR engine, created on first use by _get_ocr().
        self._ocr = None

    def initial(self, options, proxy_queue, index):
        """Store ``options``; ``proxy_queue`` and ``index`` are only logged."""
        logging.debug(f"{self.__class__.__name__}_{proxy_queue}_{index}")
        self._options = options

    def get_html_202(self, url):
        """Fetch ``url`` with a fresh session and extract the helper inputs.

        Returns:
            Tuple ``(content, innerjs, win_ts)``: a fragment of the page, the
            text of the script the page references, and an inline fragment.

        Raises:
            IndexError: when an expected pattern is missing from the page.
            requests.RequestException: on network failure or timeout.
        """
        # Release the previous session's connections before replacing it.
        self.session.close()
        self.session = requests.session()
        html = self.session.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        # NOTE(review): the pattern below is empty, so findall()[0] is always
        # '' and `content` is always the empty string. The pattern text looks
        # like it was lost (possibly HTML-tag stripping) — TODO restore the
        # intended expression.
        content = re.findall(r'', html.text)[0].replace('"', '')
        win_ts_url = re.findall('" src="(.*?)" r=\'m\'>', html.text)[0]
        win_ts_url = parse.urljoin(url, win_ts_url)
        innerjs = requests.get(
            win_ts_url, headers=self.headers, verify=False, timeout=self.REQUEST_TIMEOUT
        ).text
        # NOTE(review): a lazy group at the very end of a pattern matches the
        # empty string, so `win_ts` is always ''. The closing delimiter looks
        # like it was lost as well — TODO restore it.
        win_ts = re.findall(r"r='m'>(.*?)", html.text, re.S)[1]
        return content, innerjs, win_ts

    def get_html_200(self, response):
        """Extract the helper inputs from an already fetched index page.

        Returns:
            Tuple ``(second_content, second_innerjs, second_win_ts)``.

        Raises:
            IndexError: when an expected pattern is missing from the page.
            requests.RequestException: on network failure or timeout.
        """
        # NOTE(review): same empty pattern as in get_html_202 — always ''.
        second_content = re.findall(r'', response.text)[0].replace('"', '')
        second_win_ts_url = 'http://zxgk.court.gov.cn/U52nf4AkCaDm/fYlbxzjRpgxD.11afee1.js'
        second_innerjs = self.session.get(
            second_win_ts_url, headers=self.headers, verify=False, timeout=self.REQUEST_TIMEOUT
        ).text
        # NOTE(review): trailing lazy group — always captures ''.
        second_win_ts = re.findall(r"r='m'>(.*?)", response.text)[1]
        return second_content, second_innerjs, second_win_ts

    def update(self):
        """Refresh the cookie through the local helper service.

        Posts the stored page fragments and the current cookie; on HTTP 200
        the returned cookie replaces the current one on the session.

        Raises:
            requests.RequestException: when the helper service cannot be
                reached or does not answer within ``REQUEST_TIMEOUT``.
        """
        data_update = {
            'content': self.second_content,
            'innerjs': self.second_innerjs,
            'win_ts': self.second_win_ts,
            'cookie': self.cookie,
        }
        result = requests.post(
            'http://127.0.0.1:8006/rs_update_cookie',
            json=data_update,
            headers=self.headers_api,
            timeout=self.REQUEST_TIMEOUT,
        )
        if result.status_code == 200:
            self.cookie = json.loads(result.text)['cookie']
            self.session.cookies.update({'lqWVdQzgOVyaT': self.cookie})
            logging.info(f"更新cookie_length:{len(self.cookie)},cookie:{self.cookie}")
        else:
            logging.error(f"更新cookie失败,状态码:{result.status_code}")

    def get_init_cookie(self, index_url):
        """Bootstrap the cookie, retrying up to five times.

        Each attempt fetches the index page, asks the helper service for the
        first cookie, re-fetches the index page with it and stores the
        fragments that update() needs.

        Returns:
            ``True`` when an attempt completed, ``False`` when all five
            attempts failed (previously this case was silent).
        """
        for _ in range(5):
            try:
                content, innerjs, win_ts = self.get_html_202(index_url)
                data = {
                    'content': content,
                    'innerjs': innerjs,
                    'win_ts': win_ts,
                }
                result = requests.post(
                    'http://127.0.0.1:8006/rs_202',
                    json=data,
                    headers=self.headers_api,
                    timeout=self.REQUEST_TIMEOUT,
                )
                self.cookie = json.loads(result.text)['cookie']
                self.session.cookies.update({'lqWVdQzgOVyaT': self.cookie})
                response = self.session.get(
                    index_url, headers=self.headers, timeout=self.REQUEST_TIMEOUT
                )
                logging.info(f"412-200状态码:{response.status_code},第一次cookie:{len(self.cookie)},{self.cookie}")
                (
                    self.second_content,
                    self.second_innerjs,
                    self.second_win_ts,
                ) = self.get_html_200(response)
                return True
            except Exception as e:
                logging.error(f"请求首页报错,{e}")
        logging.error("cookie initialisation failed after 5 attempts")
        return False

    def get_captchaId(self):
        """Return a random 32-character alphanumeric captcha id."""
        # NOTE(review): the multiplier is 61 while the alphabet has 62
        # characters, so the last one ('z') is never chosen. Presumably this
        # mirrors the site's own id generator — confirm before changing.
        return "".join(
            self._CAPTCHA_CHARS[int(random.random() * 61)] for _ in range(32)
        )

    def _get_ocr(self):
        """Return the shared OCR engine, creating it on first use."""
        if getattr(self, "_ocr", None) is None:
            self._ocr = ddddocr.DdddOcr()
        return self._ocr

    def get_img(self):
        """Download a captcha image and recognise it.

        Returns:
            Tuple ``(yzm_code, captchaId)``: the recognised text and the id
            the image was requested with.

        Raises:
            requests.RequestException: propagated from update().
        """
        # NOTE(review): this loop has no attempt limit or delay; it only ends
        # when the captcha endpoint answers 200.
        while True:
            self.update()
            captchaId = self.get_captchaId()
            params = {
                "captchaId": captchaId,
                "random": str(random.random())
            }
            try:
                response = self.session.get(
                    "http://zxgk.court.gov.cn/zhixing/captcha.do",
                    headers=self.headers,
                    params=params,
                    timeout=self.REQUEST_TIMEOUT,
                )
                print("验证码状态码:", response.status_code)
                if response.status_code == 200:
                    # Reuse one engine instead of building one per captcha.
                    yzm_code = self._get_ocr().classification(response.content)
                    return yzm_code, captchaId
            except Exception as e:
                logging.error(f"请求验证码失败,{e}")

    def request_(self, name="张三", page=1):
        """Run one search request, retrying up to six times.

        Args:
            name: value sent as ``pName``.
            page: value sent as ``currentPage``.

        Returns:
            The ``requests`` response when the site answers 200, otherwise
            ``None`` after all attempts are used up.
        """
        for _ in range(6):
            yzm_code, captchaId = self.get_img()
            self.update()
            try:
                data = {
                    "pName": name,
                    "pCardNum": "",
                    "selectCourtId": "0",
                    "pCode": yzm_code,
                    "captchaId": captchaId,
                    "searchCourtName": "全国法院(包含地方各级法院)",
                    "selectCourtArrange": "1",
                    "currentPage": str(page)
                }
                response = self.session.post(
                    self.url_search,
                    headers=self.headers,
                    data=data,
                    timeout=self.REQUEST_TIMEOUT,
                )
                if response.status_code == 200:
                    return response
                elif response.status_code == 502:
                    print(f"请求搜索页状态码不正确,{response.status_code}")
                elif response.status_code == 500:
                    time.sleep(3)
                    print(f"对方服务器出错,延时3s,{response.status_code}")
                else:
                    # Any other status is treated as an expired cookie:
                    # rerun the whole bootstrap before the next attempt.
                    self.get_init_cookie(self.index_url)
                    print(f"该cookie已失效,重新走一遍412-200流程,{response.status_code}")
            except Exception as e:
                logging.error(f"请求搜索页报错,{e}")
        return None

    def get_html(self):
        """Bootstrap the cookie, then fetch and print result pages 1 to 19."""
        if not self.get_init_cookie(self.index_url):
            # Without a cookie get_img() would retry forever; stop here.
            logging.error("aborting: no initial cookie could be obtained")
            return
        for j in range(1, 20):
            res = self.request_(page=j)
            if res is None:
                # request_ used up its attempts; skip instead of crashing on
                # res.status_code.
                logging.error(f"page {j} could not be fetched, skipping")
                continue
            print(f"请求数据{j}页状态码:{res.status_code}")
            print(res.text, "\n")

    def process(self, condition):
        """Entry point; ``condition`` is accepted but not used."""
        self.get_html()


if __name__ == '__main__':
    ctu_cookie = Ctu_cookie()
    ctu_cookie.process({})