TestMain.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/20 16:50
  3. # @Author : XuJiakai
  4. # @File : TestMain
  5. # @Software: PyCharm
  6. import asyncio
  7. import json
  8. from functools import partial
  9. from aio_pika import Message
  10. from JobMain import source_topic
  11. from data_clean.api.http_api import get
  12. from data_clean.utils.async_client import get_aio_kafka_producer, get_rabbitmq_connection
  13. from data_clean.utils import to_string
  14. json.dumps = partial(json.dumps, ensure_ascii=False)
  15. from data_clean.utils.data_schema_utils import record_to_json
  16. async def get_test_data():
  17. tn = "company_court_open_announcement"
  18. url = f"http://47.101.221.131:8288/hbase/get/ng_rt_{tn}/4c957482789a8218461079215b4d239b"
  19. res = await get(url)
  20. res = await record_to_json(tn, res['data'])
  21. # res = json.loads(json.dumps(res['data']).lower())
  22. # res["plaintiff_info"] = '[{"name":"季韩旭","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
  23. # res["plaintiff_info"] = '[{"name":"Z某某","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
  24. # res["deleted"] = 9
  25. # res["start_date"] = '1959-08-16 00:00:00'
  26. # del res['case_no']
  27. data = {
  28. "data": {
  29. tn: [
  30. res
  31. ]
  32. }
  33. }
  34. return data
  35. async def test_send_rabbitmq():
  36. connection = await get_rabbitmq_connection()
  37. channel = await connection.channel()
  38. data = await get_test_data()
  39. await channel.default_exchange.publish(
  40. Message(
  41. bytes(json.dumps(data), 'utf-8'),
  42. ), routing_key=source_topic,
  43. )
  44. await channel.close()
  45. await connection.close()
  46. pass
  47. async def test_send_kafka():
  48. producer = get_aio_kafka_producer()
  49. await producer.start()
  50. data = await get_test_data()
  51. res = await producer.send_and_wait(source_topic, json.dumps(data).encode())
  52. print(res)
  53. await producer.stop()
  54. pass
  55. async def test_for_url():
  56. data = await get_test_data()
  57. asyncio.get_running_loop()
  58. print("receive : ", to_string(data))
  59. from data_clean.task_distributor import task_distribute
  60. result_data = await task_distribute(data)
  61. print("send : ", to_string(result_data))
  62. pass
  63. async def test():
  64. data = await get_test_data()
  65. print(data)
  66. from data_clean.statistic.statistic_filter import filter_data
  67. filter_data(data)
  68. print(data)
  69. pass
  70. if __name__ == '__main__':
  71. # asyncio.run(test_send_kafka())
  72. # asyncio.run(test_send_rabbitmq())
  73. # asyncio.run(test())
  74. asyncio.run(test_for_url())
  75. pass