1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- # -*- coding: utf-8 -*-
- # @Time : 2023/7/20 16:13
- # @Author : XuJiakai
- # @File : JobMain
- # @Software: PyCharm
- import asyncio
- import json
- from environs import Env
- from data_clean.task_distributor import task_distribute
- from data_clean.utils.async_client import get_aio_kafka_consumer, get_aio_kafka_producer
- from data_clean.utils.asyncio_pool import AsyncPool
- source_topic = "source_topic"
- target_topic = "target_topic"
- env = Env()
- max_concurrency = env.int("concurrency", 20)
- async def handle(producer, data):
- result = await task_distribute(data)
- print("send : ", result)
- if result is not None:
- await producer.send_and_wait(target_topic, json.dumps(result).encode())
- pass
- pass
- async def main():
- pool = AsyncPool(max_concurrency)
- consumer = get_aio_kafka_consumer(source_topic)
- producer = get_aio_kafka_producer()
- await producer.start()
- await consumer.start()
- try:
- # Consume messages
- async for msg in consumer:
- # print("consumed: ", msg.topic, msg.partition, msg.offset,
- # msg.key, msg.value, msg.timestamp)
- data: dict = json.loads(msg.value)
- await pool.create_task(handle(producer, data))
- finally:
- # Will leave consumer group; perform autocommit if enabled.
- await consumer.stop()
- pass
- if __name__ == '__main__':
- asyncio.run(main())
- pass
|