# -*- coding: utf-8 -*-
# @Time    : 2023/7/20 16:13
# @Author  : XuJiakai
# @File    : JobMain
# @Software: PyCharm
"""Async bridge job: consume records from RabbitMQ (or Kafka), run them
through the task distributor, and publish the cleaned result to Kafka."""
import asyncio
import json
from functools import partial

import aio_pika
from aio_pika import IncomingMessage
from environs import Env
from loguru import logger as log

from data_clean.task_distributor import task_distribute
from data_clean.utils.async_client import (
    get_aio_kafka_consumer,
    get_aio_kafka_producer,
    get_rabbitmq_connection,
)
from data_clean.utils.asyncio_pool import AsyncPool

# Emit non-ASCII characters verbatim in every json.dumps call in this process.
json.dumps = partial(json.dumps, ensure_ascii=False)

env = Env()

base_topic = env.str("base_topic", "rt_other_dim")
source_topic = env.str("source_topic", base_topic)  # e.g. "rt_company_dim"
target_topic = env.str("target_topic", base_topic)  # e.g. "rt_company_dim"
max_concurrency = env.int("concurrency", 1)


async def handle(producer, data: dict) -> None:
    """Run one decoded record through the task distributor and publish it.

    :param producer: a started aiokafka producer used for publishing.
    :param data: the decoded source record.
    """
    result = await task_distribute(data)
    # A None result means the distributor dropped the record — nothing to send.
    if result is not None:
        await producer.send_and_wait(target_topic, json.dumps(result).encode())


async def on_message_received(producer, msg: IncomingMessage) -> None:
    """Decode one RabbitMQ message, process it, then ack.

    The ack is deliberately sent only after ``handle`` completes, so a
    failure before that point leaves the message unacked for redelivery.
    """
    data: dict = json.loads(msg.body)
    await handle(producer, data)
    await msg.ack()


async def main_for_rabbitmq() -> None:
    """Consume from the RabbitMQ queue named ``source_topic`` and publish
    results to the Kafka topic ``target_topic``."""
    log.info("start job. Listening queue : {} , send topic: {} , max concurrency: {}",
             source_topic, target_topic, max_concurrency)
    pool = AsyncPool(max_concurrency)
    producer = get_aio_kafka_producer()
    await producer.start()
    try:
        queue_name = source_topic  # the only queue-specific setting
        connection = await get_rabbitmq_connection()
        async with connection:
            channel: aio_pika.abc.AbstractChannel = await connection.channel()
            # Never hold more unacked messages than we can process at once.
            await channel.set_qos(prefetch_count=max_concurrency)

            # Declaring queue
            queue: aio_pika.abc.AbstractQueue = await channel.declare_queue(
                name=queue_name, durable=True, auto_delete=False
            )

            # Cancel consuming after __aexit__
            async with queue.iterator(no_ack=False) as queue_iter:
                async for message in queue_iter:
                    message: IncomingMessage = message
                    await pool.create_task(on_message_received(producer, message))
    finally:
        # Flush pending sends and release the producer even if consuming fails.
        await producer.stop()


async def main_for_kafka() -> None:
    """Consume from the Kafka topic ``source_topic`` and publish results
    to the Kafka topic ``target_topic``."""
    pool = AsyncPool(max_concurrency)
    consumer = get_aio_kafka_consumer(source_topic)
    producer = get_aio_kafka_producer()
    await producer.start()
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            log.info("consumed: {} {} {} {} {} {}",
                     msg.topic, msg.partition, msg.offset,
                     msg.key, msg.value, msg.timestamp)
            data: dict = json.loads(msg.value)
            await pool.create_task(handle(producer, data))
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()
        # Flush pending sends and release the producer.
        await producer.stop()


if __name__ == '__main__':
    asyncio.run(main_for_rabbitmq())