JobMain.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
# -*- coding: utf-8 -*-
# @Time : 2023/7/20 16:13
# @Author : XuJiakai
# @File : JobMain
# @Software: PyCharm
  6. import asyncio
  7. import json
  8. from environs import Env
  9. from data_clean.task_distributor import task_distribute
  10. from data_clean.utils.async_client import get_aio_kafka_consumer, get_aio_kafka_producer
  11. from data_clean.utils.asyncio_pool import AsyncPool
  12. source_topic = "source_topic"
  13. target_topic = "target_topic"
  14. env = Env()
  15. max_concurrency = env.int("concurrency", 20)
  16. async def handle(producer, data):
  17. result = await task_distribute(data)
  18. print("send : ", result)
  19. if result is not None:
  20. await producer.send_and_wait(target_topic, json.dumps(result).encode())
  21. pass
  22. pass
  23. async def main():
  24. pool = AsyncPool(max_concurrency)
  25. consumer = get_aio_kafka_consumer(source_topic)
  26. producer = get_aio_kafka_producer()
  27. await producer.start()
  28. await consumer.start()
  29. try:
  30. # Consume messages
  31. async for msg in consumer:
  32. # print("consumed: ", msg.topic, msg.partition, msg.offset,
  33. # msg.key, msg.value, msg.timestamp)
  34. data: dict = json.loads(msg.value)
  35. await pool.create_task(handle(producer, data))
  36. finally:
  37. # Will leave consumer group; perform autocommit if enabled.
  38. await consumer.stop()
  39. pass
  40. if __name__ == '__main__':
  41. asyncio.run(main())
  42. pass