Source code for eta.util.buffer

"""Utilities for manipulating priority queues, termed "buffers" in Eta's architecture.

NOTE: although importance values are positive, the values stored in the underlying priority queues are
negated, since heapq implements a min heap.
"""

import heapq

[docs] def enqueue(item, buffer): """Enqueue an item (either an object or an (importance, object) tuple) in a buffer. Parameters ---------- item : object or tuple[float, object] If item is an object, enqueue using a default importance value. Otherwise, item is an (importance, object) tuple, and the object is enqueued with the given importance. buffer : list[tuple[float, object]] """ if isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], (int, float)): heapq.heappush(buffer, (-item[0], item[1])) else: heapq.heappush(buffer, (-1, item))
[docs] def enqueue_ordered(items, buffer, inc_val=0.0001): """Enqueue a list of items, preserving original order unless an importance value is explicitly specified. Parameters ---------- items : list A list where each item is either an object or (importance, object) tuple. For each item where an importance is not specified, a default importance value is generated using an increment so as to preserve the original order. buffer : list[tuple[float, object]] inc_val : float, default=0.0001 The increment to use for generated importance values. """ cur_max = max_importance(buffer) inc = inc_val importance = cur_max for item in items[::-1]: if isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], (int, float)): importance = item[0] item = item[1] else: importance = cur_max importance += inc enqueue((importance, item), buffer) inc += inc_val
[docs] def is_empty(buffer): """Check whether a buffer is empty. Parameters ---------- buffer : list[tuple[float, object]] Returns ------- bool """ return False if buffer else True
[docs] def pop_item(buffer, importance=False): """Pop the top item from a buffer. Parameters ---------- buffer : list[tuple[float, object]] importance : bool, default=False If True is given, return the importance as well as the object. Returns ------- object or tuple[float, object] """ if is_empty(buffer): return None item = heapq.heappop(buffer) if importance: return (-item[0], item[1]) else: return item[1]
[docs] def pop_all(buffer, importance=False): """Pop all items from a buffer. Parameters ---------- buffer : list[tuple[float, object]] importance : bool, default=False If True is given, return the importances as well as the objects. Returns ------- list[object] or list[tuple[float, object]] """ ret = [] while not is_empty(buffer): ret.append(pop_item(buffer, importance=importance)) return ret
[docs] def get_item(buffer, importance=False): """Get the top item from a buffer without popping it. Parameters ---------- buffer : list[tuple[float, object]] importance : bool, default=False If True is given, return the importance as well as the object. Returns ------- object or tuple[float, object] """ if not is_empty(buffer): item = heapq.nsmallest(1, buffer)[0] if importance: return (-item[0], item[1]) else: return item[1] else: return None
[docs] def max_importance(buffer): """Get the maximum importance value from a buffer. Parameters ---------- buffer : list[tuple[float, object]] Returns ------- float """ if not is_empty(buffer): return -heapq.nsmallest(1, buffer)[0][0] else: return 0
[docs] def clear(buffer): """Empty a buffer. Parameters ---------- buffer : list[tuple[float, object]] """ buffer.clear()
[docs] def iterate(buffer, func=None): """Return a buffer as a list (optionally applying some function to each value). Parameters ---------- buffer : list[tuple[float, object]] func : function, optional A function to apply to each value in the buffer. Returns ------- list[object] """ elems = [item[1] for item in heapq.nsmallest(len(buffer), buffer)] if func: return [func(e) for e in elems] else: return elems