# -*- coding: utf-8 -*-
# @Time : 2022/12/2 9:19
# @Author : XuJiakai
# @File : xxl_queue
# @Software: PyCharm
"""Bounded "match and clear" buffers.

Values are grouped by key; once a key has collected ``pop_threshold`` values
the whole group is removed from the buffer and handed back to the caller.
"""
from collections import OrderedDict

from log import get_log

log = get_log("xxl_queue")

# Not used inside this module; kept because it is a module-level name.
o = OrderedDict()


class xxl_queue_plus:
    """Dict-backed buffer that pops a key's values once enough are collected."""

    def __init__(self, pop_threshold, buff_size=100, overwrite_handle=None):
        """
        Fixed-capacity buffer in the spirit of a match-and-clear puzzle game:
        values are collected per key and popped once the threshold is reached.
        Note: eviction order follows the time a key was first inserted; later
        appends to the same key do not refresh its position.

        :param pop_threshold: number of values per key that triggers a pop
        :param buff_size: maximum number of distinct keys held at once
        :param overwrite_handle: optional ``callable(key, obj_list)`` invoked
            with the entry that is evicted when the buffer is full
        """
        assert isinstance(buff_size, int) and buff_size > 0, "请给我一个大于零的整数来指定长度"
        self.buff_size = buff_size
        self.pop_threshold = pop_threshold
        self.overwrite_handle = overwrite_handle
        # OrderedDict so the oldest key can be evicted with popitem(last=False).
        self.data = OrderedDict()

    def append(self, key, obj):
        """
        Add ``obj`` to the group stored under ``key``.

        :param key: grouping key
        :param obj: value to collect
        :return: a new list with all values collected for ``key`` (including
            ``obj``) when the threshold is reached, otherwise ``None``
        """
        bucket = self.data.get(key)
        if bucket is not None:
            if len(bucket) + 1 >= self.pop_threshold:
                del self.data[key]
                return bucket + [obj]
            bucket.append(obj)
            return None

        # A threshold of one (or less) is already met by a single value.
        if self.pop_threshold <= 1:
            return [obj]

        # Buffer full: drop the oldest key to make room for the new one.
        if len(self.data) >= self.buff_size:
            old_key, old_bucket = self.data.popitem(last=False)
            if self.overwrite_handle is not None:
                self.overwrite_handle(old_key, old_bucket)

        self.data[key] = [obj]
        return None


def default_overwrite_handle(key, obj_list):
    """Log the entry that is about to be overwritten.

    :param key: key of the overwritten entry
    :param obj_list: values that had been collected for ``key``
    """
    log.info(f"overwrite record , key:{key} ojb_list:{obj_list}")


class xxl_queue:
    """List-backed circular buffer that pops a key's values at a threshold."""

    def __init__(self, pop_threshold, buff_size=100, overwrite_handle=None):
        """
        Fixed-length queue in the spirit of a match-and-clear puzzle game:
        values are collected per key and popped once the threshold is reached.
        Note: the queue currently cannot reorder which entry gets overwritten
        based on the most recent insertion time.

        :param pop_threshold: number of values per key that triggers a pop
        :param buff_size: number of slots in the buffer
        :param overwrite_handle: optional ``callable(key, obj_list)`` invoked
            with the entry that is about to be overwritten
        """
        assert isinstance(buff_size, int) and buff_size > 0, "请给我一个大于零的整数来指定长度"
        self.len = buff_size
        # Each slot is either None (free) or a (key, list_of_values) tuple.
        self.data = [None] * self.len
        # Index of the slot the next new key is written to.
        self.hand = 0
        self.pop_threshold = pop_threshold
        self.overwrite_handle = overwrite_handle

    def append(self, key, obj):
        """
        Add ``obj`` to the group stored under ``key``.

        :param key: grouping key
        :param obj: value to collect
        :return: a new list with all values collected for ``key`` (including
            ``obj``) when the threshold is reached, otherwise ``None``
        """
        for i, slot in enumerate(self.data):
            if slot is None or slot[0] != key:
                continue
            bucket = slot[1]
            if len(bucket) + 1 >= self.pop_threshold:
                # Threshold reached: free the slot and hand the group back.
                self.data[i] = None
                return bucket + [obj]
            bucket.append(obj)
            # append() only inserts keys that are not yet buffered, so a key
            # occupies at most one slot and the scan can stop here.
            return None

        # A threshold of one (or less) is already met by a single value.
        if self.pop_threshold <= 1:
            return [obj]

        self._put(index=self.hand, key=key, obj=obj)
        return None

    def _put(self, index, key, obj):
        """
        Store ``obj`` under ``key`` in slot ``index``.

        A free slot is simply filled. A slot already holding ``key`` gets the
        value appended. A slot holding a different key is overwritten after
        ``overwrite_handle`` (if set) has been called with the old entry.
        NOTE: the slot is overwritten even when other slots are free.

        :param index: slot to write to
        :param key: grouping key
        :param obj: value to store
        """
        slot = self.data[index]
        if slot is not None and slot[0] == key:
            slot[1].append(obj)
            return

        if slot is not None and self.overwrite_handle is not None:
            old_key, old_bucket = slot
            self.overwrite_handle(old_key, old_bucket)

        self.data[index] = (key, [obj])  # fill or overwrite this slot
        self.hand = (self.hand + 1) % self.len  # advance the cursor

    def get(self):
        """
        Return the slots as a new list rotated to start at the cursor.

        :return: list of ``None`` / ``(key, obj_list)`` entries
        """
        return self.data[self.hand:] + self.data[:self.hand]


if __name__ == '__main__':
    q = xxl_queue(pop_threshold=2, buff_size=3, overwrite_handle=default_overwrite_handle)
    print(q.append('a', '1'))
    print(q.append('a', '2'))
    # print(q.append('a', '3'))
    print(q.append('b', '1'))
    print(q.append('b', '2'))
    # print(q.append('b', '3'))
    print(q.append('a', '3'))
    print(q.append('b', '3'))
    print(q.append('c', '3'))
    print(q.append('d', '3'))
    print(q.append('e', '3'))
    print(q.append('f', '3'))
    # print(q.append('a', '1'))
    # print(q.append('a', '2'))