Browse Source

fix: fixed some bugs

许家凯 2 years ago
parent
commit
351ad5b06c

+ 1 - 1
handle/pull_sample_data.py

@@ -118,7 +118,7 @@ def pull_by_max(size=100000):
         instance.wait_for_success()
 
     with odps_sdk.execute_sql(
-            'select * from out_winhc_data_analysis_pull_data where ds = ' + latest_ds + ' limit 100').open_reader(
+            'select * from out_winhc_data_analysis_pull_data where ds = ' + latest_ds + ' limit 1000').open_reader(
         tunnel=True) as reader:
         for record in reader:
             c = get_value(

+ 18 - 3
handle/search_winhc_latest_date.py

@@ -8,7 +8,11 @@ from utils import map_2_json_str
 from utils.dim_name_mapping import get_latest_date_map
 from sdk.WinhcAllClient import get_all_client
 from utils.base_utils import tuple_max
+from utils.datetime_utils import datetime_format_transform
+import re
+from log import get_log
 
+log = get_log("winhc_latest_date")
 all_client = get_all_client()
 hbase_client = all_client.get_hbase_client()
 
@@ -16,6 +20,7 @@ es_sdk = get_es_sdk("new")
 old_es_sdk = get_es_sdk("old")
 
 old_es_index = ['wenshu_detail2']
+date_part = re.compile('\\d{4}年\\d{2}月\\d{2}日')
 
 
 def get_latest_date(index: str, company_id_f: str, company_id: str, latest_date_f: str):
@@ -59,7 +64,14 @@ def get_latest_date(index: str, company_id_f: str, company_id: str, latest_date_
     if len(res) == 0 or latest_date_f not in res[0]:
         return None
 
-    return res[0][latest_date_f]
+    latest_date = res[0][latest_date_f]
+
+    # if date_part.match(latest_date)
+    if date_part.match(latest_date):
+        latest_date = datetime_format_transform(latest_date, '%Y年%m月%d日', "%Y-%m-%d %H:%M:%S")
+        pass
+
+    return latest_date
     pass
 
 
@@ -85,9 +97,11 @@ def search_latest_date(company_id: str):
         result_data[i] = max_date
         pass
 
-    tmp_res = hbase_client.get_record('ng_company', company_id)
+    tmp_res = hbase_client.get_record('ng_rt_company', company_id)
+
     if tmp_res is not None and 'APPROVED_TIME' in tmp_res:
         result_data['基本信息'] = tmp_res['APPROVED_TIME']
+        log.info('fetch hbase data: {}'.format(tmp_res))
     else:
         result_data['基本信息'] = None
     return result_data
@@ -99,7 +113,8 @@ if __name__ == '__main__':
     #                     company_id='059f83641cc4df8b9577cb1e2d89939e', latest_date_f='decision_date')
     # print(d)
 
-    d = search_latest_date(company_id='46a12c9c77fe5fdc615f8f4f1a5f5d8c')
+    d = search_latest_date(company_id='6e13b126ee0c5fcd8fe454693ab4bbda')
     print(map_2_json_str(d))
 
+
     pass

+ 5 - 2
handle/search_winhc_summary.py

@@ -7,7 +7,8 @@
 
 from sdk import get_es_sdk
 from utils import map_2_json_str
-
+from log import get_log
+log = get_log('winhc_summary')
 es_sdk = get_es_sdk("new")
 
 skip_item = ['司法拍卖', '经营异常', '企业年报', '产品信息', '历史变更', '抽查检查', '软件著作权', '作品著作权', '失信信息', '双随机抽查', '融资信息', '股东信息', '行政许可',
@@ -63,11 +64,13 @@ def search_summary(company_id: str):
         for j in m:
             res_map[j] = m[j]
 
+    log.info('fetch winhc summary: {}'.format(res_map))
+
     # print(map_2_json_str(res_map))
     return _summary_format(res_map)
 
 
 if __name__ == '__main__':
-    res_map = search_summary(company_id='a33b741a8258aa9d6f181162b5ee2787')
+    res_map = search_summary(company_id='6e13b126ee0c5fcd8fe454693ab4bbda')
     print(map_2_json_str(res_map))
     pass

+ 48 - 44
log.py

@@ -1,44 +1,48 @@
-# -*- coding: utf-8 -*-
-# @Time : 2021/6/7 11:02
-# @Author : XuJiakai
-# @File : log
-# @Software: PyCharm
-
-import logging
-import os
-import time
-
-
-def get_log(log_name="my_log", write_file=False):
-    logging.basicConfig()
-    logger = logging.getLogger(log_name)
-    logger.setLevel(logging.INFO)
-    logger.propagate = False
-    # 日志输出格式
-    formatter = logging.Formatter('[%(asctime)s - ' + log_name + ':%(lineno)d] - %(levelname)s: %(message)s')
-
-    if write_file:
-        # 创建logs文件夹
-        cur_path = os.path.dirname(os.path.realpath(__file__))
-        log_path = os.path.join(cur_path, 'logs')
-        # 如果不存在这个logs文件夹,就自动创建一个
-        if not os.path.exists(log_path):
-            os.mkdir(log_path)
-        log_name = os.path.join(log_path, '%s.log' % time.strftime('%Y_%m_%d'))
-        fh = logging.FileHandler(log_name, 'a', encoding='utf-8')  # 这个是python3的
-        # 创建一个FileHandler,用于写到本地
-        fh.setLevel(logging.INFO)
-        fh.setFormatter(formatter)
-        logger.addHandler(fh)
-
-    # 创建一个StreamHandler,用于输出到控制台
-    ch = logging.StreamHandler()
-    ch.setLevel(logging.INFO)
-    ch.setFormatter(formatter)
-    logger.addHandler(ch)
-
-    return logger
-
-
-if __name__ == '__main__':
-    print(''.encode('utf-8'))
+# -*- coding: utf-8 -*-
+# @Time : 2021/6/7 11:02
+# @Author : XuJiakai
+# @File : log
+# @Software: PyCharm
+
+import logging
+import os
+import time
+import platform
+
+AUTO_WRITE_LOG_FILE = False if platform.system() == "Linux" else True
+
+
+def get_log(log_name="my_log", write_file=AUTO_WRITE_LOG_FILE):
+    logging.basicConfig()
+    logger = logging.getLogger(log_name)
+    logger.setLevel(logging.INFO)
+    logger.propagate = False
+    # 日志输出格式
+    formatter = logging.Formatter('[%(asctime)s - ' + log_name + ':%(lineno)d] - %(levelname)s: %(message)s')
+
+    if write_file:
+        # 创建logs文件夹
+        cur_path = os.path.dirname(os.path.realpath(__file__))
+        log_path = os.path.join(cur_path, 'logs')
+        # 如果不存在这个logs文件夹,就自动创建一个
+        if not os.path.exists(log_path):
+            os.mkdir(log_path)
+        log_name = os.path.join(log_path, '%s.log' % time.strftime('%Y_%m_%d'))
+        fh = logging.FileHandler(log_name, 'a', encoding='utf-8')  # 这个是python3的
+        # 创建一个FileHandler,用于写到本地
+        fh.setLevel(logging.INFO)
+        fh.setFormatter(formatter)
+        logger.addHandler(fh)
+
+    # 创建一个StreamHandler,用于输出到控制台
+    ch = logging.StreamHandler()
+    ch.setLevel(logging.INFO)
+    ch.setFormatter(formatter)
+    logger.addHandler(ch)
+
+    return logger
+
+
+if __name__ == '__main__':
+    print(AUTO_WRITE_LOG_FILE)
+    pass

+ 4 - 2
spider/winhc_job.py

@@ -43,6 +43,7 @@ def run(data: map):
         "latest_date": latest_date,
         "spider_date": get_now()
     }
+    log.info('insert mongo: {}'.format(output_data))
     # print(map_2_json_str(output_data))
     try:
         col.insert_one(output_data)
@@ -72,7 +73,8 @@ def main():
 
     def callback(ch, method, properties, body):
         data = json.loads(body.decode())
-        print(data)
+        log.info('receive data: {}'.format(data))
+
         q.put(data)
         ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动应答ack,确保消息真正消费后才应答
         pass
@@ -102,6 +104,6 @@ def test(company_id):
 
 if __name__ == '__main__':
     main()
-    # test('88f04cbfab150fe2bccdeec3aea32750')
+    # test('3667556dcfeda7c18c9745f6b8133165')
 
     pass

+ 6 - 0
utils/datetime_utils.py

@@ -15,5 +15,11 @@ def get_now():
     return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
 
 
+def datetime_format_transform(datetime_str: str, source_format: str, target_format: str):
+    middle = datetime.datetime.strptime(datetime_str, source_format)
+    return datetime.datetime.strftime(middle, target_format)
+    pass
+
+
 if __name__ == '__main__':
     pass

+ 11 - 9
utils/dim_name_mapping.py

@@ -84,7 +84,7 @@ _winhc_dim_map = {
     },
     "裁判文书": {  # fixme 需要跨集群查询
         "dim_name": "wenshu_detail_v2",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "wenshu_detail_v2_del_0_defendant,wenshu_detail_v2_del_0_plaintiff", "sort_field": "wenshu_detail2:litigant_info.litigant_id:judge_date"
     },
     "开庭公告": {
         "dim_name": "company_court_open_announcement",
@@ -121,7 +121,7 @@ _winhc_dim_map = {
     },
     "诉前调解": {
         "dim_name": "litigation_mediation",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "litigation_mediation_del_0_plaintiff,litigation_mediation_del_0_defendant", "sort_field": "defendant_info.litigant_id:filing_date,plaintiff_info.litigant_id:filing_date"
     },
     "破产债权": {
         "dim_name": "",
@@ -200,9 +200,10 @@ _winhc_dim_map = {
         "dim_name": "",
         "summary_key": "", "sort_field": ""
     },
-    "招投标": {
-        "dim_name": "company_bid",
-        "summary_key": "", "sort_field": ""
+    "招投标": {  # todo v9没有该维度
+        "dim_name": "company_bid_new",
+        "summary_key": "company_bid_new_del_0_supplier,company_bid_new_del_0_purchaser",
+        "sort_field": "winhc_index_rt_company_bid_new:proxy.keyno:publish_time,winhc_index_rt_company_bid_new:purchaser_info.keyno:publish_time,winhc_index_rt_company_bid_new:supplier_info.keyno:publish_time"
     },
     "招聘信息": {
         "dim_name": "company_employment",
@@ -218,15 +219,16 @@ _winhc_dim_map = {
     },
     "行政许可": {
         "dim_name": "company_license,company_license_entpub,company_license_creditchina",
-        "summary_key": "", "sort_field": "winhc_index_rt_company_license:company_id:start_date,winhc_index_rt_company_license_entpub:company_id:start_date,winhc_index_rt_company_license_creditchina:company_id:decision_date"
+        "summary_key": "",
+        "sort_field": "winhc_index_rt_company_license:company_id:start_date,winhc_index_rt_company_license_entpub:company_id:start_date,winhc_index_rt_company_license_creditchina:company_id:decision_date"
     },
     "土地公示": {
-        "dim_name": "",
-        "summary_key": "", "sort_field": ""
+        "dim_name": "company_land_publicity",
+        "summary_key": "", "sort_field": "publication_date"
     },
     "购地信息": {
         "dim_name": "company_land_announcement",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "contract_date"
     },
     "土地转让-受让人": {
         "dim_name": "",

+ 4 - 3
utils/mysql_utils.py

@@ -6,6 +6,9 @@
 import pymysql
 import psycopg2
 from psycopg2 import extras
+from log import get_log
+
+log = get_log('exec_sql')
 
 
 def exec_sql(db_client, sql):
@@ -59,7 +62,7 @@ def insert_many(data: list, key_list: list, table_name, db_client):
             db_client.commit()
             return num
         except Exception as e:
-            print(e)
+            log.error("insert exec error: {}".format(e))
             db_client.rollback()
             return -1
     pass
@@ -73,6 +76,4 @@ if __name__ == '__main__':
     holo_client = all_client.get_holo_client(db='winhc_biz')
     HOLO_TABLE_NAME = 'public.ads_waa_dim_info'
 
-
-
     pass

+ 0 - 1
utils/xxl_queue.py

@@ -24,7 +24,6 @@ class xxl_queue:
         pass
 
     def append(self, key, obj):
-        print(key, obj)
         result_list = None
         flag = False
         for i in range(len(self.data)):