|
@@ -12,6 +12,7 @@ from aio_pika import IncomingMessage
|
|
|
from environs import Env
|
|
|
|
|
|
from data_clean.task_distributor import task_distribute
|
|
|
+from data_clean.utils import get_log
|
|
|
from data_clean.utils.async_client import get_aio_kafka_consumer, get_aio_kafka_producer, get_rabbitmq_connection
|
|
|
from data_clean.utils.asyncio_pool import AsyncPool
|
|
|
|
|
@@ -25,6 +26,7 @@ source_topic = env.str("source_topic", base_topic) # "rt_company_dim"
|
|
|
target_topic = env.str("target_topic", base_topic) # "rt_company_dim"
|
|
|
|
|
|
max_concurrency = env.int("concurrency", 1)
|
|
|
+log = get_log("JobMain")
|
|
|
|
|
|
|
|
|
async def handle(producer, data: dict):
|
|
@@ -46,8 +48,8 @@ async def on_message_received(producer, msg: IncomingMessage):
|
|
|
|
|
|
|
|
|
async def main_for_rabbitmq():
|
|
|
- print("start job. Listening queue :", source_topic, "send topic:", target_topic, "max concurrency:",
|
|
|
- max_concurrency)
|
|
|
+    log.info("start job. Listening queue : %s send topic: %s max concurrency: %s",
|


|


+             source_topic, target_topic, max_concurrency)
|
|
|
|
|
|
pool = AsyncPool(max_concurrency)
|
|
|
|