xxl_queue.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/2 9:19
  3. # @Author : XuJiakai
  4. # @File : xxl_queue
  5. # @Software: PyCharm
  6. from collections import OrderedDict
  7. from log import get_log
# Module-level logger; used by default_overwrite_handle and the __main__ demo below.
log = get_log("xxl_queue")
# Module-level OrderedDict instance; not referenced anywhere else in the visible code.
o = OrderedDict()
  10. class xxl_queue_plus:
  11. def __init__(self, pop_threshold, buff_size=100):
  12. """
  13. 类似于开心消消乐的定长队列,攒够阈值,则弹出数据
  14. 注意:该队列目前不能按照最新插入时间,调整覆盖
  15. :param pop_threshold: 弹出数据阈值
  16. :param buff_size: 缓冲区大小
  17. """
  18. assert isinstance(buff_size, int) and buff_size > 0, "请给我一个大于零的整数来指定长度"
  19. self.buff_size = buff_size
  20. self.pop_threshold = pop_threshold
  21. self.data = {}
  22. pass
  23. def append(self, key, obj):
  24. result_list = None
  25. flag = False
  26. if key in self.data:
  27. _obj_list = self.data[key]
  28. if len(_obj_list) + 1 >= self.pop_threshold:
  29. result_list = _obj_list.copy()
  30. result_list.append(obj)
  31. del self.data[key]
  32. return result_list
  33. pass
  34. pass
  35. def default_overwrite_handle(key, obj_list):
  36. log.info(f"overwrite record , key:{key} ojb_list:{obj_list}")
  37. pass
  38. class xxl_queue:
  39. def __init__(self, pop_threshold, buff_size=100, overwrite_handle=None):
  40. """
  41. 类似于开心消消乐的定长队列,攒够阈值,则弹出数据
  42. 注意:该队列目前不能按照最新插入时间,调整覆盖
  43. :param pop_threshold: 弹出数据阈值
  44. :param buff_size: 缓冲区大小
  45. """
  46. assert isinstance(buff_size, int) and buff_size > 0, "请给我一个大于零的整数来指定长度"
  47. self.len = buff_size
  48. self.data = [None] * self.len
  49. self.hand = 0
  50. self.pop_threshold = pop_threshold
  51. self.overwrite_handle = overwrite_handle
  52. self.index_data = {}
  53. pass
  54. def append(self, key, obj):
  55. result_list = None
  56. flag = False
  57. if key in self.index_data:
  58. tmp_index = self.index_data[key]
  59. if self.data[tmp_index] is None:
  60. ##todo
  61. pass
  62. else:
  63. _key = self.data[tmp_index][0]
  64. _obj_list = self.data[tmp_index][1]
  65. if _key == key:
  66. if len(_obj_list) + 1 >= self.pop_threshold:
  67. r = _obj_list.copy()
  68. r.append(obj)
  69. result_list = r
  70. del self.index_data[key]
  71. self.data[tmp_index] = None
  72. pass
  73. else:
  74. self.data[tmp_index][1].append(obj)
  75. pass
  76. flag = True
  77. pass
  78. pass
  79. pass
  80. if not flag:
  81. self._put(index=self.hand, key=key, obj=obj)
  82. pass
  83. return result_list
  84. pass
  85. def _put(self, index, key, obj):
  86. if self.data[index] is None:
  87. self.data[index] = (key, [obj])
  88. self.index_data[key] = index
  89. self.hand = (self.hand + 1) % self.len # 指针后移
  90. pass
  91. elif self.data[index][0] == key:
  92. self.data[index][1].append(obj)
  93. else:
  94. _key, _obj_list = self.data[index]
  95. del self.index_data[_key]
  96. if self.overwrite_handle is not None:
  97. self.overwrite_handle(_key, _obj_list)
  98. pass
  99. self.index_data[key] = index
  100. self.data[index] = (key, [obj]) # 覆盖当前index
  101. self.hand = (self.hand + 1) % self.len # 指针后移
  102. pass
  103. pass
  104. def get(self):
  105. ret = self.data[self.hand:]
  106. ret.extend(self.data[:self.hand])
  107. return ret
  108. def print_info(self):
  109. print(f"--info-- \n index_data: {self.index_data} \n data: {self.data} \n")
  110. pass
  111. if __name__ == '__main__':
  112. q = xxl_queue(pop_threshold=2, buff_size=3, overwrite_handle=default_overwrite_handle)
  113. log.info('%s' % q.append('a', '1'))
  114. log.info('%s' % q.append('a', '2'))
  115. # print(q.append('a', '3'))
  116. log.info('%s' % q.append('b', '1'))
  117. log.info('%s' % q.append('b', '2'))
  118. # print(q.append('b', '3'))
  119. log.info('%s' % q.append('a', '3'))
  120. log.info('%s' % q.append('b', '3'))
  121. log.info('%s' % q.append('c', '3'))
  122. log.info('%s' % q.append('d', '3'))
  123. log.info('%s' % q.append('e', '3'))
  124. log.info('%s' % q.append('f', '3'))
  125. # print(q.append('a', '1'))
  126. # print(q.append('a', '2'))
  127. pass