# Source code for hanyuu.streamer.audio
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
from .. import logger
logger = logger.getChild('audio')
import threading
import logging
import audiotools
from . import garbage
from . import encoder
from . import files
from . import icecast
class Manager(object):
    """
    A class that manages the audio pipeline. Each component gets a reference
    to the processor before it.

    .. note::
        This is a very crude pipeline that is specific to our needs and is
        in no way a generic implementation.

        Nor does it have proper definitions of what should go out of or
        into a processor.

    --------------
    All processors
    --------------
    The :class:`Manager` expects that all registered processors have at least
    the following characteristics:

        :func:`start`: Called when :meth:`Manager.start` is called. This
                       should initialize required components. The
                       :class:`Manager` expects that a call to `close`
                       followed by `start` is close to equal to recreating
                       the whole instance.

        :func:`close`: Called when :meth:`Manager.close` is called. This
                       should close down the processor cleanly and, if
                       potentially long running cleanups are to be done,
                       should use the :mod:`garbage` sub package shipped
                       with the **audio** package.

        :func:`__init__`: Called when the :class:`Manager` instance is
                          created. This should not start any state dependent
                          parts, these should be done in the `start` method
                          instead.

                          Gets passed three positional arguments:

                          1. The previous processor in the chain. For the
                             first processor read below.
                          2. A :const:`dict` of options. It only contains
                             the options named in the processor's class
                             attribute `options`, an iterable of
                             ``(name, default)`` pairs. A processor without
                             that attribute receives an empty dict.
                          3. The handlers mapping, see :class:`Handlers`.

    ---------------
    First processor
    ---------------
    The current version expects the first specified processor to take a
    function as `source` argument. That can be called for the filepath of an
    audiofile. This first processor is responsible for opening it.

    .. note::
        This means the processor doesn't actually need to decode the file
        but that it is just expected to accept the function as `source`.
        What it does with the function is not important to the
        :class:`Manager`.

    --------------
    Last processor
    --------------
    The current version expects the last specified processor to have several
    methods available to be used by the :class:`Manager`. These are:

        :func:`status`: A method that is called when :meth:`status` is
                        called. This should return something of significance
                        to the user.

        :func:`metadata`: A method that accepts a single `unicode` argument.
                          This is called whenever new metadata is found at
                          the start of the processor chain.
    """
    #: A list of processors that are instanced in order and are passed their
    #: previous friend as first argument.
    processors = [files.FileSource, encoder.Encoder, icecast.Icecast]

    def __init__(self, source, processors=None, options=None, handlers=None):
        """
        :param source:
            A callable object that returns a FileInformation object.
            See :meth:`Manager.get_source` for the exception.

        :param processors:
            An iterable of processors.
            Defaults to :attr:`Manager.processors`

        :param options:
            A :const:`dict` containing options for the processors found in
            :obj:`processors`. May be omitted, in which case every processor
            gets the defaults it declares.
            See the individual **processor classes** for accepted options.

            .. note::
                Unused options are ignored.

        :param handlers:
            A :const:`dict` containing handlers (functions) that should be
            called when the related event is fired.
            See the individual **processor classes** for available events.

            .. note::
                Unused handlers are ignored.

            .. warning::
                Handlers are called in the audio pipeline threads. For this
                reason you should avoid handlers that take a long time to
                complete their work or block in other means.
        """
        super(Manager, self).__init__()

        processors = processors or self.processors
        # `options` is optional; without this guard the `.get` below would
        # raise AttributeError for any processor that declares options.
        options = {} if options is None else options
        handlers = Handlers(handlers) if handlers else Handlers()

        self.source = source
        self.instances = []
        self.started = threading.Event()

        # The first processor receives our own `get_source` as its source.
        last_proc = self.get_source
        for processor in processors:
            # Only hand over the options this processor declares, falling
            # back to the default it declares when the user gave none.
            proc_options = {
                option: options.get(option, default)
                for option, default in getattr(processor, 'options', [])
            }

            logger.debug("Creating %r instance.", processor)

            # Create our processor instance with requested options.
            # NOTE: We don't call the `start` method here but in our own
            #       `start` instead. Please don't change this.
            proc_instance = processor(last_proc, proc_options, handlers)

            # Append it to the instances list so we don't lose reference to
            # them. Or garbage collect them by accident.
            self.instances.append(proc_instance)

            # Set our `last_proc` so we have an easy reference to pass to
            # the next processor.
            last_proc = proc_instance

    def start(self):
        """
        Calls the `start` method on all registered processor instances.

        This method does nothing if a previous call to :meth:`start` was
        successful but :meth:`close` was not called in between the two calls.

        .. warning::
            Exceptions are propagated.
        """
        if not self.started.is_set():
            for proc in self.instances:
                proc.start()
            # Only flagged as started once every processor started cleanly.
            self.started.set()

    def status(self):
        """
        Calls the `status` method on the last processor in the chain.

        If no method was found returns :const:`False` instead.
        """
        status = getattr(self.instances[-1], 'status', False)
        # Make sure we don't call the `status` method when not needed.
        return status() if status else status

    def get_source(self):
        """
        :returns unicode: A full file path to an audio file.

        .. note::
            This can also return :const:`None` if the user gives us a
            :const:`None` as filename. This should be handled properly.

        -------------------
        Source return value
        -------------------
        The value returned from :meth:`Manager.source` is expected to be an
        :class:`FileInformation` object. But there is one exception to this
        rule.

        When :meth:`Manager.source` returns a different type it will be used
        as the positional arguments to the :class:`FileInformation`
        constructor by using the `FileInformation(*returntype)` syntax.
        """
        info = self.source()

        # NOTE(review): `FileInformation` is not imported in the visible
        #               part of this module -- confirm where it is defined.
        if not isinstance(info, FileInformation):
            info = FileInformation(*info)

        if info.filename is None:
            # If the filename is set to None we should close ourself
            # implicitly.
            self.close()
            # As well as closing ourself we also return None here to let the
            # callee know we are closing down. The callee should handle this
            # case properly.
            return None

        if info.metadata:
            # We don't check for explicit None here since if there is no
            # actual metadata passed it is set to an empty unicode sequence
            # instead.
            self.instances[-1].metadata(info.metadata)

        return info.filename

    def close(self):
        """
        Calls the `close` method on all registered processor instances.

        .. warning::
            Exceptions are propagated.
        """
        # Clear the flag first so a later `start` call runs again.
        self.started.clear()
        for proc in self.instances:
            proc.close()
class Handlers(dict):
    """
    A :const:`dict` of event handlers that never fails a lookup.

    Looking up an event name that has no registered handler yields a
    callable that accepts any arguments and returns :const:`None`. The
    fallback is not stored in the mapping.

    Handlers can also be reached as attributes, so `handlers.name` is the
    same as `handlers['name']`.
    """

    def __missing__(self, key):
        # Invoked by `dict.__getitem__` for keys that are not present.
        def _ignore_event(*args, **kwargs):
            return None

        return _ignore_event

    def __getattr__(self, key):
        # Only reached when normal attribute lookup fails; defer to item
        # lookup so unknown names fall through to `__missing__`.
        return self.__getitem__(key)
def test_dir(directory=u'/media/F/Music', files=None):
    """
    Builds a `source` callable that hands out audio files found below
    `directory`, for manually testing the pipeline.

    :param directory:
        The directory that is walked recursively for files.

    :param files:
        An optional :const:`set` that is filled with the found file paths
        (in place) and then consumed by the returned callable. A new set is
        created when omitted.

        .. note::
            This parameter shadows the :mod:`files` module inside this
            function.

    :returns:
        A callable taking no arguments. Each call removes files from the
        set until one ending in `.flac`, `.mp3` or `.ogg` is found and
        returns `(filename, metadata)` for it. Returns `(None, None)` once
        the set is exhausted.
    """
    import os

    files = set() if files is None else files
    for base, _dirnames, filenames in os.walk(directory):
        for name in filenames:
            files.add(os.path.join(base, name))

    audio_suffixes = ('.flac', '.mp3', '.ogg')

    def _describe(filename):
        # Build the "artist - title" metadata string for one file.
        # Imported here so only this path needs mutagen to be installed.
        import mutagen

        try:
            tags = mutagen.File(filename, easy=True)
        except Exception:
            return "No metadata available, because I errored."

        if tags is None:
            # mutagen could not determine the file type; no metadata.
            return u""

        # Tag values are lists of strings; missing tags become empty.
        artist = u", ".join(tags.get('artist') or [])
        title = u", ".join(tags.get('title') or [])
        if artist:
            return u"{:s} - {:s}".format(artist, title)
        return title

    def pop_file():
        # Loop instead of recursing so a long run of non-audio files can't
        # exhaust the recursion limit.
        while True:
            try:
                filename = files.pop()
            except KeyError:
                return (None, None)
            if filename.endswith(audio_suffixes):
                return (filename, _describe(filename))

    return pop_file
def test_config(password=None):
    """
    Returns a :const:`dict` of stream settings for manual testing.

    :param password:
        The value stored under the `password` key.

    The remaining keys (`host`, `port`, `format`, `protocol`, `mount`) are
    fixed. NOTE(review): presumably these are the options of the icecast
    processor -- confirm against that class.
    """
    settings = (
        ('host', 'stream.r-a-d.io'),
        ('port', 1130),
        ('password', password),
        ('format', 1),
        ('protocol', 0),
        ('mount', 'test.mp3'),
    )
    return dict(settings)