TestMain.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/20 16:50
  3. # @Author : XuJiakai
  4. # @File : TestMain
  5. # @Software: PyCharm
  6. import asyncio
  7. import json
  8. from data_clean.api.http_api import get_json
  9. from data_clean.utils.async_client import get_aio_kafka_producer
  10. async def get_test_data():
  11. tn = "company_court_open_announcement"
  12. url = f"http://47.101.221.131:8288/hbase/get/ng_rt_{tn}/8fd218fc8461789c4c401eb1eaa3d723"
  13. res = await get_json(url)
  14. res = json.loads(json.dumps(res['data']).lower())
  15. # res["plaintiff_info"] = '[{"name":"季韩旭","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
  16. res["plaintiff_info"] = '[{"name":"Z某某","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
  17. # res["start_date"] = '1948-10-01 00:00:00'
  18. data = {
  19. "data": {
  20. tn: [
  21. res
  22. ]
  23. }
  24. }
  25. return data
  26. async def test_send_kafka():
  27. producer = get_aio_kafka_producer()
  28. await producer.start()
  29. data = await get_test_data()
  30. res = await producer.send_and_wait("source_topic", json.dumps(data).encode())
  31. print(res)
  32. await producer.stop()
  33. pass
  34. async def test_for_url():
  35. data = await get_test_data()
  36. asyncio.get_running_loop()
  37. print("receive : ", data)
  38. from data_clean.task_distributor import task_distribute
  39. result_data = await task_distribute(data)
  40. print("send : ", result_data)
  41. pass
  42. if __name__ == '__main__':
  43. # asyncio.run(test_send_kafka())
  44. asyncio.run(test_for_url())
  45. pass