|
@@ -6,29 +6,77 @@
|
|
import asyncio
|
|
import asyncio
|
|
import json
|
|
import json
|
|
|
|
|
|
|
|
+import aio_pika
|
|
|
|
+from aio_pika import IncomingMessage
|
|
from environs import Env
|
|
from environs import Env
|
|
|
|
|
|
from data_clean.task_distributor import task_distribute
|
|
from data_clean.task_distributor import task_distribute
|
|
-from data_clean.utils.async_client import get_aio_kafka_consumer, get_aio_kafka_producer
|
|
|
|
|
|
+from data_clean.utils.async_client import get_aio_kafka_consumer, get_aio_kafka_producer, get_rabbitmq_connection
|
|
from data_clean.utils.asyncio_pool import AsyncPool
|
|
from data_clean.utils.asyncio_pool import AsyncPool
|
|
|
|
|
|
-source_topic = "source_topic"
|
|
|
|
-target_topic = "target_topic"
|
|
|
|
-
|
|
|
|
env = Env()
|
|
env = Env()
|
|
|
|
+
|
|
|
|
+base_topic = env.str("base_topic", "rt_other_dim")
|
|
|
|
+
|
|
|
|
+source_topic = env.str("source_topic", base_topic) # "rt_company_dim"
|
|
|
|
+target_topic = env.str("target_topic", base_topic) # "rt_company_dim"
|
|
|
|
+
|
|
max_concurrency = env.int("concurrency", 20)
|
|
max_concurrency = env.int("concurrency", 20)
|
|
|
|
|
|
|
|
|
|
-async def handle(producer, data):
|
|
|
|
|
|
+async def handle(producer, data: dict):
|
|
result = await task_distribute(data)
|
|
result = await task_distribute(data)
|
|
print("send : ", result)
|
|
print("send : ", result)
|
|
|
|
+
|
|
if result is not None:
|
|
if result is not None:
|
|
await producer.send_and_wait(target_topic, json.dumps(result).encode())
|
|
await producer.send_and_wait(target_topic, json.dumps(result).encode())
|
|
pass
|
|
pass
|
|
pass
|
|
pass
|
|
|
|
|
|
|
|
|
|
-async def main():
|
|
|
|
|
|
+async def on_message_received(producer, msg: IncomingMessage):
|
|
|
|
+ data: dict = json.loads(msg.body)
|
|
|
|
+ await handle(producer, data)
|
|
|
|
+ await msg.ack()
|
|
|
|
+ pass
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+async def main_for_rabbitmq():
|
|
|
|
+ print("start job. Listening queue :", source_topic, "send topic:", target_topic, "max concurrency:",
|
|
|
|
+ max_concurrency)
|
|
|
|
+
|
|
|
|
+ pool = AsyncPool(max_concurrency)
|
|
|
|
+
|
|
|
|
+ producer = get_aio_kafka_producer()
|
|
|
|
+ await producer.start()
|
|
|
|
+
|
|
|
|
+ queue_name = source_topic # 只需要配置这个
|
|
|
|
+
|
|
|
|
+ connection = await get_rabbitmq_connection()
|
|
|
|
+ async with connection:
|
|
|
|
+ channel: aio_pika.abc.AbstractChannel = await connection.channel()
|
|
|
|
+ await channel.set_qos(prefetch_count=max_concurrency)
|
|
|
|
+
|
|
|
|
+ # Declaring queue
|
|
|
|
+ queue: aio_pika.abc.AbstractQueue = await channel.declare_queue(
|
|
|
|
+ name=queue_name,
|
|
|
|
+ durable=True,
|
|
|
|
+ auto_delete=False
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ async with queue.iterator(no_ack=False) as queue_iter:
|
|
|
|
+ # Cancel consuming after __aexit__
|
|
|
|
+ async for message in queue_iter:
|
|
|
|
+ message: IncomingMessage = message
|
|
|
|
+ await pool.create_task(on_message_received(producer, message))
|
|
|
|
+ # async with message.process(ignore_processed=True):
|
|
|
|
+ # await pool.create_task(on_message_received(producer, message))
|
|
|
|
+ # pass
|
|
|
|
+ pass
|
|
|
|
+ pass
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+async def main_for_kafka():
|
|
pool = AsyncPool(max_concurrency)
|
|
pool = AsyncPool(max_concurrency)
|
|
consumer = get_aio_kafka_consumer(source_topic)
|
|
consumer = get_aio_kafka_consumer(source_topic)
|
|
producer = get_aio_kafka_producer()
|
|
producer = get_aio_kafka_producer()
|
|
@@ -37,10 +85,11 @@ async def main():
|
|
try:
|
|
try:
|
|
# Consume messages
|
|
# Consume messages
|
|
async for msg in consumer:
|
|
async for msg in consumer:
|
|
- # print("consumed: ", msg.topic, msg.partition, msg.offset,
|
|
|
|
- # msg.key, msg.value, msg.timestamp)
|
|
|
|
|
|
+ print("consumed: ", msg.topic, msg.partition, msg.offset,
|
|
|
|
+ msg.key, msg.value, msg.timestamp)
|
|
data: dict = json.loads(msg.value)
|
|
data: dict = json.loads(msg.value)
|
|
await pool.create_task(handle(producer, data))
|
|
await pool.create_task(handle(producer, data))
|
|
|
|
+
|
|
finally:
|
|
finally:
|
|
# Will leave consumer group; perform autocommit if enabled.
|
|
# Will leave consumer group; perform autocommit if enabled.
|
|
await consumer.stop()
|
|
await consumer.stop()
|
|
@@ -49,5 +98,5 @@ async def main():
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|
|
- asyncio.run(main())
|
|
|
|
|
|
+ asyncio.run(main_for_rabbitmq())
|
|
pass
|
|
pass
|