JobMain.py 3.1 KB

# -*- coding: utf-8 -*-
# @Time : 2023/7/20 16:13
# @Author : XuJiakai
# @File : JobMain
# @Software: PyCharm
import asyncio
import json
from functools import partial

import aio_pika
from aio_pika import IncomingMessage
from environs import Env

from data_clean.task_distributor import task_distribute
from data_clean.utils import get_log
from data_clean.utils.async_client import get_aio_kafka_consumer, get_aio_kafka_producer, get_rabbitmq_connection
from data_clean.utils.asyncio_pool import AsyncPool

# Keep non-ASCII characters readable in all json.dumps output.
json.dumps = partial(json.dumps, ensure_ascii=False)

env = Env()

base_topic = env.str("base_topic", "rt_other_dim")
source_topic = env.str("source_topic", base_topic)  # e.g. "rt_company_dim"
target_topic = env.str("target_topic", base_topic)  # e.g. "rt_company_dim"
max_concurrency = env.int("concurrency", 1)

log = get_log("JobMain")
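
# Example invocation (only the variable names above are from the source; the
# topic value and concurrency shown here are illustrative assumptions):
#   base_topic=rt_company_dim concurrency=8 python JobMain.py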


async def handle(producer, data: dict):
    result = await task_distribute(data)
    # print("send : ", result)
    if result is not None:
        await producer.send_and_wait(target_topic, json.dumps(result).encode())


async def on_message_received(producer, msg: IncomingMessage):
    data: dict = json.loads(msg.body)
    await handle(producer, data)
    # Ack only after handling succeeds; a failed message stays unacked for redelivery.
    await msg.ack()


async def main_for_rabbitmq():
    log.info("start job. Listening queue: %s, send topic: %s, max concurrency: %s",
             source_topic, target_topic, max_concurrency)
    pool = AsyncPool(max_concurrency)
    producer = get_aio_kafka_producer()
    await producer.start()

    queue_name = source_topic  # the only value that needs to be configured
    connection = await get_rabbitmq_connection()
    async with connection:
        channel: aio_pika.abc.AbstractChannel = await connection.channel()
        # Cap unacknowledged deliveries so prefetch matches the task pool size.
        await channel.set_qos(prefetch_count=max_concurrency)

        # Declaring queue
        queue: aio_pika.abc.AbstractQueue = await channel.declare_queue(
            name=queue_name,
            durable=True,
            auto_delete=False
        )

        async with queue.iterator(no_ack=False) as queue_iter:
            # Cancel consuming after __aexit__
            async for message in queue_iter:
                message: IncomingMessage = message
                await pool.create_task(on_message_received(producer, message))
                # async with message.process(ignore_processed=True):
                #     await pool.create_task(on_message_received(producer, message))
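

# `AsyncPool` comes from the project-internal data_clean.utils.asyncio_pool module,
# whose source is not shown here. The sketch below is an assumption for
# illustration, not the actual implementation: it captures the semantics the
# consumer loops in this file rely on, namely that `create_task` applies
# backpressure once `max_concurrency` coroutines are in flight.
class _AsyncPoolSketch:
    def __init__(self, max_concurrency: int):
        self._sem = asyncio.Semaphore(max_concurrency)

    async def create_task(self, coro):
        await self._sem.acquire()  # block until a slot frees up
        task = asyncio.get_running_loop().create_task(coro)
        task.add_done_callback(lambda _: self._sem.release())
        return task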


async def main_for_kafka():
    pool = AsyncPool(max_concurrency)
    consumer = get_aio_kafka_consumer(source_topic)
    producer = get_aio_kafka_producer()
    await producer.start()
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
            data: dict = json.loads(msg.value)
            await pool.create_task(handle(producer, data))
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()


if __name__ == '__main__':
    asyncio.run(main_for_rabbitmq())
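    # To consume from Kafka instead of RabbitMQ, swap the entry point:
    # asyncio.run(main_for_kafka())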