Usage question (or bug) #35

Open
telegraphic opened this issue Jul 21, 2016 · 2 comments

Comments

@telegraphic
Collaborator

I'm trying to write a basic pipeline I can use for testing, and am getting some read failures, which I presume are due to my misunderstanding of how to read from and write to the ring. Here's my code:

"""
# test_block.py

Test pipeline with prototype blocks
"""

import numpy as np
import threading
import json

import bifrost
from bifrost.ring import Ring


class BfPipeline(list):
    """ Simple pipeline class.

    Blocks are appended to the pipeline, which is list-like. The Pipeline class
    provides a run() function, which will start the pipeline.
    """
    def run(self):

        threads = []

        for item in self:
            threads.append(threading.Thread(target=item.main))

        for thread in threads:
            thread.daemon = True
            thread.start()
        for thread in threads:
            # wait for thread to terminate
            thread.join()


class BfBlock(object):
    """ Simple block class """
    def __init__(self, gulp_size=4096, core=-1, guarantee=True):
        self.gulp_size = gulp_size
        self.core = core
        self.guarantee = guarantee

    def _main(self):  # Launched in thread
        bifrost.affinity.set_core(self.core)


class BfSourceBlock(BfBlock):
    """ Block class for I/O sources (readers)

    A source block requires an output ring.
    """

    def __init__(self, oring, *args, **kwargs):
        super(BfSourceBlock, self).__init__(*args, **kwargs)

        self.oring = oring

    def write_sequence(self, oring, name, header, data):
        """ Write one sequence (a JSON-encoded header plus one gulp of data) to the ring """
        header_json = json.dumps(header)
        with oring.begin_sequence(name, header=header_json) as sequence:
            self.write_to_sequence(sequence, data)

    def write_to_sequence(self, sequence, data):
        with sequence.reserve(self.gulp_size) as span:
            data = data.view('uint8').ravel()
            span.data[0][:] = data

class BfSinkBlock(BfBlock):
    """ Block class for I/O sinks (writers)

    A sink block requires an input ring.
    """

    def __init__(self, iring, *args, **kwargs):
        super(BfSinkBlock, self).__init__(*args, **kwargs)

        self.iring = iring

class BfTaskBlock(BfBlock):
    """ Block class for tasks (transform the data in some way)

    A task block requires an input ring AND an output ring.
    """
    def __init__(self, iring, oring, *args, **kwargs):
        super(BfTaskBlock, self).__init__(*args, **kwargs)
        self.iring = iring
        self.oring = oring



#####################
##                 ##
##  Test classes   ##
##                 ##
#####################

class NumpyTestSource(BfSourceBlock):
    """ This generates a stream of numpy arrays """

    def __init__(self, *args, **kwargs):
        super(NumpyTestSource, self).__init__(*args, **kwargs)
        self.seed = 1

    def generate_new_data(self):

        data = np.arange(1024).astype('float32') + self.seed
        self.seed += 1
        return data

    def generate_header(self):
        hdr = {'seed': self.seed}
        return hdr

    def main(self):  # Launched in thread
        self._main()
        self.oring.resize(self.gulp_size)

        with self.oring.begin_writing() as oring:
            while self.seed <= 100:
                ohdr = self.generate_header()
                data = self.generate_new_data()
                name = str(ohdr['seed'])
                self.write_sequence(oring, name=name, header=ohdr, data=data)

class NumpyTestSink(BfSinkBlock):
    """ This receives a stream of numpy arrays and verifies them """
    def main(self):
        self._main()

        self.iring.resize(self.gulp_size)
        for iseq in self.iring.read(guarantee=self.guarantee):

            header = json.loads(iseq.header.tostring())

            for ispan in iseq.read(self.gulp_size):
                data = ispan.data.view('float32')

                data_check = np.arange(1024).astype('float32') + header['seed']

                try:
                    assert np.allclose(data_check, data)
                except AssertionError:
                    print("ERROR: %s" % header['seed'])
                    print(data)
                    print(data_check)


if __name__ == "__main__":

    import time

    # Bring pipeline up and down several times
    n_trials = 10
    for ii in range(n_trials):

        print("Pipeline bringup %i of %i" % (ii+1, n_trials))
        ring1 = Ring()
        pipeline = BfPipeline()

        np_source = NumpyTestSource(ring1, core=1)
        np_sink   = NumpyTestSink(ring1,   core=2, guarantee=True)

        pipeline.append(np_source)
        pipeline.append(np_sink)

        pipeline.run()
        time.sleep(0.1)

When I run this, I get output like:

Pipeline bringup 1 of 10
Pipeline bringup 2 of 10
ERROR: 9
[[   13.    14.    15. ...,  1034.  1035.  1036.]]
[    9.    10.    11. ...,  1030.  1031.  1032.]
ERROR: 10
[[   14.    15.    16. ...,  1035.  1036.  1037.]]
[   10.    11.    12. ...,  1031.  1032.  1033.]
ERROR: 12
[[   16.    17.    18. ...,  1037.  1038.  1039.]]
[   12.    13.    14. ...,  1033.  1034.  1035.]
Pipeline bringup 3 of 10
ERROR: 12
[[   16.    17.    18. ...,  1037.  1038.  1039.]]
[   12.    13.    14. ...,  1033.  1034.  1035.]
ERROR: 13
[[   17.    18.    19. ...,  1038.  1039.  1040.]]
[   13.    14.    15. ...,  1034.  1035.  1036.]
Pipeline bringup 4 of 10
ERROR: 6
[[   10.    11.    12. ...,  1031.  1032.  1033.]]
[    6.     7.     8. ...,  1027.  1028.  1029.]
ERROR: 21
[[   25.    26.    27. ...,  1046.  1047.  1048.]]
[   21.    22.    23. ...,  1042.  1043.  1044.]
Pipeline bringup 5 of 10
Pipeline bringup 6 of 10
Pipeline bringup 7 of 10
Pipeline bringup 8 of 10
Pipeline bringup 9 of 10
Pipeline bringup 10 of 10
ERROR: 8
[[   12.    13.    14. ...,  1033.  1034.  1035.]]
[    8.     9.    10. ...,  1029.  1030.  1031.]
ERROR: 9
[[   13.    14.    15. ...,  1034.  1035.  1036.]]
[    9.    10.    11. ...,  1030.  1031.  1032.]
ERROR: 77
[[   81.    82.    83. ...,  1102.  1103.  1104.]]
[   77.    78.    79. ...,  1098.  1099.  1100.]
ERROR: 81
[[   85.    86.    87. ...,  1106.  1107.  1108.]]
[   81.    82.    83. ...,  1102.  1103.  1104.]

i.e. my test fails. Where am I going wrong?
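One way to tell a ring problem apart from a problem in the threading scaffolding is to run the same header/data check with the ring swapped out. The sketch below is hypothetical. It replaces bifrost's `Ring` with Python 3's standard-library `queue.Queue` and keeps only the seed check, so it says nothing about bifrost's own semantics and does not model ring overwrites. If this version reports no mismatches while the ring version does, the issue more likely lies in how sequences are written to or read from the ring than in the block classes.

```python
"""Baseline for the NumpyTestSource/NumpyTestSink check, with the bifrost
Ring replaced by a stdlib queue.Queue (hypothetical stand-in, not bifrost)."""
import json
import queue
import threading

import numpy as np

N_ITEMS = 1024  # float32 values per gulp (4096 bytes), as in the original test
_DONE = None    # sentinel marking the end of the stream


def source(q, n_seqs=100):
    # One "sequence" per seed: a JSON header travelling with one gulp of data.
    for seed in range(1, n_seqs + 1):
        header = json.dumps({'seed': seed})
        data = np.arange(N_ITEMS).astype('float32') + seed
        q.put((header, data.view('uint8').ravel()))
    q.put(_DONE)


def sink(q, errors):
    # Decode each header and check that its data was generated from the same seed.
    while True:
        item = q.get()
        if item is _DONE:
            break
        header_json, raw = item
        header = json.loads(header_json)
        data = raw.view('float32')
        expected = np.arange(N_ITEMS).astype('float32') + header['seed']
        if not np.allclose(expected, data):
            errors.append(header['seed'])


def run_trial():
    # Bounded queue: put() blocks while 4 items are pending, so the writer
    # can never get ahead of the reader by more than 4 sequences.
    q = queue.Queue(maxsize=4)
    errors = []
    threads = [threading.Thread(target=source, args=(q,)),
               threading.Thread(target=sink, args=(q, errors))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return errors


if __name__ == "__main__":
    for ii in range(10):
        print("Trial %i: %i mismatches" % (ii + 1, len(run_trial())))
```

Because the queue is bounded, the writer blocks until the reader catches up. That is loosely what `guarantee=True` is meant to provide for a ring reader, but this correspondence is an assumption, not documented bifrost behaviour.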

@benbarsdell
Collaborator

Interesting test, took me a bit to get my head around it :)

To confirm, it's failing on some trials but not others? Does it still happen if you make the sleep much longer?


@telegraphic
Collaborator Author

Sorry if it is a little strange! I just wanted to test sending numpy arrays through a pipeline.

Yep, it seems to fail on some trials but not others. Sleep doesn't seem to make a difference. Setting guarantee=False on the sink causes a crash (it's probably trying to read before the ring is initialized).

The data are "correct" (not just random bits), but the header value appears offset by 4. IIRC this is the default size of the ring, right?
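The "offset by 4" observation can be checked against the ERROR lines in the first post. In the short sketch below, the pairs are (header seed, first data value) copied from that output. The data for seed *s* starts at *s*, so the difference shows how many sequences the data is ahead of the header it was read with.

```python
# (header seed, first data value) pairs copied from the ERROR output above
logged = [(9, 13), (10, 14), (12, 16), (12, 16), (13, 17),
          (6, 10), (21, 25), (8, 12), (9, 13), (77, 81), (81, 85)]

# Data for seed s is arange(1024) + s, so its first value equals its seed.
offsets = {first - seed for seed, first in logged}
print(offsets)  # {4}
```

Each sequence holds a single 4096-byte gulp (1024 float32 values), so an offset of 4 sequences corresponds to 4 × 4096 = 16384 bytes of data.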


3 participants