xxl_queue.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/2 9:19
  3. # @Author : XuJiakai
  4. # @File : xxl_queue
  5. # @Software: PyCharm
  6. from collections import OrderedDict
  7. from log import get_log
# Module-level logger created through the project's `log.get_log` helper;
# used by default_overwrite_handle below.
log = get_log("xxl_queue")
# Module-level OrderedDict instance. NOTE(review): nothing in the visible part
# of this file reads it - confirm whether external code relies on it.
o = OrderedDict()
  10. class xxl_queue_plus:
  11. def __init__(self, pop_threshold, buff_size=100):
  12. """
  13. 类似于开心消消乐的定长队列,攒够阈值,则弹出数据
  14. 注意:该队列目前不能按照最新插入时间,调整覆盖
  15. :param pop_threshold: 弹出数据阈值
  16. :param buff_size: 缓冲区大小
  17. """
  18. assert isinstance(buff_size, int) and buff_size > 0, "请给我一个大于零的整数来指定长度"
  19. self.buff_size = buff_size
  20. self.pop_threshold = pop_threshold
  21. self.data = {}
  22. pass
  23. def append(self, key, obj):
  24. result_list = None
  25. flag = False
  26. if key in self.data:
  27. _obj_list = self.data[key]
  28. if len(_obj_list) + 1 >= self.pop_threshold:
  29. result_list = _obj_list.copy()
  30. result_list.append(obj)
  31. del self.data[key]
  32. return result_list
  33. pass
  34. pass
  35. def default_overwrite_handle(key, obj_list):
  36. log.info(f"overwrite record , key:{key} ojb_list:{obj_list}")
  37. pass
  38. class xxl_queue:
  39. def __init__(self, pop_threshold, buff_size=100, overwrite_handle=None):
  40. """
  41. 类似于开心消消乐的定长队列,攒够阈值,则弹出数据
  42. 注意:该队列目前不能按照最新插入时间,调整覆盖
  43. :param pop_threshold: 弹出数据阈值
  44. :param buff_size: 缓冲区大小
  45. """
  46. assert isinstance(buff_size, int) and buff_size > 0, "请给我一个大于零的整数来指定长度"
  47. self.len = buff_size
  48. self.data = [None] * self.len
  49. self.hand = 0
  50. self.pop_threshold = pop_threshold
  51. self.overwrite_handle = overwrite_handle
  52. pass
  53. def append(self, key, obj):
  54. result_list = None
  55. flag = False
  56. for i in range(len(self.data)):
  57. if self.data[i] is None:
  58. continue
  59. _key = self.data[i][0]
  60. _obj_list = self.data[i][1]
  61. if _key == key:
  62. if len(_obj_list) + 1 >= self.pop_threshold:
  63. r = _obj_list.copy()
  64. r.append(obj)
  65. result_list = r
  66. self.data[i] = None
  67. pass
  68. else:
  69. self.data[i][1].append(obj)
  70. pass
  71. flag = True
  72. pass
  73. pass
  74. if not flag:
  75. self._put(index=self.hand, key=key, obj=obj)
  76. pass
  77. return result_list
  78. pass
  79. def _put(self, index, key, obj):
  80. if self.data[index] is None:
  81. self.data[index] = (key, [obj])
  82. self.hand = (self.hand + 1) % self.len # 指针后移
  83. pass
  84. elif self.data[index][0] == key:
  85. self.data[index][1].append(obj)
  86. else:
  87. if self.overwrite_handle is not None:
  88. _key, _obj_list = self.data[index]
  89. self.overwrite_handle(_key, _obj_list)
  90. pass
  91. self.data[index] = (key, [obj]) # 覆盖当前index
  92. self.hand = (self.hand + 1) % self.len # 指针后移
  93. pass
  94. pass
  95. def get(self):
  96. ret = self.data[self.hand:]
  97. ret.extend(self.data[:self.hand])
  98. return ret
  99. if __name__ == '__main__':
  100. q = xxl_queue(pop_threshold=2, buff_size=3, overwrite_handle=default_overwrite_handle)
  101. print(q.append('a', '1'))
  102. print(q.append('a', '2'))
  103. # print(q.append('a', '3'))
  104. print(q.append('b', '1'))
  105. print(q.append('b', '2'))
  106. # print(q.append('b', '3'))
  107. print(q.append('a', '3'))
  108. print(q.append('b', '3'))
  109. print(q.append('c', '3'))
  110. print(q.append('d', '3'))
  111. print(q.append('e', '3'))
  112. print(q.append('f', '3'))
  113. # print(q.append('a', '1'))
  114. # print(q.append('a', '2'))
  115. pass