1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- # -*- coding: utf-8 -*-
- # @Time : 2023/7/20 16:50
- # @Author : XuJiakai
- # @File : TestMain
- # @Software: PyCharm
- import asyncio
- import json
- from functools import partial
- from aio_pika import Message
- from JobMain import source_topic
- from data_clean.api.http_api import get
- from data_clean.utils.async_client import get_aio_kafka_producer, get_rabbitmq_connection
- from data_clean.utils import to_string
- json.dumps = partial(json.dumps, ensure_ascii=False)
- from data_clean.utils.data_schema_utils import record_to_json
- async def get_test_data():
- tn = "company_court_open_announcement"
- url = f"http://47.101.221.131:8288/hbase/get/ng_rt_{tn}/4c957482789a8218461079215b4d239b"
- res = await get(url)
- res = await record_to_json(tn, res['data'])
- # res = json.loads(json.dumps(res['data']).lower())
- # res["plaintiff_info"] = '[{"name":"季韩旭","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
- # res["plaintiff_info"] = '[{"name":"Z某某","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
- # res["deleted"] = 9
- # res["start_date"] = '1959-08-16 00:00:00'
- # del res['case_no']
- data = {
- "data": {
- tn: [
- res
- ]
- }
- }
- return data
- async def test_send_rabbitmq():
- connection = await get_rabbitmq_connection()
- channel = await connection.channel()
- data = await get_test_data()
- await channel.default_exchange.publish(
- Message(
- bytes(json.dumps(data), 'utf-8'),
- ), routing_key=source_topic,
- )
- await channel.close()
- await connection.close()
- pass
- async def test_send_kafka():
- producer = get_aio_kafka_producer()
- await producer.start()
- data = await get_test_data()
- res = await producer.send_and_wait(source_topic, json.dumps(data).encode())
- print(res)
- await producer.stop()
- pass
- async def test_for_url():
- data = await get_test_data()
- asyncio.get_running_loop()
- print("receive : ", to_string(data))
- from data_clean.task_distributor import task_distribute
- result_data = await task_distribute(data)
- print("send : ", to_string(result_data))
- pass
- async def test():
- data = await get_test_data()
- print(data)
- from data_clean.statistic.statistic_filter import filter_data
- filter_data(data)
- print(data)
- pass
- if __name__ == '__main__':
- # asyncio.run(test_send_kafka())
- # asyncio.run(test_send_rabbitmq())
- # asyncio.run(test())
- asyncio.run(test_for_url())
- pass
|