
Turn local scanning into a pipeline #25

Open
adamcik opened this issue Jun 5, 2014 · 7 comments
Labels
A-scan (Area: Media scanning), C-enhancement (Category: A PR with an enhancement or an issue with an enhancement proposal)

Comments

@adamcik
Member

adamcik commented Jun 5, 2014

Playing with some prototypes to speed up scanning, I've come up with the following plan:

  • Switch to having a single actor look for files to scan
  • This actor feeds results onto a worker queue which will have 1-n scanners doing work
  • Scanners then place the results on a new queue which feeds into the library

Additionally, this means we can stat files as we go instead of trying to do everything up front, and most likely still keep up without blocking the scanners. My quick hack to add multiple scanners also showed that this will quickly move the bottleneck to the library indexing. At least this was the case with Whoosh; for the JSON backend there was no work to do, so scanners could simply work at full speed.
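A rough sketch of the pipeline I have in mind, using plain threads and bounded queues; find_files(), scan_uri() and index_track() are just placeholders for the finder, GStreamer scanner and library indexing steps:

# Sketch only: one finder, 1-n scanner workers, one indexer feeding the library.
import queue
import threading

NUM_SCANNERS = 4
DONE = object()

def finder(media_dir, scan_queue, find_files):
    # Single producer walking the media dir; stat() can happen here as we go.
    for uri in find_files(media_dir):
        scan_queue.put(uri)
    for _ in range(NUM_SCANNERS):
        scan_queue.put(DONE)  # one stop marker per scanner worker

def scanner(scan_queue, result_queue, scan_uri):
    # 1-n of these pull URIs and produce tag/duration results.
    while True:
        uri = scan_queue.get()
        if uri is DONE:
            result_queue.put(DONE)
            break
        result_queue.put((uri, scan_uri(uri)))

def indexer(result_queue, index_track):
    # Single consumer feeding the library; with Whoosh this is where the
    # bottleneck moved in the prototype.
    finished = 0
    while finished < NUM_SCANNERS:
        item = result_queue.get()
        if item is DONE:
            finished += 1
        else:
            index_track(*item)

def run(media_dir, find_files, scan_uri, index_track):
    # Bounded queues keep memory use flat even for very large libraries.
    scan_queue, result_queue = queue.Queue(maxsize=100), queue.Queue(maxsize=100)
    threads = [threading.Thread(target=finder, args=(media_dir, scan_queue, find_files))]
    threads += [threading.Thread(target=scanner, args=(scan_queue, result_queue, scan_uri))
                for _ in range(NUM_SCANNERS)]
    threads += [threading.Thread(target=indexer, args=(result_queue, index_track))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()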

@adamcik
Member Author

adamcik commented Jul 27, 2014

I've been thinking more about this lately, and the setup I imagine still has the external interface be a queue of URIs to scan. The GStreamer scanning bit should be reduced a bit in scope so we only produce the tags and the duration. This result gets passed to the next queue without being converted to a track; the format should match whatever the tags changed event emits.

The results in the post-processing queue should have per-URI-type annotations run; an example of this would be last modified for file:// URIs. This would also have the potential for catching and converting playlists, meaning mopidy/mopidy#701 becomes obsolete.
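Something like a small registry of annotators keyed on URI scheme could cover this step. Sketch only; the registry and post_process() names are made up, and last-modified for file:// URIs is the example above:

import os
from urllib.parse import urlparse, unquote

ANNOTATORS = {}

def annotates(scheme):
    # Register an annotator for one URI scheme.
    def register(func):
        ANNOTATORS[scheme] = func
        return func
    return register

@annotates('file')
def add_last_modified(uri, result):
    path = unquote(urlparse(uri).path)
    result['last_modified'] = int(os.path.getmtime(path))
    return result

def post_process(uri, result):
    # Run the annotator for this URI's scheme, if any, before the result
    # is turned into a track/playlist.
    annotate = ANNOTATORS.get(urlparse(uri).scheme)
    return annotate(uri, result) if annotate else result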

At the end of the post-processing we should either have another queue, or just emit the resulting track/playlist/... From this point the consumers of the scanner data become relevant again and take over, either returning the metadata we found for a stream lookup or adding the result to the library.

For the GStreamer bit we should consider getting rid of the manual bus handling and just leave that to the GObject loop. This way we can scale the number of GStreamer scanners without throwing more Python threads at the problem (GStreamer will still have its own internal threads though).
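For illustration, with the GstPbutils Discoverer all results arrive as signals on the main loop, so adding scanners does not add Python threads. A minimal sketch, assuming GStreamer 1.x via PyGObject (not what our scanner currently does):

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstPbutils', '1.0')
from gi.repository import Gst, GstPbutils, GLib

Gst.init(None)
loop = GLib.MainLoop()
discoverer = GstPbutils.Discoverer.new(5 * Gst.SECOND)

def on_discovered(disc, info, error):
    # Results are delivered on the main loop; no manual bus polling needed.
    print(info.get_uri(), info.get_duration() // Gst.MSECOND, error)

def on_finished(disc):
    loop.quit()

discoverer.connect('discovered', on_discovered)
discoverer.connect('finished', on_finished)
discoverer.start()
discoverer.discover_uri_async('file:///tmp/example.flac')  # placeholder URI
loop.run()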

Note that there are however two/three current use cases: local scanning done "offline", the planned in-process local scanning, and finally metadata lookup for streams. This is important to note as the turnaround time for the stream case is much tighter than for the others. As such there is a fair chance we should make the queues priority queues, or have two of them with different service levels, ensuring that a running local scan doesn't block the stream metadata lookup or otherwise consume too many resources.
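For example, a single PriorityQueue with two service levels would let stream lookups jump ahead of batch scan items (sketch; the STREAM/BATCH names are placeholders):

import itertools
import queue

STREAM, BATCH = 0, 1
_counter = itertools.count()  # tie-breaker keeps FIFO order within a priority

scan_queue = queue.PriorityQueue()

def submit(uri, priority=BATCH):
    scan_queue.put((priority, next(_counter), uri))

submit('file:///music/album/01.flac')         # batch scan item
submit('http://example.com/stream', STREAM)   # served first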

For this I'm also assuming that we have a single scanner pipeline running as part of audio/core which local and others are allowed to use.

For the batch scanning case we don't really care about when we get our answers out, while for the stream case we do. So another idea that just popped into my head while writing this is to have "scan sessions": each session has a priority, a way to add tracks, and then also a way to get the results out. For the stream case we simply create a session, give it the one URI, get our result and then close the session. For the batch scanning we create a session, feed it with URIs to scan as we find them (might be slow due to network etc.), process results as we get them, and then when we've found the last URI we want to scan we tell the session, at which point we can join the queues. Of course this assumes a batch-oriented mindset, and for in-process a streaming, continual approach would be nicer IMO.
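A sketch of what such a session API could look like; ScanSession and the pipeline's submit()/finish() methods are hypothetical names, not existing Mopidy code:

import queue

class ScanSession:
    DONE = object()

    def __init__(self, pipeline, priority):
        self._pipeline = pipeline
        self._priority = priority
        self.results = queue.Queue()

    def add(self, uri):
        # The shared pipeline scans the URI at this session's priority and
        # puts the result on self.results when done.
        self._pipeline.submit(uri, self._priority, self.results)

    def close(self):
        # No more URIs will be added; the pipeline puts DONE on the results
        # queue once everything submitted before this point has finished.
        self._pipeline.finish(self.results, self.DONE)

    def __iter__(self):
        # Batch case: process results as they arrive until the session is done.
        while True:
            result = self.results.get()
            if result is self.DONE:
                return
            yield result

# Stream case: one URI, high priority, one result:
#   session = ScanSession(pipeline, priority=0)
#   session.add('http://example.com/stream')
#   session.close()
#   metadata = next(iter(session))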

Hopefully some of this still makes sense, as this became a bit more of a braindump than I had planned.

@jodal
Member

jodal commented Jul 27, 2014

Makes sense to me :-)

@adamcik
Member Author

adamcik commented Jan 13, 2015

The current state has also caused out-of-memory issues for at least one user trying to scan 100k songs over SMB. In this case it was a Raspberry Pi running out of memory already at the finder stage.

@ghost

ghost commented Sep 26, 2016

I've written a scanner actor for another project:

I use Discoverer from gst.pbutils instead of the Python one
(I've used code from another source).

uri.py

# -*- coding: utf-8 -*-
from __future__ import division, print_function, absolute_import


def path2uri(path):
    """
    Return a valid uri (file scheme) from absolute path name of a file
    >>> path2uri('/home/john/my_file.wav')
    'file:///home/john/my_file.wav'
    >>> path2uri('C:\Windows\my_file.wav')
    'file:///C%3A%5CWindows%5Cmy_file.wav'
    """
    import urlparse
    import urllib

    return urlparse.urljoin('file:', urllib.pathname2url(path))


def source_info(source):
    import os.path

    src_info = {'is_file': False,
                'uri': '',
                'pathname': ''}

    if os.path.exists(source):
        src_info['is_file'] = True
        # get the absolute path
        src_info['pathname'] = os.path.abspath(source)
        # and make a uri of it
        src_info['uri'] = path2uri(src_info['pathname'])
    return src_info


def get_uri(source):
    """
    Check a media source as a valid file or uri and return the proper uri
    """
    import gst

    src_info = source_info(source)

    if src_info['is_file']:  # Is this a file?
        return get_uri(src_info['uri'])

    elif gst.uri_is_valid(source):  # Is this a valid URI source for Gstreamer
        uri_protocol = gst.uri_get_protocol(source)
        if gst.uri_protocol_is_supported(gst.URI_SRC, uri_protocol):
            return source
        else:
            raise IOError('Invalid URI source for Gstreamer')
    else:
        raise IOError('Failed getting uri for path %s: no such file' % source)


def get_media_uri_info(uri, timeout=5):
    from gst.pbutils import Discoverer
    from gst import SECOND as GST_SECOND, uri_get_protocol
    from itelbase.utils.ping import get_ipaddr_async  # helper from my own project

    GST_DISCOVER_TIMEOUT = timeout * GST_SECOND
    uri_discoverer = Discoverer(GST_DISCOVER_TIMEOUT)
    info = dict()

    try:
        clean_uri = get_uri(uri)
    except IOError as e:
        return {'uri': uri, 'result': 'GST_URI_ERROR', 'error-string': str(e)}

    # check if dns is working
    if uri_get_protocol(clean_uri) != 'file':
        ipaddr = get_ipaddr_async(uri, timeout=1)
        if not ipaddr:
            return {'uri': uri, 'result': 'GST_DISCOVERER_ERROR', 'error-string': 'DNS ERROR'}

    try:
        uri_info = uri_discoverer.discover_uri(clean_uri)
    except Exception as e:
        # raise IOError(e)
        return {'uri': uri, 'result': 'GST_DISCOVERER_ERROR', 'error-string': str(e)}

    info['uri'] = uri_info.get_uri()
    info['result'] = uri_info.get_result().value_name
    # info['duration'] = uri_info.get_duration() / GST_SECOND       # Duration in seconds
    info['seekable'] = uri_info.get_seekable()
    # info['stream-info'] = uri_info.get_stream_info()
    # info['container-streams'] = uri_info.get_container_streams()
    # info['streams'] = uri_info.get_streams()
    # info['misc'] = uri_info.get_misc()
    # info['tags'] = uri_info.get_tags()
    # info['video'] = uri_info.get_video_streams()

    audio_streams = uri_info.get_audio_streams()
    info['streams'] = []
    for stream in audio_streams:
        stream_info = {
            'type': stream.get_stream_type_nick(),
            'bitrate': stream.get_bitrate(),
            'channels': stream.get_channels(),
            'depth': stream.get_depth(),
            'samplerate': stream.get_sample_rate(),
            # 'max_bitrate': stream.get_max_bitrate(),
        }
        info['streams'].append(stream_info)

    return info

and a Resolver Actor

# Note: pykka, uri_mod (the uri.py module above), urifactory and UriStatus
# come from elsewhere in my project.
class Resolver(pykka.ThreadingActor):
    def __init__(self, timeout, done_function):
        super(Resolver, self).__init__()
        self._done_function = done_function
        self._default_timeout = timeout

    def scan(self, element):
        def _is_good(x):
            if isinstance(x, dict) and 'result' in x and x['result'] == 'GST_DISCOVERER_OK':
                return UriStatus.GOOD
            return UriStatus.BAD

        if element is not None:
            # logger.info('Resolve element=%s', element)
            uri = element['uri']
            response = uri_mod.get_media_uri_info(uri, self._default_timeout)

            result = urifactory(index=element['index'],
                                uri=response['uri'],
                                status=_is_good(response))  # info=response)
        else:
            result = None

        # logger.info('Resolve finished result=%s', result)
        self._done_function(result)

that I feed from another actor; it tries to discover a URI and calls another function on completion
(I don't know if there's a better method)
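For reference, this is roughly how I start and feed it (assuming uri.py above is importable as uri_mod and the urifactory/UriStatus helpers from my project are available):

import pykka

def on_done(result):
    print('resolved:', result)

# Actor constructor arguments are passed through start(); proxy() gives us
# method calls that just queue a message and return immediately, so the
# feeding actor is never blocked while discovery runs.
resolver = Resolver.start(timeout=5, done_function=on_done).proxy()
resolver.scan({'index': 0, 'uri': 'file:///music/song.flac'})  # placeholder URI

pykka.ActorRegistry.stop_all()  # tear the actor down when finished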

@adamcik
Member Author

adamcik commented Sep 26, 2016

Thanks for the suggestion. We've looked into this before, and at the time speed was the main reason for not using the built-in one. The downside of course is having to reinvent the wheel and rediscovering problems already solved upstream (such as sources/sinks with dynamic pads).

If this can be shown to run with acceptable speed we should probably switch.

On a side note, we've also talked about splitting mopidy-local out to its own extension instead of bundling it. This would probably also cover killing off mopidy-local-json and merging mopidy-local-sqlite into the new mopidy-local. In which case it would be up to whoever maintains that new extension to figure out what is best, and we can keep doing our own thing in core as we see fit :-)

@ghost

ghost commented Sep 27, 2016

This resolver class could be implemented as a pool of services if speed is crucial, but for my application it is faster than the Mopidy default and doesn't block the base class waiting for the resolver to start.

The only problem could be if one checker dies and does not call the done function (I need a way in pykka to launch something if the thread dies...).
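One option might be pykka's on_failure() hook, which runs when an unhandled exception kills the actor. A sketch; whether calling the done function there fits my application is an open question:

class SafeResolver(Resolver):
    def on_failure(self, exception_type, exception_value, traceback):
        # The actor is stopping; report failure instead of leaving the
        # feeder waiting forever for a done callback that will never come.
        self._done_function(None)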

@kingosticks
Member

Just to update this: if someone were to do some benchmarks comparing recent versions of GStreamer's Discoverer to our solution, then we can make a decision on this. It may have been slower in 2015/2016 (GStreamer 0.10?), but 3 years later that may no longer be the case.
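Something as simple as this would do for the Discoverer side (sketch; the /music path and .flac glob are placeholders, and the comparison against our own scanner would need an equivalent loop):

import pathlib
import time

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstPbutils', '1.0')
from gi.repository import Gst, GstPbutils

Gst.init(None)
discoverer = GstPbutils.Discoverer.new(5 * Gst.SECOND)

uris = [p.as_uri() for p in pathlib.Path('/music').rglob('*.flac')]
start = time.time()
for uri in uris:
    discoverer.discover_uri(uri)  # synchronous; errors raise GLib.Error
elapsed = time.time() - start
print('%d files in %.1fs (%.1f files/s)' % (len(uris), elapsed, len(uris) / elapsed))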

@jodal jodal transferred this issue from mopidy/mopidy Dec 21, 2019
@jodal jodal added the C-enhancement and A-scan labels Dec 21, 2019