Browse source

feat: add sampling data from MaxCompute

- add sampling data from MaxCompute (usage sketch below)
- fix bug: latest_date
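
The new pull_by_max entry point replaces the ES-based sampler (pull_by_es) with a pull from MaxCompute (ODPS): it renders handle/pull_data.sql, materialises the out_winhc_data_analysis_pull_data partition for the current ds if it is missing, and fans the sampled rows out to RabbitMQ. A minimal invocation sketch, assuming the module layout added in this commit:

    # hedged sketch: drive the MaxCompute-backed sampler added in this commit
    from handle.pull_sample_data import pull_by_max

    # size is passed as limit_num into pull_data.sql; the sampled rows are then
    # read back from out_winhc_data_analysis_pull_data and published to RABBITMQ_TOPIC
    pull_by_max(size=10000)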
许家凯 2 years ago
parent
commit
1eb389ef48

The file diff has been suppressed because it is too large
+ 39 - 0
handle/pull_data.sql


+ 57 - 2
handle/pull_sample_data.py

@@ -3,7 +3,7 @@
 # @Author : XuJiakai
 # @File : pull_sample_data
 # @Software: PyCharm
-import json
+import json, os
 from project_const import TOPIC_NAME
 from sdk.WinhcElasticSearchSDK import get_new_es
 from utils.base_utils import json_path
@@ -11,6 +11,10 @@ from utils import map_2_json_str
 from utils.category_utils import get_value
 from utils.pca_code_utils import get_name
 from sdk.WinhcAllClient import get_all_client
+from utils.odps_schema_utils import get_last_partition_ds, get_partition_ds
+from sdk.WinhcAllClient import get_odps_sdk
+
+_module_path = os.path.dirname(__file__)
 
 RABBITMQ_TOPIC = TOPIC_NAME
 
 
@@ -94,6 +98,57 @@ def pull_by_es(size: int = 20):
     pass
 
 
 
 
+odps_sdk = get_odps_sdk()
+
+
+def pull_by_max(size=100000):
+    f = open(os.path.join(_module_path, 'pull_data.sql'), encoding='utf-8')
+    sql = f.read().strip()
+
+    company_rank_latest_ds = get_last_partition_ds(tab='calc_company_rank_out', project='winhc_ng')
+    from utils.datetime_utils import get_ds
+    latest_ds = get_ds()
+
+    sql = sql.format(limit_num=size, company_rank_latest_ds=company_rank_latest_ds, latest_ds=latest_ds)
+    # print(sql)
+
+    all_ds = get_partition_ds(tab='out_winhc_data_analysis_pull_data', project='winhc_ng')
+    if latest_ds not in all_ds:
+        instance = odps_sdk.run_sql(sql)
+        instance.wait_for_success()
+
+    with odps_sdk.execute_sql(
+            'select * from out_winhc_data_analysis_pull_data where ds = ' + latest_ds + ' limit 100').open_reader(
+        tunnel=True) as reader:
+        for record in reader:
+            c = get_value(
+                c1=record['cate_first_code'], c2=record['cate_second_code'],
+                c3=record['cate_third_code'])
+
+            a = get_name(province_code=record['province_code'], city_code=record['city_code'],
+                         county_code=record['county_code'])
+
+            ele = {
+                "company_id": record['company_id'],
+                "company_name": record['company_name'],
+                "company_org_type": record['company_org_type'],
+                "province": a[0],
+                "city": a[1],
+                "county": a[2],
+                "org_number": record['org_number'],
+                "credit_code": record['credit_code'],
+                "reg_number": record['reg_number'],
+                "cate_first": c[0],
+                "cate_second": c[1],
+                "cate_third": c[2],
+            }
+            # print(map_2_json_str(ele))
+            r_sdk.send_by_fanout(RABBITMQ_TOPIC, json.dumps(ele, ensure_ascii=False).encode())
+
+    pass
+
+
 if __name__ == '__main__':
-    pull_by_es(size=100)
+    # pull_by_es(size=100)
+    pull_by_max(size=10000)
     pass

+ 10 - 9
handle/search_winhc_latest_date.py

@@ -32,13 +32,13 @@ def get_latest_date(index: str, company_id_f: str, company_id: str, latest_date_
                             }
                         }
                     },
-                    # {
-                    #     "term": {
-                    #         "deleted": {
-                    #             "value": 0
-                    #         }
-                    #     }
-                    # }
+                    {
+                        "term": {
+                            "deleted": {
+                                "value": 0
+                            }
+                        }
+                    }
                 ]
             }
         }
@@ -56,8 +56,9 @@ def get_latest_date(index: str, company_id_f: str, company_id: str, latest_date_
     else:
         res = es_sdk.query(index=index, doc_type='_doc', dsl=dsl)
 
-    if len(res) == 0:
+    if len(res) == 0 or latest_date_f not in res[0]:
         return None
+
     return res[0][latest_date_f]
     pass
 
 
@@ -98,7 +99,7 @@ if __name__ == '__main__':
     #                     company_id='059f83641cc4df8b9577cb1e2d89939e', latest_date_f='decision_date')
     # print(d)
 
-    d = search_latest_date(company_id='eda3ee85ad34656bd45bc587b21b6c26')
+    d = search_latest_date(company_id='46a12c9c77fe5fdc615f8f4f1a5f5d8c')
     print(map_2_json_str(d))
 
     pass

+ 2 - 1
handle/search_winhc_summary.py

@@ -68,5 +68,6 @@ def search_summary(company_id: str):
 
 
 if __name__ == '__main__':
-    search_summary(company_id='64406e27d43838e78aae09d8096ef7ff')
+    res_map = search_summary(company_id='a33b741a8258aa9d6f181162b5ee2787')
+    print(map_2_json_str(res_map))
     pass

+ 134 - 0
utils/odps_schema_utils.py

@@ -0,0 +1,134 @@
+# -*- coding: utf-8 -*-
+# @Time : 2020/12/16 15:25
+# @Author : XuJiakai
+# @File : odps_schema_utils
+# @Software: PyCharm
+
+from odps import ODPS
+
+odps = ODPS('LTAI4G4yiyJV4ggnLyGMduqV', 'nokDg5HlVIBh80nL2dOXsKa2La4XL5', 'winhc_biz',
+            endpoint='http://service.odps.aliyun.com/api')
+
+
+def execute_sql(sql):
+    with odps.execute_sql(sql).open_reader() as reader:
+        # print(type(reader.raw))
+        # print(reader.raw)
+        return reader.to_pandas()
+    return None
+
+
+def exists_tab(tab, project):
+    return odps.exist_table(tab, project=project)
+
+
+def list_tab(project, prefix=None):
+    return odps.list_tables(project=project, prefix=prefix)
+    pass
+
+
+def get_tabs(project, prefix=None, owner=None):
+    ts = odps.list_tables(project, prefix=prefix, owner=owner)
+    return [i.name for i in ts]
+
+
+def get_cols(tn, project='winhc_eci_dev'):
+    t = odps.get_table(tn, project)
+    cols = t.schema.columns
+    return [i.name for i in cols]
+
+
+def get_cols_remove_partition_cols(tn, project='winhc_eci_dev'):
+    t = odps.get_table(tn, project)
+    cols = t.schema.columns
+    partition_cols = [i.name for i in t.schema.partitions]
+    return [i.name for i in cols if i.name not in partition_cols]
+
+
+def query(sql):
+    with odps.execute_sql(sql).open_reader() as reader:
+        li = []
+        for record in reader:
+            # print(record['value'][0:10])
+            li.append(record['value'] + "\n")
+        return li
+
+    pass
+
+
+def get_cols_type(tn, project='winhc_ng'):
+    t = odps.get_table(tn, project)
+    cols = t.schema.columns
+    m = []
+    for i in cols:
+        m.append((i.name, str(i.type)))
+
+    return m
+
+
+def get_cols_type_desc(tn, project='winhc_ng'):
+    t = odps.get_table(tn, project)
+    cols = t.schema.columns
+    m = []
+    for i in cols:
+        m.append((i.name, str(i.type), i.comment))
+
+    return m
+
+
+def get_partition_cols(tn, project='winhc_ng'):
+    table = odps.get_table(tn, project)
+    return [i.name for i in table.schema.partitions]
+
+
+def get_last_partition_ds(tab, project, default=None):
+    li = get_partition_ds(tab, project)
+    if len(li) == 0:
+        return default
+    else:
+        return li[-1]
+    pass
+
+
+def get_partition_ds(tab, project):
+    li = get_partition(tab, project, 'ds=')
+    l = []
+    for i in li:
+        if '/' in i:
+            l.append([j for j in i.split('/') if 'ds=' in j][0].split('=')[1])
+        else:
+            l.append(i.split('=')[1])
+    return l
+    pass
+
+
+def get_partition(tab, project, expression=None):
+    with odps.execute_sql(f"show partitions {project}.{tab}").open_reader() as reader:
+        li = reader.raw.split('\n')
+        li = [i for i in li if i is not None and i.strip() != '']
+        if expression:
+            li = [i for i in li if expression in i]
+        return li
+
+
+def show_partitions(tn, project='winhc_ng'):
+    table = odps.get_table(tn, project)
+    return [i.name for i in table.partitions]
+    pass
+
+
+if __name__ == '__main__':
+    res = list_tab("winhc_ng", prefix='ads_')
+    str = ""
+    for i in res:
+        str += '"{a}":"{b}",'.format(a=i.name[4:], b=i.comment)
+
+    from utils import set_text
+    set_text(str)
+    print(str)
+    # li = query(
+    #     "SELECT * from winhc_ng.tmp_dynamic_test1 where ds = '20220719' and INSTR(value, 'company_staff') > 0 and INSTR(value, 'company_holder') > 0;")
+    # f = open("D://out_dynamic_test1.txt", mode='w', encoding='utf-8')
+    # f.writelines(li)
+    # print(li)
+    pass