# -*- coding: utf-8 -*-
# @Time    : 2023/7/20 16:13
# @Author  : XuJiakai
# @File    : JobMain
# @Software: PyCharm
import asyncio
import json
from functools import partial

import aio_pika
from aio_pika import IncomingMessage
from environs import Env
from loguru import logger as log

from data_clean.task_distributor import task_distribute
from data_clean.utils.async_client import get_aio_kafka_consumer, get_aio_kafka_producer, get_rabbitmq_connection
from data_clean.utils.asyncio_pool import AsyncPool

# Rebind json.dumps process-wide so every call emits non-ASCII characters verbatim
json.dumps = partial(json.dumps, ensure_ascii=False)

env = Env()
base_topic = env.str("base_topic", "rt_other_dim")
source_topic = env.str("source_topic", base_topic)  # e.g. "rt_company_dim"
target_topic = env.str("target_topic", base_topic)  # e.g. "rt_company_dim"
max_concurrency = env.int("concurrency", 1)
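

# Process one decoded record: task_distribute (the project's dispatcher) routes
# it to the matching cleaning task; a non-None result is re-encoded as JSON and
# published to target_topic.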
async def handle(producer, data: dict):
    result = await task_distribute(data)
    # print("send : ", result)
    if result is not None:
        await producer.send_and_wait(target_topic, json.dumps(result).encode())
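

# RabbitMQ callback: decode the message body, process it, then ack. Because the
# ack happens only after handle() returns, a message whose processing fails
# stays unacked and is redelivered once the channel closes.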
async def on_message_received(producer, msg: IncomingMessage):
    data: dict = json.loads(msg.body)
    await handle(producer, data)
    await msg.ack()
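

# RabbitMQ entry point: consumes messages from a queue named source_topic and
# republishes results to Kafka. prefetch_count matches the pool size (AsyncPool
# is assumed to cap in-flight tasks), so the broker never hands this consumer
# more unacked messages than it can process concurrently.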
async def main_for_rabbitmq():
    log.info("start job. Listening queue: {}, send topic: {}, max concurrency: {}",
             source_topic, target_topic, max_concurrency)
    pool = AsyncPool(max_concurrency)
    producer = get_aio_kafka_producer()
    await producer.start()
    queue_name = source_topic  # only this needs to be configured
    connection = await get_rabbitmq_connection()
    try:
        async with connection:
            channel: aio_pika.abc.AbstractChannel = await connection.channel()
            await channel.set_qos(prefetch_count=max_concurrency)

            # Declare the queue (idempotent if it already exists with the same settings)
            queue: aio_pika.abc.AbstractQueue = await channel.declare_queue(
                name=queue_name,
                durable=True,
                auto_delete=False
            )

            async with queue.iterator(no_ack=False) as queue_iter:
                # Consuming is cancelled when the iterator's __aexit__ runs
                async for message in queue_iter:
                    message: IncomingMessage = message
                    await pool.create_task(on_message_received(producer, message))
                    # Alternative: let aio_pika manage ack/reject around processing:
                    # async with message.process(ignore_processed=True):
                    #     await pool.create_task(on_message_received(producer, message))
    finally:
        # Flush and release the Kafka producer (mirrors main_for_kafka's cleanup)
        await producer.stop()
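

# Kafka entry point: the same pipeline, reading from a Kafka topic instead of a
# RabbitMQ queue.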
async def main_for_kafka():
    pool = AsyncPool(max_concurrency)
    consumer = get_aio_kafka_consumer(source_topic)
    producer = get_aio_kafka_producer()
    await producer.start()
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            log.info("consumed: {} {} {} {} {} {}", msg.topic, msg.partition, msg.offset,
                     msg.key, msg.value, msg.timestamp)
            data: dict = json.loads(msg.value)
            await pool.create_task(handle(producer, data))
    finally:
        # Will leave the consumer group; performs autocommit if enabled
        await consumer.stop()
        await producer.stop()
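

# The RabbitMQ pipeline is the default; switch to asyncio.run(main_for_kafka())
# to consume from Kafka instead.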
if __name__ == '__main__':
    asyncio.run(main_for_rabbitmq())