|
@@ -120,8 +120,10 @@ def pull_by_max(size=100000):
|
|
|
|
|
|
all_ds = get_partition_ds(tab='out_winhc_data_analysis_pull_data', project='winhc_ng')
|
|
all_ds = get_partition_ds(tab='out_winhc_data_analysis_pull_data', project='winhc_ng')
|
|
if latest_ds not in all_ds:
|
|
if latest_ds not in all_ds:
|
|
|
|
+ log.info("exec sql: {}".format(sql))
|
|
instance = odps_sdk.run_sql(sql)
|
|
instance = odps_sdk.run_sql(sql)
|
|
instance.wait_for_success()
|
|
instance.wait_for_success()
|
|
|
|
+ log.info("开始推送数据...")
|
|
|
|
|
|
with odps_sdk.execute_sql(
|
|
with odps_sdk.execute_sql(
|
|
'select * from out_winhc_data_analysis_pull_data where ds = ' + latest_ds + '').open_reader(
|
|
'select * from out_winhc_data_analysis_pull_data where ds = ' + latest_ds + '').open_reader(
|
|
@@ -151,6 +153,8 @@ def pull_by_max(size=100000):
|
|
# print(map_2_json_str(ele))
|
|
# print(map_2_json_str(ele))
|
|
r_sdk.send_by_fanout(RABBITMQ_TOPIC, json.dumps(ele, ensure_ascii=False).encode())
|
|
r_sdk.send_by_fanout(RABBITMQ_TOPIC, json.dumps(ele, ensure_ascii=False).encode())
|
|
|
|
|
|
|
|
+ log.info('数据推送完成.')
|
|
|
|
+
|
|
pass
|
|
pass
|
|
|
|
|
|
|
|
|