# -*- coding: utf-8 -*-
# @Time    : 2023/7/20 16:13
# @Author  : XuJiakai
# @File    : JobMain
# @Software: PyCharm
import asyncio
import json

from environs import Env

from data_clean.task_distributor import task_distribute
from data_clean.utils.async_client import get_aio_kafka_consumer, get_aio_kafka_producer
from data_clean.utils.asyncio_pool import AsyncPool

source_topic = "source_topic"
target_topic = "target_topic"

env = Env()
max_concurrency = env.int("concurrency", 20)


async def handle(producer, data: dict):
    """Run one record through the cleaning pipeline and forward the result."""
    result = await task_distribute(data)
    print("send:", result)
    if result is not None:
        await producer.send_and_wait(target_topic, json.dumps(result).encode())


async def main():
    pool = AsyncPool(max_concurrency)  # bounds the number of in-flight handle() tasks
    consumer = get_aio_kafka_consumer(source_topic)
    producer = get_aio_kafka_producer()
    await producer.start()
    await consumer.start()
    try:
        # Consume messages until the process is stopped.
        async for msg in consumer:
            # print("consumed:", msg.topic, msg.partition, msg.offset, msg.key, msg.value, msg.timestamp)
            data: dict = json.loads(msg.value)
            await pool.create_task(handle(producer, data))
    finally:
        # Leave the consumer group (autocommit runs if enabled) and flush the producer.
        await consumer.stop()
        await producer.stop()


if __name__ == '__main__':
    asyncio.run(main())
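

# ---------------------------------------------------------------------------
# The helpers imported above (get_aio_kafka_consumer, get_aio_kafka_producer,
# AsyncPool) live elsewhere in the data_clean package and are not shown here.
# The definitions below are only an illustrative sketch of what they might
# look like, assuming they wrap aiokafka and an asyncio.Semaphore. Everything
# prefixed with "_example_" / "_Example" is hypothetical, including the
# assumed env var names "kafka_servers" and "kafka_group_id"; it is not the
# project's actual implementation.
# ---------------------------------------------------------------------------
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


def _example_get_aio_kafka_consumer(topic: str) -> AIOKafkaConsumer:
    # Sketch: build a consumer subscribed to one topic, configured from env vars.
    return AIOKafkaConsumer(
        topic,
        bootstrap_servers=env.str("kafka_servers", "localhost:9092"),
        group_id=env.str("kafka_group_id", "data_clean"),
        auto_offset_reset="earliest",
    )


def _example_get_aio_kafka_producer() -> AIOKafkaProducer:
    # Sketch: a plain producer pointed at the same (assumed) bootstrap servers.
    return AIOKafkaProducer(bootstrap_servers=env.str("kafka_servers", "localhost:9092"))


class _ExampleAsyncPool:
    """Sketch of a task pool: a semaphore caps in-flight tasks, released on completion."""

    def __init__(self, max_concurrency: int):
        self._sem = asyncio.Semaphore(max_concurrency)

    async def create_task(self, coro):
        # Block the caller (the Kafka consume loop) when the pool is full,
        # which applies backpressure to consumption.
        await self._sem.acquire()
        task = asyncio.ensure_future(coro)
        task.add_done_callback(lambda _t: self._sem.release())
        return task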