# Source code for hanyuu.streamer.audio.garbage
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
import threading
import logging
import time
# Module-level logger, registered under the fixed name 'garbage'; used below
# to record exceptions raised by hooks and by collection attempts.
logger = logging.getLogger('garbage')
class Singleton(type):
    """Metaclass whose classes hand out one shared instance each.

    Every class created through this metaclass gets its own ``instance``
    attribute, initialised to ``None``. The first call to the class builds
    the object and stores it there; every later call returns that stored
    object and ignores whatever arguments were passed.
    """

    def __init__(cls, name, bases, namespace):
        """Initialise a freshly created class and reset its ``instance``.

        :param name: name of the class being created.
        :param bases: tuple of base classes.
        :param namespace: class body namespace, forwarded to ``type``.
        """
        super(Singleton, cls).__init__(name, bases, namespace)
        # Set per class, so a subclass never reuses its parent's instance.
        cls.instance = None

    def __call__(cls, *args, **kw):
        """Return the class's single instance, creating it on first use.

        :param args: positional arguments for the constructor; only used by
            the call that actually creates the instance.
        :param kw: keyword arguments for the constructor; same caveat.
        :returns: the stored instance of ``cls``.
        """
        if cls.instance is None:
            cls.instance = super(Singleton, cls).__call__(*args, **kw)
        return cls.instance
# The base class is built by calling the Singleton metaclass directly so that
# Collector picks the metaclass up through inheritance on both Python 2 and
# Python 3 (``str(...)`` keeps the class name a native string on Python 2,
# where this module enables unicode_literals).
class Collector(Singleton(str("_CollectorBase"), (object,), {})):
    """Shared registry that periodically asks pending garbage to clean up.

    Constructing the collector starts a daemon thread that runs :meth:`run`.
    Items are registered with :meth:`add`; each cycle the thread calls
    ``collect()`` on every pending item and drops the ones that report
    success.
    """

    # Honoured by Python 2 only; Python 3 ignores this attribute, which is
    # why the metaclass is also supplied through the base class above.
    __metaclass__ = Singleton

    # Callables invoked with each newly added item. Stored on the class, so
    # the list is shared rather than per instance.
    _hooks = list()

    def __init__(self):
        """Create the pending set and start the collection thread."""
        super(Collector, self).__init__()
        self.items = set()
        # Despite its name this event is the stop flag: `run` loops until
        # it is set.
        self.collecting = threading.Event()
        self.thread = threading.Thread(target=self.run,
                                       name="Garbage Collection Thread")
        self.thread.daemon = True
        self.thread.start()

    def add(self, garbage):
        """Register `garbage` for collection and notify every hook.

        A hook that raises is logged and does not prevent the remaining
        hooks from running.

        :param garbage: object exposing a ``collect()`` method.
        """
        self.items.add(garbage)
        for hook in self._hooks:
            try:
                hook(garbage)
            except Exception:
                # Only Exception is caught so that KeyboardInterrupt and
                # SystemExit still propagate to the calling thread.
                logger.exception("Hook function exception.")

    def run(self):
        """Collection loop executed by the thread started in `__init__`.

        Each cycle calls ``collect()`` on a snapshot of the pending items.
        Items whose ``collect()`` returns a true value are removed; items
        that return a false value or raise stay pending for the next cycle.
        The loop ends once ``self.collecting`` is set.
        """
        while not self.collecting.is_set():
            removal = set()
            # Iterate over a copy because `add` may grow the set meanwhile.
            for item in self.items.copy():
                try:
                    code = item.collect()  # Try collecting
                except Exception:
                    logger.exception("Collection Failure.")
                else:
                    if code:  # A true value means it was cleaned up
                        removal.add(item)
            self.items -= removal  # Drop everything that was collected
            # Wait on the stop event instead of sleeping so that setting it
            # ends the loop right away rather than after the full interval.
            self.collecting.wait(15.0)

    def info(self):
        """Returns a list of GarbageInfo objects containing information
        about current pending garbage.

        Not implemented: this is currently a generator that yields a
        single ``None``.
        """
        yield None  # Not implemented

    @classmethod
    def add_hook(cls, hook):
        """Register `hook` to be called with every item passed to `add`.

        :param hook: callable taking the added garbage object as its only
            argument.
        """
        cls._hooks.append(hook)
class Garbage(object):
    """Base class for objects that need deferred clean-up.

    Creating an instance registers it with the class-level collector.
    Subclasses are expected to override :meth:`collect`.
    """

    # Created once, when this class body executes, and shared through the
    # class attribute.
    collector = Collector()

    def __init__(self, item=None):
        """Store `item` and hand this object to the collector.

        :param item: value kept on the instance as ``self.item``.
        """
        super(Garbage, self).__init__()
        self.item = item
        registry = self.collector
        registry.add(self)

    def collect(self):
        """Gets called on each collection cycle.

        Should return True if the garbage got cleaned up properly,
        False if it requires another collect in the next cycle.

        :raises NotImplementedError: always, in this base implementation.
        """
        message = "collect method not overridden."
        raise NotImplementedError(message)