# -*- coding: utf-8 -*- # @Time : 2023/7/20 16:50 # @Author : XuJiakai # @File : TestMain # @Software: PyCharm import asyncio import json from functools import partial from aio_pika import Message from JobMain import source_topic from data_clean.api.http_api import get from data_clean.utils.async_client import get_aio_kafka_producer, get_rabbitmq_connection from data_clean.utils import to_string json.dumps = partial(json.dumps, ensure_ascii=False) from data_clean.utils.data_schema_utils import record_to_json async def get_test_data(): tn = "company_court_open_announcement" url = f"http://47.101.221.131:8288/hbase/get/ng_rt_{tn}/4c957482789a8218461079215b4d239b" res = await get(url) res = await record_to_json(tn, res['data']) # res = json.loads(json.dumps(res['data']).lower()) # res["plaintiff_info"] = '[{"name":"季韩旭","litigant_id":""},{"name":"戴莉","litigant_id":""}]' # res["plaintiff_info"] = '[{"name":"Z某某","litigant_id":""},{"name":"戴莉","litigant_id":""}]' # res["deleted"] = 9 # res["start_date"] = '1959-08-16 00:00:00' # del res['case_no'] data = { "data": { tn: [ res ] } } return data async def test_send_rabbitmq(): connection = await get_rabbitmq_connection() channel = await connection.channel() data = await get_test_data() await channel.default_exchange.publish( Message( bytes(json.dumps(data), 'utf-8'), ), routing_key=source_topic, ) await channel.close() await connection.close() pass async def test_send_kafka(): producer = get_aio_kafka_producer() await producer.start() data = await get_test_data() res = await producer.send_and_wait(source_topic, json.dumps(data).encode()) print(res) await producer.stop() pass async def test_for_url(): data = await get_test_data() asyncio.get_running_loop() print("receive : ", to_string(data)) from data_clean.task_distributor import task_distribute result_data = await task_distribute(data) print("send : ", to_string(result_data)) pass async def test(): data = await get_test_data() print(data) from data_clean.statistic.statistic_filter import filter_data filter_data(data) print(data) pass if __name__ == '__main__': # asyncio.run(test_send_kafka()) # asyncio.run(test_send_rabbitmq()) # asyncio.run(test()) asyncio.run(test_for_url()) pass