Browse Source

feat: winhc爬虫

- winhc爬虫
- 整合rabbit mq
许家凯 2 years ago
parent
commit
73e81e2139

+ 3 - 1
env/env-dev.yaml

@@ -51,7 +51,9 @@ mysql:
     user: wenshu
     pwd: wenshu_168
 
-
+rabbit_mq:
+  base:
+    host: 106.15.78.184
 
 holo:
   base:

+ 3 - 0
env/env-prod.yaml

@@ -45,3 +45,6 @@ holo:
     username: LTAI4G4yiyJV4ggnLyGMduqV
     pwd: nokDg5HlVIBh80nL2dOXsKa2La4XL5
 
+rabbit_mq:
+  base:
+    host: 192.168.2.63

+ 3 - 3
handle/pull_sample_data.py

@@ -4,9 +4,7 @@
 # @File : pull_sample_data
 # @Software: PyCharm
 import json
-
-RABBITMQ_TOPIC = "xjk_test"
-
+from project_const import TOPIC_NAME
 from sdk.WinhcElasticSearchSDK import get_new_es
 from utils.base_utils import json_path
 from utils import map_2_json_str
@@ -14,6 +12,8 @@ from utils.category_utils import get_value
 from utils.pca_code_utils import get_name
 from sdk.WinhcAllClient import get_all_client
 
+RABBITMQ_TOPIC = TOPIC_NAME
+
 all_sdk = get_all_client()
 r_sdk = all_sdk.get_rabbit_mq_sdk()
 

+ 3 - 3
handle/search_winhc_latest_date.py

@@ -7,7 +7,7 @@ from sdk import get_es_sdk
 from utils import map_2_json_str
 from utils.dim_name_mapping import get_latest_date_map
 from sdk.WinhcAllClient import get_all_client
-
+from utils.base_utils import tuple_max
 all_client = get_all_client()
 hbase_client = all_client.get_hbase_client()
 
@@ -56,14 +56,14 @@ def search_latest_date(company_id: str):
             latest_date_f = tmp_str[2]
             tmp_date = get_latest_date(index=index, company_id_f=company_id_f, company_id=company_id,
                                        latest_date_f=latest_date_f)
-            max_date = max(max_date, tmp_date)
+            max_date = tuple_max(max_date, tmp_date)
             pass
 
         result_data[i] = max_date
         pass
 
     tmp_res = hbase_client.get_record('ng_company', company_id)
-    if 'APPROVED_TIME' in tmp_res:
+    if tmp_res is not None and 'APPROVED_TIME' in tmp_res:
         result_data['基本信息'] = tmp_res['APPROVED_TIME']
     return result_data
     pass

+ 4 - 4
handle/search_winhc_summary.py

@@ -12,6 +12,7 @@ es_sdk = get_es_sdk("new")
 
 skip_item = ['司法拍卖', '经营异常', '企业年报', '产品信息', '历史变更', '抽查检查', '软件著作权', '作品著作权', '失信信息', '双随机抽查', '融资信息', '股东信息', '行政许可',
              '专利', '行政处罚', '主要成员', '商标', '被执行人', '终本案件', '限制消费', '询价评估']
+skip_item.clear()
 
 
 def _summary_format(summary: map):
@@ -35,9 +36,8 @@ def _summary_format(summary: map):
         if i in result_data:
             del result_data[i]
 
-    print(result_data.keys())
-
-    print(map_2_json_str(result_data))
+    # print(result_data.keys())
+    # print(map_2_json_str(result_data))
 
     return result_data
     pass
@@ -63,7 +63,7 @@ def search_summary(company_id: str):
         for j in m:
             res_map[j] = m[j]
 
-    print(map_2_json_str(res_map))
+    # print(map_2_json_str(res_map))
     return _summary_format(res_map)
 
 

+ 10 - 0
project_const.py

@@ -0,0 +1,10 @@
+# -*- coding: utf-8 -*-
+# @Time : 2022/12/5 14:12
+# @Author : XuJiakai
+# @File : project_const
+# @Software: PyCharm
+
+TOPIC_NAME = 'xjk_test'  # RabbitMQ topic name; handle/pull_sample_data.py and spider/winhc_job.py both alias it as RABBITMQ_TOPIC
+
+MONGODB_NAME = 'a_xjk_tmp_cpa'  # name passed to get_mongo_collection() in spider/winhc_job.py
+

+ 93 - 0
spider/winhc_job.py

@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+# @Time : 2022/12/1 14:11
+# @Author : XuJiakai
+# @File : winhc_job
+# @Software: PyCharm
+
+import json
+import queue
+from threading import Thread
+
+from handle.search_winhc_latest_date import search_latest_date
+from handle.search_winhc_summary import search_summary
+from sdk.WinhcAllClient import get_all_client
+from utils import map_2_json_str
+from utils.datetime_utils import get_ds, get_now
+
+from project_const import TOPIC_NAME, MONGODB_NAME
+
+all_client = get_all_client()  # client bundle, created once when this module is imported
+col = all_client.get_mongo_collection(MONGODB_NAME)  # MongoDB collection that run() inserts its documents into
+
+
+def run(data: map):
+    company_id = data['company_id']
+    summary = search_summary(company_id)
+    latest_date = search_latest_date(company_id)
+
+    _id = get_ds() + '_' + company_id + '_winhc'
+
+    output_data = {
+        "_id": _id,
+        "base_info": data,
+        "competitor_product_name": "winhc",
+        "summary": summary,
+        "latest_date": latest_date,
+        "spider_date": get_now()
+    }
+    # print(map_2_json_str(output_data))
+    try:
+        col.insert_one(output_data)
+    except Exception as e:
+        pass
+    pass
+
+
+r_sdk = all_client.get_rabbit_mq_sdk()  # RabbitMQ SDK; main() uses it to start the consumer
+
+RABBITMQ_TOPIC = TOPIC_NAME  # topic that main() passes to consumer_by_fanout
+q = queue.Queue(5000)  # bounded queue (at most 5000 items) filled by the consumer callback and drained by Work threads
+
+
+class Work(Thread):
+    def run(self):
+        while True:
+            run(q.get())
+
+
+def main():
+    thread_num = 10
+
+    for i in range(thread_num):
+        Work().start()
+        pass
+
+    def callback(ch, method, properties, body):
+        data = json.loads(body.decode())
+        print(data)
+        q.put(data)
+        ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动应答ack,确保消息真正消费后才应答
+        pass
+
+    r_sdk.consumer_by_fanout(RABBITMQ_TOPIC, callback=callback)
+
+    pass
+
+
+if __name__ == '__main__':
+    main()  # script entry point: starts the worker threads and the RabbitMQ consumer
+    # data = {
+    #     "company_id": "88f04cbfab150fe2bccdeec3aea32750",
+    #     "company_name": "",
+    #     "company_registered_date": "",
+    #     "company_org_type": "",
+    #     "province_code": "",
+    #     "city_code": "",
+    #     "county_code": "",
+    #     "org_number": "",
+    #     "reg_number": "",
+    #     "credit_code": "",
+    # }
+    #
+    # run(data)
+    pass

+ 10 - 2
utils/base_utils.py

@@ -45,12 +45,20 @@ def tuple_max(*tu: tuple):
             result = i
             pass
         else:
-            if i[0] is not None and i[0] >= result[0]:
-                result = i
+            if i is tuple:
+                if i[0] is not None and i[0] >= result[0]:
+                    result = i
+                pass
+            else:
+                if i is not None and i >= result:
+                    result = i
+                pass
+
             pass
         pass
     return result
 
 
 if __name__ == '__main__':
+    print(tuple_max('4', None))
     pass