123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- # -*- coding: utf-8 -*-
- # @Time : 2022/12/2 9:19
- # @Author : XuJiakai
- # @File : xxl_queue
- # @Software: PyCharm
- from collections import OrderedDict
- from log import get_log
- log = get_log("xxl_queue")
- o = OrderedDict()
- class xxl_queue_plus:
- def __init__(self, pop_threshold, buff_size=100):
- """
- 类似于开心消消乐的定长队列,攒够阈值,则弹出数据
- 注意:该队列目前不能按照最新插入时间,调整覆盖
- :param pop_threshold: 弹出数据阈值
- :param buff_size: 缓冲区大小
- """
- assert isinstance(buff_size, int) and buff_size > 0, "请给我一个大于零的整数来指定长度"
- self.buff_size = buff_size
- self.pop_threshold = pop_threshold
- self.data = {}
- pass
- def append(self, key, obj):
- result_list = None
- flag = False
- if key in self.data:
- _obj_list = self.data[key]
- if len(_obj_list) + 1 >= self.pop_threshold:
- result_list = _obj_list.copy()
- result_list.append(obj)
- del self.data[key]
- return result_list
- pass
- pass
- def default_overwrite_handle(key, obj_list):
- log.info(f"overwrite record , key:{key} ojb_list:{obj_list}")
- pass
- class xxl_queue:
- def __init__(self, pop_threshold, buff_size=100, overwrite_handle=None):
- """
- 类似于开心消消乐的定长队列,攒够阈值,则弹出数据
- 注意:该队列目前不能按照最新插入时间,调整覆盖
- :param pop_threshold: 弹出数据阈值
- :param buff_size: 缓冲区大小
- """
- assert isinstance(buff_size, int) and buff_size > 0, "请给我一个大于零的整数来指定长度"
- self.len = buff_size
- self.data = [None] * self.len
- self.hand = 0
- self.pop_threshold = pop_threshold
- self.overwrite_handle = overwrite_handle
- self.index_data = {}
- pass
- def append(self, key, obj):
- result_list = None
- flag = False
- if key in self.index_data:
- tmp_index = self.index_data[key]
- if self.data[tmp_index] is None:
- ##todo
- pass
- else:
- _key = self.data[tmp_index][0]
- _obj_list = self.data[tmp_index][1]
- if _key == key:
- if len(_obj_list) + 1 >= self.pop_threshold:
- r = _obj_list.copy()
- r.append(obj)
- result_list = r
- del self.index_data[key]
- self.data[tmp_index] = None
- pass
- else:
- self.data[tmp_index][1].append(obj)
- pass
- flag = True
- pass
- pass
- pass
- if not flag:
- self._put(index=self.hand, key=key, obj=obj)
- pass
- return result_list
- pass
- def _put(self, index, key, obj):
- if self.data[index] is None:
- self.data[index] = (key, [obj])
- self.index_data[key] = index
- self.hand = (self.hand + 1) % self.len # 指针后移
- pass
- elif self.data[index][0] == key:
- self.data[index][1].append(obj)
- else:
- _key, _obj_list = self.data[index]
- del self.index_data[_key]
- if self.overwrite_handle is not None:
- self.overwrite_handle(_key, _obj_list)
- pass
- self.index_data[key] = index
- self.data[index] = (key, [obj]) # 覆盖当前index
- self.hand = (self.hand + 1) % self.len # 指针后移
- pass
- pass
- def get(self):
- ret = self.data[self.hand:]
- ret.extend(self.data[:self.hand])
- return ret
- def print_info(self):
- print(f"--info-- \n index_data: {self.index_data} \n data: {self.data} \n")
- pass
- if __name__ == '__main__':
- q = xxl_queue(pop_threshold=2, buff_size=3, overwrite_handle=default_overwrite_handle)
- log.info('%s' % q.append('a', '1'))
- log.info('%s' % q.append('a', '2'))
- # print(q.append('a', '3'))
- log.info('%s' % q.append('b', '1'))
- log.info('%s' % q.append('b', '2'))
- # print(q.append('b', '3'))
- log.info('%s' % q.append('a', '3'))
- log.info('%s' % q.append('b', '3'))
- log.info('%s' % q.append('c', '3'))
- log.info('%s' % q.append('d', '3'))
- log.info('%s' % q.append('e', '3'))
- log.info('%s' % q.append('f', '3'))
- # print(q.append('a', '1'))
- # print(q.append('a', '2'))
- pass
|