# -*- coding: utf-8 -*-
# @Time : 2023/7/20 16:50
# @Author : XuJiakai
# @File : TestMain
# @Software: PyCharm
"""Manual smoke tests for the data-clean pipeline.

Fetches one stored record over HTTP, wraps it in the message envelope
built by ``get_test_data`` and either publishes it to Kafka or passes it
directly to ``task_distribute``.
"""
import asyncio
import json

from data_clean.api.http_api import get_json
from data_clean.utils.async_client import get_aio_kafka_producer


async def get_test_data():
    """Fetch a sample record and wrap it in a ``{"data": {table: [row]}}`` envelope.

    The record is read from the ``data`` field of the HTTP response,
    lower-cased wholesale and then has its ``plaintiff_info`` field
    overridden with a fixed fixture value.

    Returns:
        dict: ``{"data": {"company_court_open_announcement": [record]}}``.
    """
    tn = "company_court_open_announcement"
    url = f"http://47.101.221.131:8288/hbase/get/ng_rt_{tn}/8fd218fc8461789c4c401eb1eaa3d723"
    res = await get_json(url)

    # Round-trip through JSON text so that ``lower()`` is applied to every
    # key and every string value of the record in one pass.
    res = json.loads(json.dumps(res['data']).lower())

    # Alternative fixture, kept for manual switching:
    # res["plaintiff_info"] = '[{"name":"季韩旭","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
    res["plaintiff_info"] = '[{"name":"Z某某","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
    # Optional override, kept for manual switching:
    # res["start_date"] = '1948-10-01 00:00:00'

    data = {
        "data": {
            tn: [
                res
            ]
        }
    }
    return data


async def test_send_kafka():
    """Publish the sample message to the ``source_topic`` Kafka topic.

    The producer is stopped in a ``finally`` clause so that it is released
    even when fetching the sample data or sending the message fails.
    """
    producer = get_aio_kafka_producer()
    await producer.start()
    try:
        data = await get_test_data()
        res = await producer.send_and_wait("source_topic", json.dumps(data).encode())
        print(res)
    finally:
        await producer.stop()


async def test_for_url():
    """Run the sample message through ``task_distribute`` and print both sides."""
    data = await get_test_data()
    print("receive : ", data)

    # Imported at call time, as in the original script, so the Kafka-only
    # path does not need to load the task distributor.
    from data_clean.task_distributor import task_distribute

    result_data = await task_distribute(data)
    print("send : ", result_data)


if __name__ == '__main__':
    # asyncio.run(test_send_kafka())
    asyncio.run(test_for_url())