Parallelized and partitioned text file reading. takeSample() demo on Human Microbiome Project.
svenkreiss committed May 26, 2015
1 parent c2a7561 commit 1cf700b
Showing 4 changed files with 35 additions and 9 deletions.
25 changes: 17 additions & 8 deletions README.rst
@@ -50,10 +50,7 @@ Examples
 
 The example source codes are included in ``tests/readme_example*.py``.
 
-Line counts
------------
-
-Count the lines in the ``*.py`` files in the ``tests`` directory and
+**Line counts**: Count the lines in the ``*.py`` files in the ``tests`` directory and
 count only those lines that start with ``import``:
 
 .. code-block:: python
@@ -70,10 +67,7 @@ count only those lines that start with ``import``:
 which prints ``In tests/*.py: all lines=518, with import=11``.
 
 
-Common Crawl
-------------
-
-More info on the dataset is in this `blog post <http://blog.commoncrawl.org/2015/05/march-2015-crawl-archive-available/>`_.
+**Common Crawl**: More info on the dataset is in this `blog post <http://blog.commoncrawl.org/2015/05/march-2015-crawl-archive-available/>`_.
 
 .. code-block:: python
@@ -90,6 +84,21 @@ More info on the dataset is in this `blog post <http://blog.commoncrawl.org/2015
 which prints a long list of paths extracted from two gzip compressed files.
 
 
+**Human Microbiome Project**: Get a random line without loading the entire
+dataset.
+
+.. code-block:: python
+
+    from pysparkling import Context
+
+    by_subject_rdd = Context().textFile(
+        's3n://human-microbiome-project/DEMO/HM16STR/46333/by_subject/*'
+    )
+    print(by_subject_rdd.takeSample(1))
+
+which prints out a line like ``[u'CAACGCCGCGTGAGGGATGACGGCCTTCGGGTTGTAAACCTCTTTCAGTATCGACGAAGC']``.
+
+
 API
 ===
 
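Review note: the README entry says a random line is returned "without loading the entire dataset". One way that could work, given that this commit assigns one partition per file, is to pick a partition first and read only that one. The sketch below illustrates that idea only; it is not confirmed to be how pysparkling's `takeSample()` is implemented, and `sample_one_line`, `make_loader` and the sample data are hypothetical names and values.

```python
import random


def sample_one_line(loaders, rng=random):
    """Pick one partition at random, load only that one, return one line.

    `loaders` is a list of zero-argument callables, one per file. This is
    an illustrative stand-in, not pysparkling's takeSample().
    """
    load = rng.choice(loaders)   # choose a partition without reading any data
    lines = load()               # only this partition is actually read
    return rng.choice(lines)


loaded = []  # records which "files" were read


def make_loader(name, lines):
    """Build a lazy loader that notes when it is called (hypothetical helper)."""
    def load():
        loaded.append(name)
        return lines
    return load


loaders = [
    make_loader('subject_a', ['ACGT', 'TTGA']),
    make_loader('subject_b', ['CCAT']),
    make_loader('subject_c', ['GGTA', 'ATAT', 'CGCG']),
]

line = sample_one_line(loaders, random.Random(0))
print(line, loaded)
```

A sampler of this shape is not uniform over lines when partitions differ in size, so it trades statistical exactness for reading a single file.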
7 changes: 6 additions & 1 deletion pysparkling/context.py
@@ -110,7 +110,12 @@ def textFile(self, filename, minPartitions=None, use_unicode=True):
         resolved_names = File.resolve_filenames(filename)
         log.info('textFile() resolved "{0}" to {1} files.'
                  ''.format(filename, len(resolved_names)))
-        rdd_filenames = self.parallelize(resolved_names)
+
+        num_partitions = len(resolved_names)
+        if minPartitions and minPartitions > num_partitions:
+            num_partitions = minPartitions
+
+        rdd_filenames = self.parallelize(resolved_names, num_partitions)
         rdd = rdd_filenames.flatMap(lambda f_name: [
             l.rstrip('\n')
             for l in File(f_name).load().read().decode('utf-8').splitlines()
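Review note: the partition-count rule added in this hunk can be read in isolation. A minimal sketch, assuming a hypothetical helper name `choose_num_partitions` (the commit inlines this logic in `textFile()`): one partition per resolved file by default, raised to `minPartitions` only when that value is larger.

```python
def choose_num_partitions(resolved_names, min_partitions=None):
    """Mirror the rule added to textFile(): one partition per file,
    unless the caller asks for more.

    `choose_num_partitions` is a hypothetical helper for illustration;
    it does not exist in pysparkling.
    """
    num_partitions = len(resolved_names)
    if min_partitions and min_partitions > num_partitions:
        num_partitions = min_partitions
    return num_partitions


files = ['a.txt', 'b.txt', 'c.txt']
print(choose_num_partitions(files))     # 3
print(choose_num_partitions(files, 2))  # 3
print(choose_num_partitions(files, 8))  # 8
```

A `minPartitions` value below the file count has no effect under this rule. How `parallelize()` distributes fewer file names than partitions is not visible in this diff.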
6 changes: 6 additions & 0 deletions pysparkling/fileio/fs/s3.py
@@ -2,11 +2,14 @@
 
 import boto
 import fnmatch
+import logging
 from io import BytesIO
 
 from ...utils import Tokenizer
 from .file_system import FileSystem
 
+log = logging.getLogger(__name__)
+
 
 class S3(FileSystem):
     _conn = None
@@ -63,9 +66,12 @@ def resolve_filenames(expr):
         return files
 
     def load(self):
+        log.info('Loading {0} with size {1}.'
+                 ''.format(self.key.name, self.key.size))
         return BytesIO(self.key.get_contents_as_string())
 
     def dump(self, stream):
+        log.info('Dumping to {0}.'.format(self.key.name))
         self.key.set_contents_from_string(b''.join(stream))
         return self
 
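Review note: the new `log.info` calls stay silent unless the calling application configures logging. The sketch below shows how a caller might surface them using only the standard library. It assumes the module is imported as `pysparkling.fileio.fs.s3` (inferred from the file path, since the logger is created with `__name__`), and it emits a stand-in message with a made-up key name and size so that it runs without pysparkling or S3 access.

```python
import io
import logging

# Assumed logger name: logging.getLogger(__name__) inside
# pysparkling/fileio/fs/s3.py resolves to this dotted path.
S3_LOGGER_NAME = 'pysparkling.fileio.fs.s3'

# Capture output in a buffer so the effect is visible here; a real script
# would more likely call logging.basicConfig(level=logging.INFO).
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s:%(message)s'))

# Configuring the parent 'pysparkling' logger covers all of its submodules.
parent = logging.getLogger('pysparkling')
parent.setLevel(logging.INFO)
parent.addHandler(handler)

# Stand-in for the call added to S3.load(); key name and size are made up.
logging.getLogger(S3_LOGGER_NAME).info(
    'Loading {0} with size {1}.'.format('example/key.txt', 1234))

captured = buffer.getvalue()
print(captured, end='')
```

Setting the level on the parent logger rather than on the S3 module alone would also enable the `textFile()` message logged in `context.py`, assuming that module names its logger the same way.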
6 changes: 6 additions & 0 deletions tests/readme_example_human_microbiome.py
@@ -0,0 +1,6 @@
+from pysparkling import Context
+
+by_subject_rdd = Context().textFile(
+    's3n://human-microbiome-project/DEMO/HM16STR/46333/by_subject/*'
+)
+print(by_subject_rdd.takeSample(1))
