Bladeren bron

feat: 更新配置

- 更新配置
- 调整抽样数据
- 修复一些bug
许家凯 2 jaren geleden
bovenliggende
commit
63af21ff03
6 gewijzigde bestanden met toevoegingen van 254 en 51 verwijderingen
  1. 3 3
      handle/pull_sample_data.py
  2. 31 7
      handle/search_winhc_latest_date.py
  3. 20 14
      spider/winhc_job.py
  4. 29 27
      utils/dim_name_mapping.py
  5. 78 0
      utils/mysql_utils.py
  6. 93 0
      utils/xxl_queue.py

+ 3 - 3
handle/pull_sample_data.py

@@ -34,7 +34,7 @@ def pull_by_es(size: int = 20):
     dsl = {
         "_source": ["cname.show", "company_org_type_new_std", "province_code", "city_code", "county_code", "org_number",
                     "credit_code", "reg_number", "category_first_code", "category_second_code", "category_third_code"],
-        "size": 20,
+        "size": size,
         "query": {
             "bool": {
                 "must": [
@@ -57,7 +57,7 @@ def pull_by_es(size: int = 20):
         }
         , "sort": [
             {
-                "company_score_weight": {
+                "company_rank_sec": {
                     "order": "desc"
                 }
             }
@@ -95,5 +95,5 @@ def pull_by_es(size: int = 20):
 
 
 if __name__ == '__main__':
-    pull_by_es()
+    pull_by_es(size=100)
     pass

+ 31 - 7
handle/search_winhc_latest_date.py

@@ -8,10 +8,14 @@ from utils import map_2_json_str
 from utils.dim_name_mapping import get_latest_date_map
 from sdk.WinhcAllClient import get_all_client
 from utils.base_utils import tuple_max
+
 all_client = get_all_client()
 hbase_client = all_client.get_hbase_client()
 
 es_sdk = get_es_sdk("new")
+old_es_sdk = get_es_sdk("old")
+
+old_es_index = ['wenshu_detail2']
 
 
 def get_latest_date(index: str, company_id_f: str, company_id: str, latest_date_f: str):
@@ -19,10 +23,23 @@ def get_latest_date(index: str, company_id_f: str, company_id: str, latest_date_
         "size": 1,
         "_source": [latest_date_f],
         "query": {
-            "term": {
-                company_id_f: {
-                    "value": company_id
-                }
+            "bool": {
+                "must": [
+                    {
+                        "term": {
+                            company_id_f: {
+                                "value": company_id
+                            }
+                        }
+                    },
+                    # {
+                    #     "term": {
+                    #         "deleted": {
+                    #             "value": 0
+                    #         }
+                    #     }
+                    # }
+                ]
             }
         }
         , "sort": [
@@ -33,7 +50,11 @@ def get_latest_date(index: str, company_id_f: str, company_id: str, latest_date_
             }
         ]
     }
-    res = es_sdk.query(index=index, doc_type='_doc', dsl=dsl)
+    if index in old_es_index:
+        res = old_es_sdk.query(index=index, doc_type=None, dsl=dsl)
+        pass
+    else:
+        res = es_sdk.query(index=index, doc_type='_doc', dsl=dsl)
 
     if len(res) == 0:
         return None
@@ -48,10 +69,11 @@ def search_latest_date(company_id: str):
     result_data = {}
     for i in latest_date_map:
         str = latest_date_map[i]
-        max_date = ''
+        max_date = None
         for j in str.split(','):
             tmp_str = j.split(':')
             index = tmp_str[0]
+
             company_id_f = tmp_str[1]
             latest_date_f = tmp_str[2]
             tmp_date = get_latest_date(index=index, company_id_f=company_id_f, company_id=company_id,
@@ -65,6 +87,8 @@ def search_latest_date(company_id: str):
     tmp_res = hbase_client.get_record('ng_company', company_id)
     if tmp_res is not None and 'APPROVED_TIME' in tmp_res:
         result_data['基本信息'] = tmp_res['APPROVED_TIME']
+    else:
+        result_data['基本信息'] = None
     return result_data
     pass
 
@@ -74,7 +98,7 @@ if __name__ == '__main__':
     #                     company_id='059f83641cc4df8b9577cb1e2d89939e', latest_date_f='decision_date')
     # print(d)
 
-    d = search_latest_date(company_id='059f83641cc4df8b9577cb1e2d89939e')
+    d = search_latest_date(company_id='eda3ee85ad34656bd45bc587b21b6c26')
     print(map_2_json_str(d))
 
     pass

+ 20 - 14
spider/winhc_job.py

@@ -74,20 +74,26 @@ def main():
     pass
 
 
+def test(company_id):
+    data = {
+        "company_id": company_id,
+        "company_name": "",
+        "company_registered_date": "",
+        "company_org_type": "",
+        "province_code": "",
+        "city_code": "",
+        "county_code": "",
+        "org_number": "",
+        "reg_number": "",
+        "credit_code": "",
+    }
+
+    run(data)
+    pass
+
+
 if __name__ == '__main__':
     main()
-    # data = {
-    #     "company_id": "88f04cbfab150fe2bccdeec3aea32750",
-    #     "company_name": "",
-    #     "company_registered_date": "",
-    #     "company_org_type": "",
-    #     "province_code": "",
-    #     "city_code": "",
-    #     "county_code": "",
-    #     "org_number": "",
-    #     "reg_number": "",
-    #     "credit_code": "",
-    # }
-    #
-    # run(data)
+    # test('88f04cbfab150fe2bccdeec3aea32750')
+
     pass

+ 29 - 27
utils/dim_name_mapping.py

@@ -58,47 +58,48 @@ _winhc_dim_map = {
         "dim_name": "company_annual_report",
         "summary_key": "", "sort_field": ""
     },
-    "司法案件": {
+    "司法案件": {  # todo
         "dim_name": "",
         "summary_key": "", "sort_field": ""
     },
     "失信信息": {
         "dim_name": "company_dishonest_info",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "keyno:pub_date"
     },
     "被执行人": {
         "dim_name": "company_zxr",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "keyno:case_create_time"
     },
     "限制消费": {
         "dim_name": "company_zxr_restrict",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "publish_date"
     },
     "终本案件": {
         "dim_name": "company_zxr_final_case",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "keyno:case_create_time"
     },
-    "送达公告": {
+    "送达公告": {  # todo
         "dim_name": "company_send_announcement",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "plaintiff_info.litigant_id:start_date,defendant_info.litigant_id:start_date"
     },
-    "裁判文书": {
+    "裁判文书": {  # fixme 需要跨集群查询
         "dim_name": "wenshu_detail_v2",
         "summary_key": "", "sort_field": ""
     },
     "开庭公告": {
         "dim_name": "company_court_open_announcement",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "defendant_info.litigant_id:start_date,plaintiff_info.litigant_id:start_date"
     },
     "立案信息": {
         "dim_name": "company_court_register",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "defendant_info.litigant_id:filing_date,plaintiff_info.litigant_id:filing_date"
     },
     "法院公告": {
         "dim_name": "company_court_announcement",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "",
+        "sort_field": "defendant_info.litigant_id:publish_date,plaintiff_info.litigant_id:publish_date"
     },
-    "股权冻结": {
+    "股权冻结": {  # todo 没有时间
         "dim_name": "company_judicial_assistance",
         "summary_key": "", "sort_field": ""
     },
@@ -108,11 +109,11 @@ _winhc_dim_map = {
     },
     "司法拍卖": {
         "dim_name": "auction_tracking",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "company_info.company_id:start_time"
     },
     "询价评估": {
         "dim_name": "zxr_evaluate,zxr_evaluate_results",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "winhc_index_rt_zxr_evaluate_results:keyno:publish_time"
     },
     "限制出境": {
         "dim_name": "restrictions_on_exit",
@@ -128,11 +129,12 @@ _winhc_dim_map = {
     },
     "行政处罚": {
         "dim_name": "company_punishment_info,company_punishment_info_creditchina",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "",
+        "sort_field": "winhc_index_rt_company_punishment_info:company_id:decision_date,winhc_index_rt_company_punishment_info_creditchina:company_id:decision_date"
     },
     "经营异常": {
         "dim_name": "company_abnormal_info",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "put_date"
     },
     "股权出质": {  # todo
         "dim_name": "company_equity_info",
@@ -156,23 +158,23 @@ _winhc_dim_map = {
     },
     "环保处罚": {
         "dim_name": "company_env_punishment",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "publish_time"
     },
     "税收违法": {
         "dim_name": "company_tax_contravention",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "publish_time"
     },
     "欠税公告": {
         "dim_name": "company_own_tax",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "publish_date"
     },
     "公示催告": {
         "dim_name": "company_public_announcement",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "publish_date"
     },
     "严重违法": {
         "dim_name": "company_illegal_info",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "put_date"
     },
     "简易注销": {
         "dim_name": "company_brief_cancel_announcement",
@@ -216,7 +218,7 @@ _winhc_dim_map = {
     },
     "行政许可": {
         "dim_name": "company_license,company_license_entpub,company_license_creditchina",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "winhc_index_rt_company_license:company_id:start_date,winhc_index_rt_company_license_entpub:company_id:start_date,winhc_index_rt_company_license_creditchina:company_id:decision_date"
     },
     "土地公示": {
         "dim_name": "",
@@ -268,7 +270,7 @@ _winhc_dim_map = {
     },
     "抽查检查": {
         "dim_name": "company_check_info",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "check_date"
     },
     "产权交易": {
         "dim_name": "",
@@ -276,9 +278,9 @@ _winhc_dim_map = {
     },
     "双随机抽查": {
         "dim_name": "company_double_random_check_info",
-        "summary_key": "", "sort_field": ""
+        "summary_key": "", "sort_field": "check_date"
     },
-    "商标": {
+    "商标": {  # todo
         "dim_name": "company_tm",
         "summary_key": "", "sort_field": ""
     },
@@ -396,8 +398,8 @@ for i in _winhc_dim_map:
             pass
     pass
 
-_latest_date_map = { \
-    }
+_latest_date_map = {
+}
 
 for i in _winhc_dim_map:
 

+ 78 - 0
utils/mysql_utils.py

@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+# @Time : 2022/2/22 11:44
+# @Author : XuJiakai
+# @File : mysql_utils
+# @Software: PyCharm
+import pymysql
+import psycopg2
+from psycopg2 import extras
+
+
+def exec_sql(db_client, sql):
+    with db_client.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
+        cursor.execute(sql)
+        result = cursor.fetchall()
+        return result
+    pass
+
+
+def exec_sql_by_holo(db_client, sql):
+    with db_client.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
+        cursor.execute(sql)
+        result = cursor.fetchall()
+        return result
+    pass
+
+
+def get_mysql_fields(db_client, table_schema, table_name):
+    with db_client.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
+        cursor.execute(
+            f"select column_name, column_comment from information_schema.columns where table_schema ='{table_schema}' and table_name = '{table_name}'")
+        result = cursor.fetchall()
+        return result
+
+
+def get_mysql_tables(db_client, table_schema):
+    with db_client.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
+        cursor.execute(
+            f"select table_name , table_comment from information_schema.tables  where table_schema ='{table_schema}' ")
+        result = cursor.fetchall()
+        return result
+
+
+def insert(json: map, table_name, db_client):
+    keys = list(json.keys())
+    return insert_many([json], keys, table_name, db_client)
+    pass
+
+
+def insert_many(data: list, key_list: list, table_name, db_client):
+    keys = ','.join(key_list)
+    vs = ','.join(['%s'] * len(key_list))
+    data = tuple([tuple(i.values()) for i in data])
+    sql = f"INSERT INTO {table_name}({keys}) \nvalues({vs}) "
+    # print(sql)
+
+    with db_client.cursor() as cursor:
+        try:
+            num = cursor.executemany(sql, data)
+            db_client.commit()
+            return num
+        except Exception as e:
+            print(e)
+            db_client.rollback()
+            return -1
+    pass
+
+
+if __name__ == '__main__':
+    from sdk.WinhcAllClient import get_all_client
+
+    all_client = get_all_client()
+
+    holo_client = all_client.get_holo_client(db='winhc_biz')
+    HOLO_TABLE_NAME = 'public.ads_waa_dim_info'
+
+
+
+    pass

+ 93 - 0
utils/xxl_queue.py

@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+# @Time : 2022/12/2 9:19
+# @Author : XuJiakai
+# @File : xxl_queue
+# @Software: PyCharm
+
+
+class xxl_queue:
+
+    def __init__(self, pop_threshold, buff_size=100):
+        """
+        类似于开心消消乐的定长队列,攒够阈值,则弹出数据
+
+        注意:该队列目前不能按照最新插入时间,调整覆盖
+        :param pop_threshold: 弹出数据阈值
+        :param buff_size: 缓冲区大小
+        """
+        assert isinstance(buff_size, int) and buff_size > 0, "请给我一个大于零的整数来指定长度"
+        self.len = buff_size
+        self.data = [None] * self.len
+        self.hand = 0
+        self.pop_threshold = pop_threshold
+
+        pass
+
+    def append(self, key, obj):
+        print(key, obj)
+        result_list = None
+        flag = False
+        for i in range(len(self.data)):
+            if self.data[i] is None:
+                continue
+            _key = self.data[i][0]
+            _obj_list = self.data[i][1]
+            if _key == key:
+                if len(_obj_list) + 1 >= self.pop_threshold:
+                    r = _obj_list.copy()
+                    r.append(obj)
+                    result_list = r
+                    self.data[i] = None
+                    pass
+                else:
+                    self.data[i][1].append(obj)
+                    pass
+                flag = True
+                pass
+            pass
+
+        if not flag:
+            self._put(index=self.hand, key=key, obj=obj)
+            pass
+
+        return result_list
+        pass
+
+    def _put(self, index, key, obj):
+        if self.data[index] is None:
+            self.data[index] = (key, [obj])
+            self.hand = (self.hand + 1) % self.len  # 指针后移
+            pass
+        elif self.data[index][0] == key:
+            self.data[index][1].append(obj)
+        else:
+            self.data[index] = (key, [obj])  # 覆盖当前index
+            self.hand = (self.hand + 1) % self.len  # 指针后移
+            pass
+        pass
+
+    def get(self):
+        ret = self.data[self.hand:]
+        ret.extend(self.data[:self.hand])
+        return ret
+
+
+if __name__ == '__main__':
+    q = xxl_queue(pop_threshold=3, buff_size=5)
+
+    print(q.append('a', '1'))
+    print(q.append('a', '2'))
+    print(q.append('a', '3'))
+    print(q.append('b', '1'))
+    print(q.append('b', '2'))
+    print(q.append('b', '3'))
+    print(q.append('a', '3'))
+    print(q.append('b', '3'))
+    print(q.append('c', '3'))
+    print(q.append('d', '3'))
+    print(q.append('e', '3'))
+    print(q.append('f', '3'))
+    print(q.append('a', '1'))
+    print(q.append('a', '2'))
+
+    pass