Skip to content

Big Data ETL Pipeline for ASL-to-Text (Computer Vision), using Apache Beam on GCP Dataflow

Notifications You must be signed in to change notification settings

sacontreras/fids-capstone-asl-translation

Repository files navigation

Introduction

Welcome to my first "Big Data"(ish) project on GCP Dataflow.

This was a Deep Learning project wherein I tackled attempting to train a DNN (a Convolutional Neural Network followed by a Recurrant Neural Network, an LSTM specifically).

The datasets were extracted from more than 2600 videos produced by the research conducted by Boston and Rutgers Universities, jointly under the National Center for Sign Language and Gesture Resources project.

Each video has up to three camera perspectives synchronized from obviously three independent video cameras.

The researchers produced corpus documents in the form of XML and linked metadata therein to corresponding videos. Videos were processed with the OpenCV Python library.

After all data was extracted, this resulted in 3,314 distinct "tokens". One distinct ASL sign = one lingustic token or "word", which is the rough equivalent of an English word plus other ASL-specific linguistic aspects. There may not always be a direct translation since an ASL utterance is comprised of much more than a simple English word. I am not a linguist so I will just stop right there.

But the focus of the FIRST part of my project - captured by the source code in this repo and this summary - focuses on the "Big Data" aspect. I will start with how I end.

When I began this summary, I had no idea what I was in for. In the end, there are more than 2600 videos to process, which amount to more than 561,000 frames, assuming a frame-rate of 30 FPS, which is only half the frame-rate at which the videos were originally recorded.

After frame extraction, data needed to be related to each and every one of those 561,000+ frames, which was NOT done by the researchers. This, I had to do myself.

So this part of my project focuses on making that pipeline doable. It had to be done using cloud storage and a parallel data processing framework. I chose Apache Beam using GCP Dataflow as the runner in the end - Apache Beam since its "raison d'etre" is portability to other runners, including local and interactive (Jupyter Notebooks).

I will leave the report of my initial failures for the end.

This write-up is more concerned with my eventual success using Apache Beam on GCP Dataflow.

So, without further ado, let me dive right in!

Preliminaries

This notebook assumes that all setup steps from GCP-readme.md have been followed.

This notebook does not demonstrate the full ETL pipeline used in production (executed on GCP Dataflow).

We only investigate the first couple of major steps of the production pipeline (which is executed in Dataflow on the Google Cloud Platform) in order to demonstrate the general idea of the process. Note that the production pipeline is really a sequence of (sub) pipelines that are daisychained together in a particular order, since latter pipelines depend on former pipelines.

In this notebook, we demonstrate the first two pipelines, which accomplish the following:

1. Boostrap the video index

Substeps are:

1. Download the video index (archive)
2. Extract it.
3. Write it to the destination directory as a CSV

The video index drives the entire full-blown production pipeline.

It tells us the filenames of the target videos.


2. Download each video segment comprising final target videos. (The video index contains the URLs).

Target videos are comprised of video segments since some of the final target videos can be rather large.

Altgether, the production pipeline (executed on GCP Dataflow) retrieves more than 2600 videos produced by the research conducted by Boston and Rutgers Universities, jointly under the National Center for Sign Language and Gesture Resources project.

This notebook will demonstrate retrieving 50 of those.

The implementation of the download shall leverage Apache Beam's parallelism in order to avoid the amount of time it would take to accomplish doing it sequentially. Note that when executed locally, the Apache Beam SDK uses Docker containers for worker node clusters. A cluster in this case consists of 8 workers nodes since my local machine has 8 cores.

In production, on GCP Dataflow, this can be scheduled to your heart's content (but this, of course, costs more money to do so).


3. Use the OpenCV library to extract all frames (from each segment) for each target video.

This step leverages Apache Beam's parallelism as well.

But we MUST take care to ensure that a single worker extracts the frames of each segment associated with the target video.

This is because frames are ordered/sequenced.

Allowing two different workers to extract frames of different segments associated with the same final target video would likely result in frames being extracted out of order (due to parallelism).

Therefore, we partition the extraction task by final target video in order to ensure a single worker handles all segments associated with a single target video.

But we do want parallelism to occurr at the final target video level.

In the end, in production, the pipeline extracts more than 561,000 frames (images) from the source target videos!

Of course in this demonstration we will be extracting much less - only 50 out of the more than 2600 videos available will be downloaded and processed (frames extracted). Still, extracting from 50 videos will amount to thousands of frames.

%load_ext autoreload

%autoreload 2

from __future__ import absolute_import

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib

from api import beam__common, fileio, fidscs_globals
from api.fidscs_globals import disp_source
from api import data_extractor__beam

from apache_beam.options.pipeline_options import PipelineOptions

Constants to be used in this notebook

WORK_DIR = '/tmp'
MAX_TARGET_VIDEOS = 50  # set to -1 for all in production but not in this interactive notebook! That will result in extracting more than 561,000 images from more than 2600 videos! (onto your local machine)
PIPELINE_BASE_JOB_NAME = 'sc-fids-capstone-etl-demo'

Use Apache Beam PipelineOptions for any global settings

We MUST do this since the point of Apache Beam is to enable parallelism (in processing).

How this is accomplished is beyond the scope of this notebook.

But suffice it to say that any notion of a global variable cannot be implemented in the manner one is normally implemented - e.g. with Python global variables.

However, a PipelineOptions object IS passed to each and every worker node by Apache Beam.

Therefore, we accomplish global settings to be shared by all workers - e.g. the working directory and the final destination filepaths to be output by the pipeline - by passing would-be global settings to PipelineOptions, which are required to bootstrap each worker node by Apache Beam.

Custom Apache Beam Pipeline options

The beam__common.FIDSCapstonePipelineOptions class was written to do just that and allows us to create and use our own custom options in Apache Beam pipelines.

Without it, attempting to set custom options on the Pipeline will fail since Apache Beam's PipelineOptions class will reject any options it doesn't recognize.

disp_source(beam__common.FIDSCapstonePipelineOptions)
<title></title> <style type="text/css"> /* generated by Pygments Copyright 2006-2019 by the Pygments team. Licensed under the BSD license, see LICENSE for details. */ td.linenos { background-color: #f0f0f0; padding-right: 10px; } span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; } pre { line-height: 125%; } body .hll { background-color: #ffffcc } body { background: #f8f8f8; } body .c { color: #408080; font-style: italic } /* Comment */ body .err { border: 1px solid #FF0000 } /* Error */ body .k { color: #008000; font-weight: bold } /* Keyword */ body .o { color: #666666 } /* Operator */ body .ch { color: #408080; font-style: italic } /* Comment.Hashbang */ body .cm { color: #408080; font-style: italic } /* Comment.Multiline */ body .cp { color: #BC7A00 } /* Comment.Preproc */ body .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */ body .c1 { color: #408080; font-style: italic } /* Comment.Single */ body .cs { color: #408080; font-style: italic } /* Comment.Special */ body .gd { color: #A00000 } /* Generic.Deleted */ body .ge { font-style: italic } /* Generic.Emph */ body .gr { color: #FF0000 } /* Generic.Error */ body .gh { color: #000080; font-weight: bold } /* Generic.Heading */ body .gi { color: #00A000 } /* Generic.Inserted */ body .go { color: #888888 } /* Generic.Output */ body .gp { color: #000080; font-weight: bold } /* Generic.Prompt */ body .gs { font-weight: bold } /* Generic.Strong */ body .gu { color: #800080; font-weight: bold } /* Generic.Subheading */ body .gt { color: #0044DD } /* Generic.Traceback */ body .kc { color: #008000; font-weight: bold } /* Keyword.Constant */ body .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */ body .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */ body .kp { color: #008000 } /* Keyword.Pseudo */ body .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */ body .kt { color: #B00040 } /* Keyword.Type */ body .m { color: #666666 } /* Literal.Number */ body .s { color: #BA2121 } /* Literal.String */ body .na { color: #7D9029 } /* Name.Attribute */ body .nb { color: #008000 } /* Name.Builtin */ body .nc { color: #0000FF; font-weight: bold } /* Name.Class */ body .no { color: #880000 } /* Name.Constant */ body .nd { color: #AA22FF } /* Name.Decorator */ body .ni { color: #999999; font-weight: bold } /* Name.Entity */ body .ne { color: #D2413A; font-weight: bold } /* Name.Exception */ body .nf { color: #0000FF } /* Name.Function */ body .nl { color: #A0A000 } /* Name.Label */ body .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */ body .nt { color: #008000; font-weight: bold } /* Name.Tag */ body .nv { color: #19177C } /* Name.Variable */ body .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */ body .w { color: #bbbbbb } /* Text.Whitespace */ body .mb { color: #666666 } /* Literal.Number.Bin */ body .mf { color: #666666 } /* Literal.Number.Float */ body .mh { color: #666666 } /* Literal.Number.Hex */ body .mi { color: #666666 } /* Literal.Number.Integer */ body .mo { color: #666666 } /* Literal.Number.Oct */ body .sa { color: #BA2121 } /* Literal.String.Affix */ body .sb { color: #BA2121 } /* Literal.String.Backtick */ body .sc { color: #BA2121 } /* Literal.String.Char */ body .dl { color: #BA2121 } /* Literal.String.Delimiter */ body .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */ body .s2 { color: #BA2121 } /* Literal.String.Double */ body .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */ body .sh { color: #BA2121 } /* Literal.String.Heredoc */ body .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */ body .sx { color: #008000 } /* Literal.String.Other */ body .sr { color: #BB6688 } /* Literal.String.Regex */ body .s1 { color: #BA2121 } /* Literal.String.Single */ body .ss { color: #19177C } /* Literal.String.Symbol */ body .bp { color: #008000 } /* Name.Builtin.Pseudo */ body .fm { color: #0000FF } /* Name.Function.Magic */ body .vc { color: #19177C } /* Name.Variable.Class */ body .vg { color: #19177C } /* Name.Variable.Global */ body .vi { color: #19177C } /* Name.Variable.Instance */ body .vm { color: #19177C } /* Name.Variable.Magic */ body .il { color: #666666 } /* Literal.Number.Integer.Long */ </style>

class FIDSCapstonePipelineOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_MAX_TARGET_VIDEOS)}',
default=None
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_WORK_DIR)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_DATA_DIR)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_TMP_DIR)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_DIR)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_STITCHED_VIDEO_FRAMES_DIR)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_CORPUS_DIR)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_CORPUS_DS_PATH)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_DOCUMENT_ASL_CONSULTANT_DS_PATH)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_ASL_CONSULTANT_DS_PATH)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_INDEXES_DIR)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_DS_PATH)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_SEGMENT_DS_PATH)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_FRAME_DS_PATH)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_UTTERANCE_DS_PATH)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_UTTERANCE_VIDEO_DS_PATH)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_UTTERANCE_TOKEN_DS_PATH)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_UTTERANCE_TOKEN_FRAME_DS_PATH)}',
default=None,
)
parser.add_argument(
f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VOCABULARY_DS_PATH)}',
default=None,
)

PipelineOptions Initialization

For this notebook, we execute locally (vs. GCP Dataflow) - that is, we use the Apache Beam's DirectRunner. Actually, we use a variant of - the InteractiveRunner - geared specifically for running in notebooks. But it is still run locally. Some PipelineOptions options differ (or are not needed), relative to the DataflowRunner.

To see the full implementation on how this differs from using the Dataflow runner, start by inspecting run_cloud__etl.py and follow the code.

Initializing the dict upon which PipelineOptions are based has been wrapped up within the beam__common.make_fids_options_dict function.

disp_source(beam__common.make_fids_options_dict)
<title></title> <style type="text/css"> /* generated by Pygments Copyright 2006-2019 by the Pygments team. Licensed under the BSD license, see LICENSE for details. */ td.linenos { background-color: #f0f0f0; padding-right: 10px; } span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; } pre { line-height: 125%; } body .hll { background-color: #ffffcc } body { background: #f8f8f8; } body .c { color: #408080; font-style: italic } /* Comment */ body .err { border: 1px solid #FF0000 } /* Error */ body .k { color: #008000; font-weight: bold } /* Keyword */ body .o { color: #666666 } /* Operator */ body .ch { color: #408080; font-style: italic } /* Comment.Hashbang */ body .cm { color: #408080; font-style: italic } /* Comment.Multiline */ body .cp { color: #BC7A00 } /* Comment.Preproc */ body .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */ body .c1 { color: #408080; font-style: italic } /* Comment.Single */ body .cs { color: #408080; font-style: italic } /* Comment.Special */ body .gd { color: #A00000 } /* Generic.Deleted */ body .ge { font-style: italic } /* Generic.Emph */ body .gr { color: #FF0000 } /* Generic.Error */ body .gh { color: #000080; font-weight: bold } /* Generic.Heading */ body .gi { color: #00A000 } /* Generic.Inserted */ body .go { color: #888888 } /* Generic.Output */ body .gp { color: #000080; font-weight: bold } /* Generic.Prompt */ body .gs { font-weight: bold } /* Generic.Strong */ body .gu { color: #800080; font-weight: bold } /* Generic.Subheading */ body .gt { color: #0044DD } /* Generic.Traceback */ body .kc { color: #008000; font-weight: bold } /* Keyword.Constant */ body .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */ body .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */ body .kp { color: #008000 } /* Keyword.Pseudo */ body .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */ body .kt { color: #B00040 } /* Keyword.Type */ body .m { color: #666666 } /* Literal.Number */ body .s { color: #BA2121 } /* Literal.String */ body .na { color: #7D9029 } /* Name.Attribute */ body .nb { color: #008000 } /* Name.Builtin */ body .nc { color: #0000FF; font-weight: bold } /* Name.Class */ body .no { color: #880000 } /* Name.Constant */ body .nd { color: #AA22FF } /* Name.Decorator */ body .ni { color: #999999; font-weight: bold } /* Name.Entity */ body .ne { color: #D2413A; font-weight: bold } /* Name.Exception */ body .nf { color: #0000FF } /* Name.Function */ body .nl { color: #A0A000 } /* Name.Label */ body .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */ body .nt { color: #008000; font-weight: bold } /* Name.Tag */ body .nv { color: #19177C } /* Name.Variable */ body .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */ body .w { color: #bbbbbb } /* Text.Whitespace */ body .mb { color: #666666 } /* Literal.Number.Bin */ body .mf { color: #666666 } /* Literal.Number.Float */ body .mh { color: #666666 } /* Literal.Number.Hex */ body .mi { color: #666666 } /* Literal.Number.Integer */ body .mo { color: #666666 } /* Literal.Number.Oct */ body .sa { color: #BA2121 } /* Literal.String.Affix */ body .sb { color: #BA2121 } /* Literal.String.Backtick */ body .sc { color: #BA2121 } /* Literal.String.Char */ body .dl { color: #BA2121 } /* Literal.String.Delimiter */ body .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */ body .s2 { color: #BA2121 } /* Literal.String.Double */ body .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */ body .sh { color: #BA2121 } /* Literal.String.Heredoc */ body .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */ body .sx { color: #008000 } /* Literal.String.Other */ body .sr { color: #BB6688 } /* Literal.String.Regex */ body .s1 { color: #BA2121 } /* Literal.String.Single */ body .ss { color: #19177C } /* Literal.String.Symbol */ body .bp { color: #008000 } /* Name.Builtin.Pseudo */ body .fm { color: #0000FF } /* Name.Function.Magic */ body .vc { color: #19177C } /* Name.Variable.Class */ body .vg { color: #19177C } /* Name.Variable.Global */ body .vi { color: #19177C } /* Name.Variable.Instance */ body .vm { color: #19177C } /* Name.Variable.Magic */ body .il { color: #666666 } /* Literal.Number.Integer.Long */ </style>

def make_fids_options_dict(work_dir, max_target_videos=-1, beam_gcp_project=fidscs_globals.GCP_PROJECT):
data_dir = fileio.path_join(work_dir, fidscs_globals.DATA_DIR_NAME)
tmp_dir = fileio.path_join(data_dir, fidscs_globals.TMP_DIR_NAME)
videos_dir = fileio.path_join(data_dir, fidscs_globals.VIDEO_DIR_NAME)
stitched_video_frames_dir = fileio.path_join(data_dir, fidscs_globals.STICHED_VIDEO_FRAMES_DIR_NAME)
corpus_dir = fileio.path_join(tmp_dir, fidscs_globals.CORPUS_BASE)
corpus_ds_path = fileio.path_join(data_dir, fidscs_globals.CORPUS_DS_FNAME)
document_asl_cconsultant_ds_path = fileio.path_join(data_dir, fidscs_globals.DOCUMENT_ASL_CONSULTANT_DS_FNAME)
asl_consultant_ds_path = fileio.path_join(data_dir, fidscs_globals.ASL_CONSULTANT_DS_FNAME)
video_indexes_dir = fileio.path_join(tmp_dir, fidscs_globals.VIDEO_INDEX_BASE)
selected_video_index_path = fileio.path_join(video_indexes_dir, fidscs_globals.SELECTED_VIDEO_INDEX)
video_ds_path = fileio.path_join(data_dir, fidscs_globals.VIDEO_DS_FNAME)
video_segment_ds_path = fileio.path_join(data_dir, fidscs_globals.VIDEO_SEGMENT_DS_FNAME)
video_frame_ds_path = fileio.path_join(data_dir, fidscs_globals.VIDEO_FRAME_DS_FNAME)
utterance_ds_path = fileio.path_join(data_dir, fidscs_globals.UTTERANCE_DS_FNAME)
utterance_video_ds_path = fileio.path_join(data_dir, fidscs_globals.UTTERANCE_VIDEO_DS_FNAME)
utterance_token_ds_path = fileio.path_join(data_dir, fidscs_globals.UTTERANCE_TOKEN_DS_FNAME)
utterance_token_frame_ds_path = fileio.path_join(data_dir, fidscs_globals.UTTERANCE_TOKEN_FRAME_DS_FNAME)
vocabulary_ds_path = fileio.path_join(data_dir, fidscs_globals.VOCABULARY_DS_FNAME)
return {
fidscs_globals.OPT_NAME_PROJECT: beam_gcp_project,
fidscs_globals.OPT_NAME_MAX_TARGET_VIDEOS: max_target_videos,
fidscs_globals.OPT_NAME_WORK_DIR: work_dir,
fidscs_globals.OPT_NAME_DATA_DIR: data_dir,
fidscs_globals.OPT_NAME_TMP_DIR: tmp_dir,
fidscs_globals.OPT_NAME_VIDEO_DIR: videos_dir,
fidscs_globals.OPT_NAME_STITCHED_VIDEO_FRAMES_DIR: stitched_video_frames_dir,
fidscs_globals.OPT_NAME_CORPUS_DIR: corpus_dir,
fidscs_globals.OPT_NAME_CORPUS_DS_PATH: corpus_ds_path,
fidscs_globals.OPT_NAME_DOCUMENT_ASL_CONSULTANT_DS_PATH: document_asl_cconsultant_ds_path,
fidscs_globals.OPT_NAME_ASL_CONSULTANT_DS_PATH: asl_consultant_ds_path,
fidscs_globals.OPT_NAME_VIDEO_INDEXES_DIR: video_indexes_dir,
fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH: selected_video_index_path,
fidscs_globals.OPT_NAME_VIDEO_DS_PATH: video_ds_path,
fidscs_globals.OPT_NAME_VIDEO_SEGMENT_DS_PATH: video_segment_ds_path,
fidscs_globals.OPT_NAME_VIDEO_FRAME_DS_PATH: video_frame_ds_path,
fidscs_globals.OPT_NAME_UTTERANCE_DS_PATH: utterance_ds_path,
fidscs_globals.OPT_NAME_UTTERANCE_VIDEO_DS_PATH: utterance_video_ds_path,
fidscs_globals.OPT_NAME_UTTERANCE_TOKEN_DS_PATH: utterance_token_ds_path,
fidscs_globals.OPT_NAME_UTTERANCE_TOKEN_FRAME_DS_PATH: utterance_token_frame_ds_path,
fidscs_globals.OPT_NAME_VOCABULARY_DS_PATH: vocabulary_ds_path
}

PipelineOptions geared for the InteractiveRunner

Note that InteractiveRunner is a variant of the DirectRunner that allows us to run an Apache Beam pipeline with a Jupyter Notebook.

Documentation for InteractiveRunner can be found here.

First, it must be reiterated that we must use this runner in order to collect (reduce) data kept in Apache Beam Pcollections for conversion to Pandas DataFrames for display within this notebook. Apache Beam Pcollections can generally and roughly be thought of as Resilient Distributed Datasets. The documentation for Apache Beam Pcollections can be found in the Apache Beam Programming Guide located here.

But Pcollections are the basis for all processing within an Apache Beam pipeline.

Also note that the InteractiveRunner is not really meant to be used for enterprise (read: "Big Data") pipelines.

The runner used for production in this project is the DataFlow Google Cloud Platform runner.

The reader is reminded that the point of this notebook, however, is to present a demonstration of only a subset of the full Apache Beam pipeline (used in this project).

options = {
    # 'runner': 'DirectRunner',
    'runner': 'InteractiveRunner',
    'environment_type': 'DOCKER',
    'direct_num_workers': 0, # 0 is use all available cores
    'direct_running_mode': 'multi_threading', # ['in_memory', 'multi_threading', 'multi_processing']
    'streaming': False # set to True if data source is unbounded (e.g. GCP PubSub),
}

options.update(beam__common.make_fids_options_dict(WORK_DIR, max_target_videos=MAX_TARGET_VIDEOS))

Finally, instantiate the PipelineOptions (using the above options dict)

job_suffix = 'boostrap-vid-index'
job_name = f"{PIPELINE_BASE_JOB_NAME}--{job_suffix}"
options.update({
    'job_name': job_name
})

pipeline_options = PipelineOptions(flags=[], **options) # easier to pass in options from command-line this way
print(f"PipelineOptions:\n{pipeline_options.get_all_options()}\n")
PipelineOptions:
{'runner': 'InteractiveRunner', 'streaming': False, 'beam_services': {}, 'type_check_strictness': 'DEFAULT_TO_ANY', 'type_check_additional': '', 'pipeline_type_check': True, 'runtime_type_check': False, 'performance_runtime_type_check': False, 'direct_runner_use_stacked_bundle': True, 'direct_runner_bundle_repeat': 0, 'direct_num_workers': 0, 'direct_running_mode': 'multi_threading', 'dataflow_endpoint': 'https://dataflow.googleapis.com', 'project': 'sc-fids-capstone', 'job_name': 'sc-fids-capstone-etl-demo--boostrap-vid-index', 'staging_location': None, 'temp_location': None, 'region': None, 'service_account_email': None, 'no_auth': False, 'template_location': None, 'labels': None, 'update': False, 'transform_name_mapping': None, 'enable_streaming_engine': False, 'dataflow_kms_key': None, 'flexrs_goal': None, 'hdfs_host': None, 'hdfs_port': None, 'hdfs_user': None, 'hdfs_full_urls': False, 'num_workers': None, 'max_num_workers': None, 'autoscaling_algorithm': None, 'machine_type': None, 'disk_size_gb': None, 'disk_type': None, 'worker_region': None, 'worker_zone': None, 'zone': None, 'network': None, 'subnetwork': None, 'worker_harness_container_image': None, 'sdk_harness_container_image_overrides': None, 'use_public_ips': None, 'min_cpu_platform': None, 'dataflow_worker_jar': None, 'dataflow_job_file': None, 'experiments': None, 'number_of_worker_harness_threads': None, 'profile_cpu': False, 'profile_memory': False, 'profile_location': None, 'profile_sample_rate': 1.0, 'requirements_file': None, 'requirements_cache': None, 'setup_file': None, 'beam_plugins': None, 'save_main_session': False, 'sdk_location': 'default', 'extra_packages': None, 'prebuild_sdk_container_engine': None, 'prebuild_sdk_container_base_image': None, 'docker_registry_push_url': None, 'job_endpoint': None, 'artifact_endpoint': None, 'job_server_timeout': 60, 'environment_type': 'DOCKER', 'environment_config': None, 'environment_options': None, 'sdk_worker_parallelism': 1, 'environment_cache_millis': 0, 'output_executable_path': None, 'artifacts_dir': None, 'job_port': 0, 'artifact_port': 0, 'expansion_port': 0, 'flink_master': '[auto]', 'flink_version': '1.10', 'flink_job_server_jar': None, 'flink_submit_uber_jar': False, 'spark_master_url': 'local[4]', 'spark_job_server_jar': None, 'spark_submit_uber_jar': False, 'spark_rest_url': None, 'on_success_matcher': None, 'dry_run': False, 'wait_until_finish_duration': None, 'pubsubRootUrl': None, 's3_access_key_id': None, 's3_secret_access_key': None, 's3_session_token': None, 's3_endpoint_url': None, 's3_region_name': None, 's3_api_version': None, 's3_verify': None, 's3_disable_ssl': False, 'fidscs_capstone_max_target_videos': 50, 'fidscs_capstone_work_dir': '/tmp', 'fidscs_capstone_data_dir': '/tmp/data', 'fidscs_capstone_tmp_dir': '/tmp/data/tmp', 'fidscs_capstone_videos_dir': '/tmp/data/videos', 'fidscs_capstone_stitched_video_frames_dir': '/tmp/data/stitched_video_frames', 'fidscs_capstone_corpus_dir': '/tmp/data/tmp/ncslgr-xml', 'fidscs_capstone_corpus_ds_path': '/tmp/data/ncslgr-corpus-index.csv', 'fidscs_capstone_document_asl_cconsultant_ds_path': '/tmp/data/document-consultant-index.csv', 'fidscs_capstone_asl_consultant_ds_path': '/tmp/data/consultant-index.csv', 'fidscs_capstone_video_indexes_dir': '/tmp/data/tmp/video_index-20120129', 'fidscs_capstone_selected_video_index_path': '/tmp/data/tmp/video_index-20120129/files_by_video_name.csv', 'fidscs_capstone_video_ds_path': '/tmp/data/document-consultant-targetvideo-index.csv', 'fidscs_capstone_video_segment_ds_path': '/tmp/data/document-consultant-targetvideo-segment-index.csv', 'fidscs_capstone_video_frame_ds_path': '/tmp/data/document-consultant-targetvideo-frame-index.csv', 'fidscs_capstone_utterance_ds_path': '/tmp/data/document-consultant-utterance-index.csv', 'fidscs_capstone_utterance_video_ds_path': '/tmp/data/document-consultant-utterance-targetvideo-index.csv', 'fidscs_capstone_utterance_token_ds_path': '/tmp/data/document-consultant-utterance-token-index.csv', 'fidscs_capstone_utterance_token_frame_ds_path': '/tmp/data/document-consultant-targetvideo-utterance-token-frame-index.csv', 'fidscs_capstone_vocabulary_ds_path': '/tmp/data/vocabulary-index.csv'}

But before running the pipeline, create necessary filestructure within WORK_DIR

if not fileio.dir_path_exists(options[fidscs_globals.OPT_NAME_DATA_DIR], options)[0]:
    fileio.make_dirs(options[fidscs_globals.OPT_NAME_DATA_DIR], options)

if not fileio.dir_path_exists(options[fidscs_globals.OPT_NAME_TMP_DIR], options)[0]:
    fileio.make_dirs(options[fidscs_globals.OPT_NAME_TMP_DIR], options)

if not beam__common.dataset_csv_files_exist(options):
    if not fileio.dir_path_exists(options[fidscs_globals.OPT_NAME_VIDEO_DIR], options)[0]:
        fileio.make_dirs(options[fidscs_globals.OPT_NAME_VIDEO_DIR], options)

if not fileio.dir_path_exists(options[fidscs_globals.OPT_NAME_STITCHED_VIDEO_FRAMES_DIR], options)[0]:
    fileio.make_dirs(options[fidscs_globals.OPT_NAME_STITCHED_VIDEO_FRAMES_DIR], options)

We are now ready to execute the pipeline. But before doing so, let's discuss how it works.


There are two top-level functions used by the "boostrap-vid-index" pipeline, in this order:

  1. data_extractor__beam.pl__1__bootstrap_target_video_index
  2. data_extractor__beam.pl__2__write_target_vid_index_csv


Let's examine the source code for data_extractor__beam.pl__1__bootstrap_target_video_index...

The following python source code illustrates the programming paradigm used in all Apache Beam (stands for Batch and Stream processing) pipelines.

disp_source(data_extractor__beam.pl__1__bootstrap_target_video_index)
<title></title> <style type="text/css"> /* generated by Pygments Copyright 2006-2019 by the Pygments team. Licensed under the BSD license, see LICENSE for details. */ td.linenos { background-color: #f0f0f0; padding-right: 10px; } span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; } pre { line-height: 125%; } body .hll { background-color: #ffffcc } body { background: #f8f8f8; } body .c { color: #408080; font-style: italic } /* Comment */ body .err { border: 1px solid #FF0000 } /* Error */ body .k { color: #008000; font-weight: bold } /* Keyword */ body .o { color: #666666 } /* Operator */ body .ch { color: #408080; font-style: italic } /* Comment.Hashbang */ body .cm { color: #408080; font-style: italic } /* Comment.Multiline */ body .cp { color: #BC7A00 } /* Comment.Preproc */ body .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */ body .c1 { color: #408080; font-style: italic } /* Comment.Single */ body .cs { color: #408080; font-style: italic } /* Comment.Special */ body .gd { color: #A00000 } /* Generic.Deleted */ body .ge { font-style: italic } /* Generic.Emph */ body .gr { color: #FF0000 } /* Generic.Error */ body .gh { color: #000080; font-weight: bold } /* Generic.Heading */ body .gi { color: #00A000 } /* Generic.Inserted */ body .go { color: #888888 } /* Generic.Output */ body .gp { color: #000080; font-weight: bold } /* Generic.Prompt */ body .gs { font-weight: bold } /* Generic.Strong */ body .gu { color: #800080; font-weight: bold } /* Generic.Subheading */ body .gt { color: #0044DD } /* Generic.Traceback */ body .kc { color: #008000; font-weight: bold } /* Keyword.Constant */ body .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */ body .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */ body .kp { color: #008000 } /* Keyword.Pseudo */ body .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */ body .kt { color: #B00040 } /* Keyword.Type */ body .m { color: #666666 } /* Literal.Number */ body .s { color: #BA2121 } /* Literal.String */ body .na { color: #7D9029 } /* Name.Attribute */ body .nb { color: #008000 } /* Name.Builtin */ body .nc { color: #0000FF; font-weight: bold } /* Name.Class */ body .no { color: #880000 } /* Name.Constant */ body .nd { color: #AA22FF } /* Name.Decorator */ body .ni { color: #999999; font-weight: bold } /* Name.Entity */ body .ne { color: #D2413A; font-weight: bold } /* Name.Exception */ body .nf { color: #0000FF } /* Name.Function */ body .nl { color: #A0A000 } /* Name.Label */ body .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */ body .nt { color: #008000; font-weight: bold } /* Name.Tag */ body .nv { color: #19177C } /* Name.Variable */ body .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */ body .w { color: #bbbbbb } /* Text.Whitespace */ body .mb { color: #666666 } /* Literal.Number.Bin */ body .mf { color: #666666 } /* Literal.Number.Float */ body .mh { color: #666666 } /* Literal.Number.Hex */ body .mi { color: #666666 } /* Literal.Number.Integer */ body .mo { color: #666666 } /* Literal.Number.Oct */ body .sa { color: #BA2121 } /* Literal.String.Affix */ body .sb { color: #BA2121 } /* Literal.String.Backtick */ body .sc { color: #BA2121 } /* Literal.String.Char */ body .dl { color: #BA2121 } /* Literal.String.Delimiter */ body .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */ body .s2 { color: #BA2121 } /* Literal.String.Double */ body .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */ body .sh { color: #BA2121 } /* Literal.String.Heredoc */ body .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */ body .sx { color: #008000 } /* Literal.String.Other */ body .sr { color: #BB6688 } /* Literal.String.Regex */ body .s1 { color: #BA2121 } /* Literal.String.Single */ body .ss { color: #19177C } /* Literal.String.Symbol */ body .bp { color: #008000 } /* Name.Builtin.Pseudo */ body .fm { color: #0000FF } /* Name.Function.Magic */ body .vc { color: #19177C } /* Name.Variable.Class */ body .vg { color: #19177C } /* Name.Variable.Global */ body .vi { color: #19177C } /* Name.Variable.Instance */ body .vm { color: #19177C } /* Name.Variable.Magic */ body .il { color: #666666 } /* Literal.Number.Integer.Long */ </style>

def pl__1__bootstrap_target_video_index(pl):
if not fileio.file_path_exists(pl._options._all_options[fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH], pl._options._all_options)[0]:
sel_vid_index_path = (
pl
| "Beam PL: create initial pcoll containing information for boostrap_target_video_index" >> beam.Create(
[ # one row containing dict of:
# 1. url of video indexes archive
# 2. local destination (path) for the downloaded archive
# 3. local destination (path) which will receive the extracted archive csv files (there are more than one)
# 4. final path to the selected videx index csv
# (note that the dict is not laid out in the above order)
{
'vid_indexes_dir': pl._options._all_options[fidscs_globals.OPT_NAME_VIDEO_INDEXES_DIR],
'sel_vid_index_path': pl._options._all_options[fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH],
'video_indexes_archive': fidscs_globals.VIDEO_INDEXES_ARCHIVE,
'tmp_dir': pl._options._all_options[fidscs_globals.OPT_NAME_TMP_DIR],
'video_ds_path': pl._options._all_options[fidscs_globals.OPT_NAME_VIDEO_DS_PATH]
}
]
)
# | "Beam PL: bootstrap target video index" >> beam.Map(boostrap_target_video_index) # boostrap_target_video_index outputs SELECTED_VIDEO_INDEX_PATH but beam.Map() wraps this in a pcoll and is fed to...
| "Beam PL: bootstrap target video index" >> beam.ParDo(TargetVideoIndexBootstrapper(pl._options._all_options)) # boostrap_target_video_index outputs SELECTED_VIDEO_INDEX_PATH but beam.Map() wraps this in a pcoll and is fed to...
)
else:
sel_vid_index_path = (
pl
| "Beam PL: create initial pcoll containing path to existing sel_vid_index" >> beam.Create([pl._options._all_options[fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH]])
| "Beam PL: print path to existing sel_vid_index" >> beam.ParDo(beam__common.PipelinePcollPrinter(msg="FOUND EXISTING SEL VID INDEX"))
)
full_target_vid_index_schemad_pcoll = (
sel_vid_index_path
| "Beam PL: read video index into pcoll" >> beam.FlatMap(beam__common.load_vid_index_csv)
| "Beam PL: apply schema to video index pcoll" >> beam.Map(
lambda x: beam.Row(
target_video_filename=str(urllib.parse.quote(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[0]])),
video_seq_id=int(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[1]]),
perspective_cam_id=int(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[2]]),
compressed_mov_url=str(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[3]]),
uncompressed_avi_url=str(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[4]]),
uncompressed_avi_mirror_1_url=str(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[5]]),
uncompressed_avi_mirror_2_url=str(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[6]])
)
)
)
return full_target_vid_index_schemad_pcoll

The gist of data_extractor__beam.pl__1__bootstrap_target_video_index is that it will ensure that the video index exists locally before any other dependent pipeline can execute.

If it doesn't, it will download/extract the contents of the video index archive from http://www.bu.edu/asllrp/ncslgr-for-download/video_index-20120129.zip.

The first noteable point to make is that it uses the custom class data_extractor__beam.TargetVideoIndexBootstrapper, which inherits beam__common.PipelinePcollElementProcessor, which in turn inherits from from Apache Beam's DoFn class. Inheriting from Apache Beam's DoFn allows the inherited class to be used in Apache Beam pipelines via beam.ParDo (which stands for "Parallel Do"). Full documentation can be found here.

There is nothing particularly noteworthy about the internal implementation of data_extractor__beam.TargetVideoIndexBootstrapper. It simply downloads the video index archive (to a memfile) and extracts its contents (in-memory). Please see its implementation for details if you are interested.

Source code for data_extractor__beam.pl__2__write_target_vid_index_csv is listed below. It simply writes the bytes extracted from the archive to destintation path <WORK_DIR>/data/video_index-20120129.csv (using an Apache Beam schema so that column names can easily be referenced/manipulated later).

disp_source(data_extractor__beam.pl__2__write_target_vid_index_csv)
<title></title> <style type="text/css"> /* generated by Pygments Copyright 2006-2019 by the Pygments team. Licensed under the BSD license, see LICENSE for details. */ td.linenos { background-color: #f0f0f0; padding-right: 10px; } span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; } pre { line-height: 125%; } body .hll { background-color: #ffffcc } body { background: #f8f8f8; } body .c { color: #408080; font-style: italic } /* Comment */ body .err { border: 1px solid #FF0000 } /* Error */ body .k { color: #008000; font-weight: bold } /* Keyword */ body .o { color: #666666 } /* Operator */ body .ch { color: #408080; font-style: italic } /* Comment.Hashbang */ body .cm { color: #408080; font-style: italic } /* Comment.Multiline */ body .cp { color: #BC7A00 } /* Comment.Preproc */ body .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */ body .c1 { color: #408080; font-style: italic } /* Comment.Single */ body .cs { color: #408080; font-style: italic } /* Comment.Special */ body .gd { color: #A00000 } /* Generic.Deleted */ body .ge { font-style: italic } /* Generic.Emph */ body .gr { color: #FF0000 } /* Generic.Error */ body .gh { color: #000080; font-weight: bold } /* Generic.Heading */ body .gi { color: #00A000 } /* Generic.Inserted */ body .go { color: #888888 } /* Generic.Output */ body .gp { color: #000080; font-weight: bold } /* Generic.Prompt */ body .gs { font-weight: bold } /* Generic.Strong */ body .gu { color: #800080; font-weight: bold } /* Generic.Subheading */ body .gt { color: #0044DD } /* Generic.Traceback */ body .kc { color: #008000; font-weight: bold } /* Keyword.Constant */ body .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */ body .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */ body .kp { color: #008000 } /* Keyword.Pseudo */ body .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */ body .kt { color: #B00040 } /* Keyword.Type */ body .m { color: #666666 } /* Literal.Number */ body .s { color: #BA2121 } /* Literal.String */ body .na { color: #7D9029 } /* Name.Attribute */ body .nb { color: #008000 } /* Name.Builtin */ body .nc { color: #0000FF; font-weight: bold } /* Name.Class */ body .no { color: #880000 } /* Name.Constant */ body .nd { color: #AA22FF } /* Name.Decorator */ body .ni { color: #999999; font-weight: bold } /* Name.Entity */ body .ne { color: #D2413A; font-weight: bold } /* Name.Exception */ body .nf { color: #0000FF } /* Name.Function */ body .nl { color: #A0A000 } /* Name.Label */ body .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */ body .nt { color: #008000; font-weight: bold } /* Name.Tag */ body .nv { color: #19177C } /* Name.Variable */ body .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */ body .w { color: #bbbbbb } /* Text.Whitespace */ body .mb { color: #666666 } /* Literal.Number.Bin */ body .mf { color: #666666 } /* Literal.Number.Float */ body .mh { color: #666666 } /* Literal.Number.Hex */ body .mi { color: #666666 } /* Literal.Number.Integer */ body .mo { color: #666666 } /* Literal.Number.Oct */ body .sa { color: #BA2121 } /* Literal.String.Affix */ body .sb { color: #BA2121 } /* Literal.String.Backtick */ body .sc { color: #BA2121 } /* Literal.String.Char */ body .dl { color: #BA2121 } /* Literal.String.Delimiter */ body .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */ body .s2 { color: #BA2121 } /* Literal.String.Double */ body .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */ body .sh { color: #BA2121 } /* Literal.String.Heredoc */ body .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */ body .sx { color: #008000 } /* Literal.String.Other */ body .sr { color: #BB6688 } /* Literal.String.Regex */ body .s1 { color: #BA2121 } /* Literal.String.Single */ body .ss { color: #19177C } /* Literal.String.Symbol */ body .bp { color: #008000 } /* Name.Builtin.Pseudo */ body .fm { color: #0000FF } /* Name.Function.Magic */ body .vc { color: #19177C } /* Name.Variable.Class */ body .vg { color: #19177C } /* Name.Variable.Global */ body .vi { color: #19177C } /* Name.Variable.Instance */ body .vm { color: #19177C } /* Name.Variable.Magic */ body .il { color: #666666 } /* Literal.Number.Integer.Long */ </style>

def pl__2__write_target_vid_index_csv(full_target_vid_index_schemad_pcoll, d_pl_options):
vid_index_path = fileio.path_join(d_pl_options[fidscs_globals.OPT_NAME_DATA_DIR], fidscs_globals.VIDEO_INDEX_BASE+'.csv')
if not fileio.file_path_exists(vid_index_path, d_pl_options)[0]:
sorted_full_target_vid_index_schemad_pcoll = beam__common.pl__X__sort_pcoll(full_target_vid_index_schemad_pcoll, pcoll_label="full_target_vid_index")
sorted_corpus_index_csv_rows_pcoll = (
sorted_full_target_vid_index_schemad_pcoll
| "Beam PL: re-apply schema to sorted_full_target_vid_index" >> beam.Map(lambda sorted_full_target_vid_index_schemad_pcoll_row: beam.Row(
target_video_filename=sorted_full_target_vid_index_schemad_pcoll_row.target_video_filename,
video_seq_id=sorted_full_target_vid_index_schemad_pcoll_row.video_seq_id,
perspective_cam_id=sorted_full_target_vid_index_schemad_pcoll_row.perspective_cam_id,
compressed_mov_url=sorted_full_target_vid_index_schemad_pcoll_row.compressed_mov_url,
uncompressed_avi_url=sorted_full_target_vid_index_schemad_pcoll_row.uncompressed_avi_url,
uncompressed_avi_mirror_1_url=sorted_full_target_vid_index_schemad_pcoll_row.uncompressed_avi_mirror_1_url,
uncompressed_avi_mirror_2_url=sorted_full_target_vid_index_schemad_pcoll_row.uncompressed_avi_mirror_2_url
)
)
| beam.Map(lambda sorted_full_target_vid_index_schemad_pcoll_row: beam__common.beam_row_to_csv_string(sorted_full_target_vid_index_schemad_pcoll_row))
)
return beam__common.pl__X__write_pcoll_to_csv(
sorted_corpus_index_csv_rows_pcoll,
"TARGET-VIDEO-INDEX",
fidscs_globals.VIDEO_INDEXES_ARCHIVE,
fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX,
d_pl_options
)
else:
print(f"FOUND EXISTING VID INDEX: {vid_index_path}")
return [vid_index_path]

We are now ready to execute the first step of the "boostrap-vid-index" pipeline!

n_partitions = 8 # hardcoded for now but we need to retrieve this from beam to be the number of workers

pl = beam.Pipeline(options=pipeline_options)

full_target_vid_index_schemad_pcoll = data_extractor__beam.pl__1__bootstrap_target_video_index(pl)

That seems fast! That's because the pipeline wasn't acctually executed yet. What Apache Beam did in this case was create the corresponding pipelines execution graph (which is actually a Directed Acyclic Graph).

With the InteractiveRunner, the pipeline only gets executed when it is required.

This happens in notebooks by calling ib.collect or ib.show, which essentially executes and then reduces the distributed Pcollection.

In this case, we use ib.collect which also stuffs the results into a Pandas DataFrame for viewing purposes within notebooks.

Note that this is NOT done in production (in the cloud, in GCP Dataflow) since Pandas DataFrames aren't needed and are simply impractical for "Big Data" solutions. Pandas DataFrames really don't serve this purpose. Can you imagine attempting to hold all the corresponding tensor bytes for 561,000+ images in memory??

Anyway, moving on... before calling ib.collect, we must first tell Apache Beam to "record" all Pcollections (up to a certain point) by calling ib.watch(locals()). Note that only Pcollections prior to calling ib.watch(locals()) are eligible for "collection" (conversion to Pandas DataFrames).

# we require this in order to make use of ib.show() (which provides visualization of the pcolls specified) or ib.collect() (which creates a pandas dataframe from a pcoll)
    # but all pcolls we wish to visualize must be created prior to executing the following line
ib.watch(locals())

We can now collect the full_target_vid_index_schemad_pcoll Pcollection into a Pandas DataFrame for display in this notebook.

df_full_target_vid_index = ib.collect(full_target_vid_index_schemad_pcoll)
VIDEO-INDEX BOOTSTRAP INFO: {'vid_indexes_dir': '/tmp/data/tmp/video_index-20120129', 'sel_vid_index_path': '/tmp/data/tmp/video_index-20120129/files_by_video_name.csv', 'video_indexes_archive': 'video_index-20120129.zip', 'tmp_dir': '/tmp/data/tmp', 'video_ds_path': '/tmp/data/document-consultant-targetvideo-index.csv'}
unzipping http://www.bu.edu/asllrp/ncslgr-for-download/video_index-20120129.zip in-memory...
	DONE
df_full_target_vid_index
<style scoped> .dataframe tbody tr th:only-of-type { vertical-align: middle; }
.dataframe tbody tr th {
    vertical-align: top;
}

.dataframe thead th {
    text-align: right;
}
</style>
0 1 2 3 4 5 6
0 http://csr.bu.edu/asl/sequences/compressed/mas... 0 539_219_small_0.mov 219
1 http://csr.bu.edu/asl/sequences/compressed/sla... 1 539_219_small_1.mov http://csr.bu.edu/asl0/uploading/2000_02_29/se... 219
2 http://csr.bu.edu/asl/sequences/compressed/sla... 2 539_219_small_2.mov http://csr.bu.edu/asl0/uploading/2000_02_29/se... http://csr.bu.edu/asl0/working/tapes1/2000_01_... http://csr.bu.edu/asl0/uploading/2000_02_25/sl... 219
3 http://csr.bu.edu/asl/private/compressed/maste... 0 548_master_small.mov http://csr.bu.edu/asl0/working/tapes2/2000_06_... http://csr.bu.edu/asl0/uploading/2000_06_07/ma... 548
4 http://csr.bu.edu/asl/private/compressed/slave... 1 548_slave1_small.mov http://csr.bu.edu/asl0/working/tapes2/2000_06_... 548
... ... ... ... ... ... ... ...
2607 http://csr.bu.edu/asl/private/downloads/2001_8... 0 siblings_1066_small_0.mov http://csr.bu.edu/asl0/working/tapes2/2001_07_... http://csr.bu.edu/asl0/working/tapes2/2001_07_... 1066
2608 http://csr.bu.edu/asl/private/downloads/2001_8... 2 siblings_1066_small_2.mov http://csr.bu.edu/asl0/uploading/2001_07_24/sl... 1066
2609 http://csr.bu.edu/asl/private/downloads/2003_0... 0 whitewater_1049_small_0.mov http://csr.bu.edu/asl0/working/tapes2/2001_07_... http://csr.bu.edu/asl0/working/tapes2/2001_07_... 1049
2610 http://csr.bu.edu/asl/private/downloads/2003_0... 2 whitewater_1049_small_2.mov http://csr.bu.edu/asl0/working/tapes2/2001_07_... http://csr.bu.edu/asl0/working/tapes2/2001_07_... 1049
2611 http://csr.bu.edu/asl/private/downloads/2003_0... 3 whitewater_1049_small_3.mov 1049

2612 rows Ă— 7 columns

Note that this isn't entirely useful yet since we don't have any corresponding column names (in the above Pandas DataFrame). We have applied a schema to the Pcollection but that doesn't get applied to the Pandas DataFrame since applying a schema to a Pcollection is carried out by mappinng each row to a literal Apache Beam Row object, thereby effectively converting each element to an unhashed dict. Thus, we cannot guarantee the ordering of the columns will be fixed. We must therefore use the schema to refer to columns by name.

But, we do see that there are 2,612 corresponding target videos to download. Note that since target videos are actually comprised of segments, there may actually be more videos than that that we download in the end (if we were to download them all... which is exactly what is done in production, on GCP Dataflow).

This is done inline while writing the Pcollection (collected into the above Pandas DataFrame just for viewing) to the destination <WORK_DIR>/data/video_index-20120129.csv file path (by data_extractor__beam.pl__2__write_target_vid_index_csv).

But, as a nuance of collecting a Pcollection into a DataFrame, we can't simply call data_extractor__beam.pl__2__write_target_vid_index_csv now if we want to view the resulting Pcollection as a DataFrame. Recall that only Pcollections prior to calling ib.watch(locals()) are eligible for "collection" (conversion to Pandas DataFrames), which we already did. This means we must re-execute the first step (data_extractor__beam.pl__1__bootstrap_target_video_index), followed by data_extractor__beam.pl__2__write_target_vid_index_csv, call ib.watch(locals()), and then finally call ib.collect on each of the corresponding Pcollections in order to view them.

But won't that mean that data_extractor__beam.pl__1__bootstrap_target_video_index will re-download the video index? ANSWER: no because it was written specifically to guard against that case. Take a look at its source and you'll see. If the video index exists locally, it will simply load it from the "tmp" directory and the resulting Pcollection is used as input for data_extractor__beam.pl__2__write_target_vid_index_csv (which will apply a schema and then write it to the final destination path <WORK_DIR>/data/video_index-20120129.csv).

Let's do that now...


The full "boostrap-vid-index" pipeline

# create a new instance of the pipeline
pl = beam.Pipeline(options=pipeline_options)

full_target_vid_index_schemad_pcoll = data_extractor__beam.pl__1__bootstrap_target_video_index(pl)
_ = data_extractor__beam.pl__2__write_target_vid_index_csv(full_target_vid_index_schemad_pcoll, pl._options._all_options)

We know that observing the full_target_vid_index_schemad_pcoll Pcollection won't be particularly useful and the Pcollection that data_extractor__beam.pl__2__write_target_vid_index_csv outputs simply has the destination path after it successfully writes full_target_vid_index_schemad_pcoll to <WORK_DIR>/data/video_index-20120129.csv, which isn't particularly interesting. But we need to be sure this pipeline completes before executing the more interesting "download-videos-extract-frames" pipeline.

So instead of calling ib.collect to force the above pipeline to run, we'll simply call pl.run instead (since we are not particularly interested in viewing any Pcollection-to-DataFrame conversions from it).

print(f"\n\n****************************** Starting pipeline job: {job_name} ******************************")
pl.run();
print(f"****************************** Finished pipeline job: {job_name} ******************************")
****************************** Starting pipeline job: sc-fids-capstone-etl-demo--boostrap-vid-index ******************************
FOUND EXISTING SEL VID INDEX: /tmp/data/tmp/video_index-20120129/files_by_video_name.csv
TARGET-VIDEO-INDEX CSV WRITTEN TO STORAGE: /tmp/data/video_index-20120129.csv
****************************** Finished pipeline job: sc-fids-capstone-etl-demo--boostrap-vid-index ******************************


The "download-videos-extract-frames" pipeline


The "download-videos-extract-frames" pipeline is comprised of four steps:

  1. beam__common.pl__1__read_target_vid_index_csv
  2. data_extractor__beam.pl__2__filter_target_vid_index
  3. data_extractor__beam.pl__3__parallel_download_videos
  4. data_extractor__beam.pl__4__parallel_extract_target_video_frames

The function names used for each step suggest what they do. So I will only show source code for data_extractor__beam.pl__3__parallel_download_videos and data_extractor__beam.pl__4__parallel_extract_target_video_frames, and provide short explanations for steps 1 and 2.

Step 1 obviously reads <WORK_DIR>/data/video_index-20120129.csv from storage into a Pcollection to be used as input for data_extractor__beam.pl__2__filter_target_vid_index, which simply selects the first MAX_TARGET_VIDEOS from the full list of records from the full_target_vid_index_schemad_pcoll Pcollection that beam__common.pl__1__read_target_vid_index_csv returns. Note that data_extractor__beam.pl__2__write_target_vid_index_csv, in addition to applying a schema, also applies a row id and writes to <WORK_DIR>/data/video_index-20120129.csv in the order of that index. beam__common.pl__1__read_target_vid_index_csv returns the corresponding Pcollection ordered by this index.

Let's now inspect source code for steps 3 and 4...

disp_source(data_extractor__beam.pl__3__parallel_download_videos)
<title></title> <style type="text/css"> /* generated by Pygments Copyright 2006-2019 by the Pygments team. Licensed under the BSD license, see LICENSE for details. */ td.linenos { background-color: #f0f0f0; padding-right: 10px; } span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; } pre { line-height: 125%; } body .hll { background-color: #ffffcc } body { background: #f8f8f8; } body .c { color: #408080; font-style: italic } /* Comment */ body .err { border: 1px solid #FF0000 } /* Error */ body .k { color: #008000; font-weight: bold } /* Keyword */ body .o { color: #666666 } /* Operator */ body .ch { color: #408080; font-style: italic } /* Comment.Hashbang */ body .cm { color: #408080; font-style: italic } /* Comment.Multiline */ body .cp { color: #BC7A00 } /* Comment.Preproc */ body .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */ body .c1 { color: #408080; font-style: italic } /* Comment.Single */ body .cs { color: #408080; font-style: italic } /* Comment.Special */ body .gd { color: #A00000 } /* Generic.Deleted */ body .ge { font-style: italic } /* Generic.Emph */ body .gr { color: #FF0000 } /* Generic.Error */ body .gh { color: #000080; font-weight: bold } /* Generic.Heading */ body .gi { color: #00A000 } /* Generic.Inserted */ body .go { color: #888888 } /* Generic.Output */ body .gp { color: #000080; font-weight: bold } /* Generic.Prompt */ body .gs { font-weight: bold } /* Generic.Strong */ body .gu { color: #800080; font-weight: bold } /* Generic.Subheading */ body .gt { color: #0044DD } /* Generic.Traceback */ body .kc { color: #008000; font-weight: bold } /* Keyword.Constant */ body .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */ body .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */ body .kp { color: #008000 } /* Keyword.Pseudo */ body .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */ body .kt { color: #B00040 } /* Keyword.Type */ body .m { color: #666666 } /* Literal.Number */ body .s { color: #BA2121 } /* Literal.String */ body .na { color: #7D9029 } /* Name.Attribute */ body .nb { color: #008000 } /* Name.Builtin */ body .nc { color: #0000FF; font-weight: bold } /* Name.Class */ body .no { color: #880000 } /* Name.Constant */ body .nd { color: #AA22FF } /* Name.Decorator */ body .ni { color: #999999; font-weight: bold } /* Name.Entity */ body .ne { color: #D2413A; font-weight: bold } /* Name.Exception */ body .nf { color: #0000FF } /* Name.Function */ body .nl { color: #A0A000 } /* Name.Label */ body .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */ body .nt { color: #008000; font-weight: bold } /* Name.Tag */ body .nv { color: #19177C } /* Name.Variable */ body .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */ body .w { color: #bbbbbb } /* Text.Whitespace */ body .mb { color: #666666 } /* Literal.Number.Bin */ body .mf { color: #666666 } /* Literal.Number.Float */ body .mh { color: #666666 } /* Literal.Number.Hex */ body .mi { color: #666666 } /* Literal.Number.Integer */ body .mo { color: #666666 } /* Literal.Number.Oct */ body .sa { color: #BA2121 } /* Literal.String.Affix */ body .sb { color: #BA2121 } /* Literal.String.Backtick */ body .sc { color: #BA2121 } /* Literal.String.Char */ body .dl { color: #BA2121 } /* Literal.String.Delimiter */ body .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */ body .s2 { color: #BA2121 } /* Literal.String.Double */ body .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */ body .sh { color: #BA2121 } /* Literal.String.Heredoc */ body .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */ body .sx { color: #008000 } /* Literal.String.Other */ body .sr { color: #BB6688 } /* Literal.String.Regex */ body .s1 { color: #BA2121 } /* Literal.String.Single */ body .ss { color: #19177C } /* Literal.String.Symbol */ body .bp { color: #008000 } /* Name.Builtin.Pseudo */ body .fm { color: #0000FF } /* Name.Function.Magic */ body .vc { color: #19177C } /* Name.Variable.Class */ body .vg { color: #19177C } /* Name.Variable.Global */ body .vi { color: #19177C } /* Name.Variable.Instance */ body .vm { color: #19177C } /* Name.Variable.Magic */ body .il { color: #666666 } /* Literal.Number.Integer.Long */ </style>

def pl__3__parallel_download_videos(vid_index_schemad_pcoll, d_pl_options, n_partitions=8):
vid_index_schemad_pcoll_download_partitions = (
vid_index_schemad_pcoll
| "Beam PL: partition schemad video index for download parallelization" >> beam.Partition(
lambda vid_index_row, num_partitions: random.randint(0,num_partitions-1),
n_partitions
)
)
partition_download_results = [None for i in range(n_partitions)]
for i, vid_index_schemad_pcoll_partition in enumerate(vid_index_schemad_pcoll_download_partitions):
p_label = f"p{i+1}"
p_label_indented = f"\t{p_label}"
p_dl_results = (
vid_index_schemad_pcoll_partition
| f"Beam PL: {p_label} gather download info for video segments" >> beam.ParDo(VideoSegmentInfoGatherer(d_pl_options))
| f"Beam PL: {p_label} download video segments" >> beam.ParDo(VideoSegmentDownloader(d_pl_options, f"{p_label_indented}"))
)
partition_download_results[i] = p_dl_results
merged_download_results = (
(p_dl_r for p_dl_r in partition_download_results)
| f"Beam PL: merge download results" >> beam.Flatten()
)
return merged_download_results

Now things get really interesting with data_extractor__beam.pl__3__parallel_download_videos...

What we do here is explicitly tell Apache Beam to create 8 independent partitions, each of which will download videos independently of one another, corresponding to worker nodes.

Note that either threads or worker nodes. How that plays out is beyond the scope of this notebook. Suffice it to say that this results in much faster processing than simply executing sequentially.

When they are all done, the results are merged (via beam.Flatten) into a single Pcollection to be supplied as input to data_extractor__beam.pl__4__parallel_extract_target_video_frames.

disp_source(data_extractor__beam.pl__4__parallel_extract_target_video_frames)
<title></title> <style type="text/css"> /* generated by Pygments Copyright 2006-2019 by the Pygments team. Licensed under the BSD license, see LICENSE for details. */ td.linenos { background-color: #f0f0f0; padding-right: 10px; } span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; } pre { line-height: 125%; } body .hll { background-color: #ffffcc } body { background: #f8f8f8; } body .c { color: #408080; font-style: italic } /* Comment */ body .err { border: 1px solid #FF0000 } /* Error */ body .k { color: #008000; font-weight: bold } /* Keyword */ body .o { color: #666666 } /* Operator */ body .ch { color: #408080; font-style: italic } /* Comment.Hashbang */ body .cm { color: #408080; font-style: italic } /* Comment.Multiline */ body .cp { color: #BC7A00 } /* Comment.Preproc */ body .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */ body .c1 { color: #408080; font-style: italic } /* Comment.Single */ body .cs { color: #408080; font-style: italic } /* Comment.Special */ body .gd { color: #A00000 } /* Generic.Deleted */ body .ge { font-style: italic } /* Generic.Emph */ body .gr { color: #FF0000 } /* Generic.Error */ body .gh { color: #000080; font-weight: bold } /* Generic.Heading */ body .gi { color: #00A000 } /* Generic.Inserted */ body .go { color: #888888 } /* Generic.Output */ body .gp { color: #000080; font-weight: bold } /* Generic.Prompt */ body .gs { font-weight: bold } /* Generic.Strong */ body .gu { color: #800080; font-weight: bold } /* Generic.Subheading */ body .gt { color: #0044DD } /* Generic.Traceback */ body .kc { color: #008000; font-weight: bold } /* Keyword.Constant */ body .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */ body .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */ body .kp { color: #008000 } /* Keyword.Pseudo */ body .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */ body .kt { color: #B00040 } /* Keyword.Type */ body .m { color: #666666 } /* Literal.Number */ body .s { color: #BA2121 } /* Literal.String */ body .na { color: #7D9029 } /* Name.Attribute */ body .nb { color: #008000 } /* Name.Builtin */ body .nc { color: #0000FF; font-weight: bold } /* Name.Class */ body .no { color: #880000 } /* Name.Constant */ body .nd { color: #AA22FF } /* Name.Decorator */ body .ni { color: #999999; font-weight: bold } /* Name.Entity */ body .ne { color: #D2413A; font-weight: bold } /* Name.Exception */ body .nf { color: #0000FF } /* Name.Function */ body .nl { color: #A0A000 } /* Name.Label */ body .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */ body .nt { color: #008000; font-weight: bold } /* Name.Tag */ body .nv { color: #19177C } /* Name.Variable */ body .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */ body .w { color: #bbbbbb } /* Text.Whitespace */ body .mb { color: #666666 } /* Literal.Number.Bin */ body .mf { color: #666666 } /* Literal.Number.Float */ body .mh { color: #666666 } /* Literal.Number.Hex */ body .mi { color: #666666 } /* Literal.Number.Integer */ body .mo { color: #666666 } /* Literal.Number.Oct */ body .sa { color: #BA2121 } /* Literal.String.Affix */ body .sb { color: #BA2121 } /* Literal.String.Backtick */ body .sc { color: #BA2121 } /* Literal.String.Char */ body .dl { color: #BA2121 } /* Literal.String.Delimiter */ body .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */ body .s2 { color: #BA2121 } /* Literal.String.Double */ body .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */ body .sh { color: #BA2121 } /* Literal.String.Heredoc */ body .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */ body .sx { color: #008000 } /* Literal.String.Other */ body .sr { color: #BB6688 } /* Literal.String.Regex */ body .s1 { color: #BA2121 } /* Literal.String.Single */ body .ss { color: #19177C } /* Literal.String.Symbol */ body .bp { color: #008000 } /* Name.Builtin.Pseudo */ body .fm { color: #0000FF } /* Name.Function.Magic */ body .vc { color: #19177C } /* Name.Variable.Class */ body .vg { color: #19177C } /* Name.Variable.Global */ body .vi { color: #19177C } /* Name.Variable.Instance */ body .vm { color: #19177C } /* Name.Variable.Magic */ body .il { color: #666666 } /* Literal.Number.Integer.Long */ </style>

def pl__4__parallel_extract_target_video_frames(merged_download_results, d_pl_options, n_partitions=8):
"""
# ******************** EXTRACT SEGMENT-FRAMES IN PARALLEL: BEGIN ********************
# NOTE! THIS IS A CRUCIAL PIECE SO PAY ATTENTION TO THE FOLLOWING!!
# ********** --> IMPORTANT VIDEO-FRAME EXTRACTION PROCESSING INFORMATION<-- (BEGIN) **********
# We partitioned vid_index_schemad_pcoll so that video-SEGMENT downloads can occur independently.
# Downloading segments can occur independently since there is no correlation between each segment
# AS FAR AS DOWNLOADING IS CONCERNED.
#
# However, AS FAR AS EXTRACTION IS CONCERNED, each segment is related by the target video composed
# of each segment. The segment-videos themselves are ordered as they compose the final target
# video corresponding of ordered segment videos. For example, if a target video is composed of
# three segment videos, those segments occur in a certain order, as specified by the video index.
# Expanding upon this example, suppose target video "some_story_given_by_john_doe_0.mov", was recorded
# and saved in three corresponding video segments (to save space, I guess?)
# "some_story_given_by_john_doe_0_1.mov", "some_story_given_by_john_doe_0_2.mov", and
# "some_story_given_by_john_doe_0_3.mov". Note that the trailing "0" in the TARGET VIDEO filename
# indicates the camera perspective... all stories are potentially filmed from multiple synchronized
# camera perspectives/angles - there were obvioiusly multiple synchronized video recorders used in
# in that case. However, for this example, we are focusing on the target video for camera perspective 0.
# Anyway, as said, there are three segments which compose the target video. THESE SEGMENT VIDEOS
# ARE ORDERED (in time). THEREFORE, THE FRAMES COMPOSING EACH SEGMENT VIDEO ARE CONSEQUENTLY ORDERED
# (in time). THE BOTTOM LINE IS THAT WE NOW NEED TO GROUP SEGMENT VIDEOS, KEYED BY CORRESPONDING
# TARGET VIDEO. FURTHERMORE, THE COLLECTION OF SEGMENT VIDEOS FOR EACH TARGET VIDEO MUST BE ORDERED.
# THAT IS, WE MUST EXTRACT SEGMENT FRAMES AND SAVE THEM TO THE FILE SYSTEM WITH A FILE NAMING SCHEME
# THAT REFLECTS FRAME ORDER OF THE UNION OF ALL SEGMENT FRAMES. IF WE EXTRACT THE FRAMES OF EACH
# ORDERED SEGMENT, THEN A SIMPLE NUMERIC INDEX AS SEGMENT-FRAME FILENAME WILL DO THE TRICK.
# ********** --> IMPORTANT VIDEO-FRAME EXTRACTION PROCESSING INFORMATION<-- (END) **********
"""
# GROUP segment videos by target video
# note that this depends on the DAG - i.e. will not occur until partition_download_results are ready which, of course, does not occur until all videos have been downloaded
target_vid_seg_frame_extraction_partitions = (
merged_download_results
| f"Beam PL: group extraction info for video segments by target video" >> beam.GroupBy(lambda d: d['target_video_fname'])
| f"Beam PL: partition target video segment info for extraction parallelization" >> beam.Partition(
lambda vid_index_row, num_partitions: random.randint(0,num_partitions-1),
n_partitions
)
)
partition_extraction_results = [None for i in range(n_partitions)]
for i, p in enumerate(target_vid_seg_frame_extraction_partitions):
p_label = f"p{i+1}"
p_label_indented = f"\t{p_label}"
p_extraction_results = (
p
| f"Beam PL: {p_label} extract frames of each segment per target video" >> beam.ParDo(SegmentFrameExtractor(d_pl_options, f"{p_label_indented}", debug=False))
)
partition_extraction_results[i] = p_extraction_results
(
p_extraction_results
| f"Beam PL: {p_label} count target videos processed" >> beam.combiners.Count.Globally()
| f"Beam PL: {p_label} print target videos processed count" >> beam.ParDo(beam__common.PipelinePcollPrinter(label=p_label_indented, msg="target videos processed"))
)
merged_extraction_results = (
(p_extraction_results for p_extraction_results in partition_extraction_results)
| f"Beam PL: merge extraction results" >> beam.Flatten()
)
_ = (
merged_extraction_results
| "Beam PL: apply schema to merged extraction results pcoll" >> beam.Map(lambda x: beam.Row(
video_fname=str(x[0]),
n_stitched_frames=int(x[1])
))
# | "Beam PL: count total frames extracted" >> beam.transforms.sql.SqlTransform(f"SELECT SUM(n_stitched_frames) AS total_frames_extracted FROM PCOLLECTION") # this is VERY, VERY SLOW
| "Beam PL: select n_stitched_frames" >> beam.Map(lambda extraction_results_row: extraction_results_row.n_stitched_frames)
| "Beam PL: count total frames extracted" >> beam.CombineGlobally(sum)
| f"Beam PL: print total frames extracted" >> beam.ParDo(beam__common.PipelinePcollPrinter(msg="TOTAL FRAMES EXTRACTED"))
)
return merged_extraction_results

From the first part of this notebook...

This step leverages Apache Beam's parallelism as well.

But we MUST take care to ensure that a single worker extracts the frames of each segment associated with the target video.

This is because frames are ordered/sequenced.

Allowing two different workers to extract frames of different segments associated with the same final target video would likely result in frames being extracted out of order (due to parallelism).

Therefore, we partition the extraction task by final target video in order to ensure a single worker handles all segments associated with a single target video.

But we do want parallelism to occurr at the final target video level.

Before creating the pipeline execution graph, it is worth taking a deeper look into the internals of how we use the OpenCV library to process the videos (extract frames).

The data_extractor__beam.SegmentFrameExtractor wraps the data_extractor__beam.beam_extract_frames, which houses the logic for this processing. There are also a couple of helper functions that `` uses: data_extractor__beam.capture_segment_video and `data_extractor__beam.write_frame_to_file`. These will be listed after `data_extractor__beam.beam_extract_frames`.

disp_source(data_extractor__beam.beam_extract_frames)
<title></title> <style type="text/css"> /* generated by Pygments Copyright 2006-2019 by the Pygments team. Licensed under the BSD license, see LICENSE for details. */ td.linenos { background-color: #f0f0f0; padding-right: 10px; } span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; } pre { line-height: 125%; } body .hll { background-color: #ffffcc } body { background: #f8f8f8; } body .c { color: #408080; font-style: italic } /* Comment */ body .err { border: 1px solid #FF0000 } /* Error */ body .k { color: #008000; font-weight: bold } /* Keyword */ body .o { color: #666666 } /* Operator */ body .ch { color: #408080; font-style: italic } /* Comment.Hashbang */ body .cm { color: #408080; font-style: italic } /* Comment.Multiline */ body .cp { color: #BC7A00 } /* Comment.Preproc */ body .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */ body .c1 { color: #408080; font-style: italic } /* Comment.Single */ body .cs { color: #408080; font-style: italic } /* Comment.Special */ body .gd { color: #A00000 } /* Generic.Deleted */ body .ge { font-style: italic } /* Generic.Emph */ body .gr { color: #FF0000 } /* Generic.Error */ body .gh { color: #000080; font-weight: bold } /* Generic.Heading */ body .gi { color: #00A000 } /* Generic.Inserted */ body .go { color: #888888 } /* Generic.Output */ body .gp { color: #000080; font-weight: bold } /* Generic.Prompt */ body .gs { font-weight: bold } /* Generic.Strong */ body .gu { color: #800080; font-weight: bold } /* Generic.Subheading */ body .gt { color: #0044DD } /* Generic.Traceback */ body .kc { color: #008000; font-weight: bold } /* Keyword.Constant */ body .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */ body .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */ body .kp { color: #008000 } /* Keyword.Pseudo */ body .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */ body .kt { color: #B00040 } /* Keyword.Type */ body .m { color: #666666 } /* Literal.Number */ body .s { color: #BA2121 } /* Literal.String */ body .na { color: #7D9029 } /* Name.Attribute */ body .nb { color: #008000 } /* Name.Builtin */ body .nc { color: #0000FF; font-weight: bold } /* Name.Class */ body .no { color: #880000 } /* Name.Constant */ body .nd { color: #AA22FF } /* Name.Decorator */ body .ni { color: #999999; font-weight: bold } /* Name.Entity */ body .ne { color: #D2413A; font-weight: bold } /* Name.Exception */ body .nf { color: #0000FF } /* Name.Function */ body .nl { color: #A0A000 } /* Name.Label */ body .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */ body .nt { color: #008000; font-weight: bold } /* Name.Tag */ body .nv { color: #19177C } /* Name.Variable */ body .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */ body .w { color: #bbbbbb } /* Text.Whitespace */ body .mb { color: #666666 } /* Literal.Number.Bin */ body .mf { color: #666666 } /* Literal.Number.Float */ body .mh { color: #666666 } /* Literal.Number.Hex */ body .mi { color: #666666 } /* Literal.Number.Integer */ body .mo { color: #666666 } /* Literal.Number.Oct */ body .sa { color: #BA2121 } /* Literal.String.Affix */ body .sb { color: #BA2121 } /* Literal.String.Backtick */ body .sc { color: #BA2121 } /* Literal.String.Char */ body .dl { color: #BA2121 } /* Literal.String.Delimiter */ body .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */ body .s2 { color: #BA2121 } /* Literal.String.Double */ body .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */ body .sh { color: #BA2121 } /* Literal.String.Heredoc */ body .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */ body .sx { color: #008000 } /* Literal.String.Other */ body .sr { color: #BB6688 } /* Literal.String.Regex */ body .s1 { color: #BA2121 } /* Literal.String.Single */ body .ss { color: #19177C } /* Literal.String.Symbol */ body .bp { color: #008000 } /* Name.Builtin.Pseudo */ body .fm { color: #0000FF } /* Name.Function.Magic */ body .vc { color: #19177C } /* Name.Variable.Class */ body .vg { color: #19177C } /* Name.Variable.Global */ body .vi { color: #19177C } /* Name.Variable.Instance */ body .vm { color: #19177C } /* Name.Variable.Magic */ body .il { color: #666666 } /* Literal.Number.Integer.Long */ </style>

def beam_extract_frames(tpl_target_video_extraction_info, d_pl_options, label="", debug=False):
"""
expects tpl_target_video_extraction_info: (video_fname, list({'target_video_fname': target_video_fname, 'target_video_frames_dir': target_video_frames_dir, 'segment_url': str(url), 'segment_fname': str(url).split('/')[-1]}))
"""

# # log_results = []
target_video_fname = tpl_target_video_extraction_info[0]
segment_dicts = sorted(tpl_target_video_extraction_info[1], key=lambda segment_dict: segment_dict['segment_fname'])
target_video_frames_dir = segment_dicts[0]['target_video_frames_dir']

target_stitched_vid_name = target_video_frames_dir.split(os.path.sep)[-1]
if not fileio.dir_path_exists(target_video_frames_dir, d_pl_options)[0]:
fileio.make_dirs(target_video_frames_dir, d_pl_options)

video_dir = d_pl_options[fidscs_globals.OPT_NAME_VIDEO_DIR]
local_vid_segment_paths = [fileio.path_join(video_dir, segment_dict['segment_fname']) for segment_dict in segment_dicts]
for segment_dict in segment_dicts:
segment_dict['n_frames_extracted'] = 0

# create local dir for extraction (since OpenCV works only with local file system currently) if we have GCS filesystem
truly_local_vid_dir = None
truly_local_vid_dir_suffix = None
fs = FileSystems.get_filesystem(video_dir)
if type(fs) == GCSFileSystem:
truly_local_vid_dir_suffix = '/'.join(video_dir.split('/')[1:])
truly_local_vid_dir = '/tmp'+truly_local_vid_dir_suffix
# print(f"\t\tGCS storage detected! Extracting frames to truly_local_vid_dir {truly_local_vid_dir} (and will then upload to GCS after that)...")
if debug: print(f"\t\t{truly_local_vid_dir} exists: {fileio.dir_path_exists(truly_local_vid_dir, d_pl_options)}")
if not fileio.dir_path_exists(truly_local_vid_dir, d_pl_options)[0]:
if debug: print(f"\tcreating {truly_local_vid_dir}...")
truly_local_vid_dir_path_segs = truly_local_vid_dir.split('/')
if debug: print(f"\t\ttruly_local_vid_dir_path_segs: {truly_local_vid_dir_path_segs}")
s_cum_path = ''
for i, truly_local_vid_dir_path_seg in enumerate(truly_local_vid_dir_path_segs[1:]):
s_cum_path += '/'+truly_local_vid_dir_path_seg
fileio.make_dirs(s_cum_path, d_pl_options)
if debug: print(f"\t\t{s_cum_path} exists: {fileio.dir_path_exists(s_cum_path, d_pl_options)}")

vc_results = [capture_segment_video(local_vid_segment_path, truly_local_vid_dir, d_pl_options, debug=debug) for local_vid_segment_path in local_vid_segment_paths]
vid_caps = [vc_result[0] for vc_result in vc_results]
truly_local_target_video_frames_dirs = [vc_result[1] for vc_result in vc_results]

for seg_vid_cap in vid_caps:
seg_vid_cap.set(cv2.CAP_PROP_FPS, fidscs_globals.FPS)
frame_counts = list(map(lambda vc: int(vc.get(cv2.CAP_PROP_FRAME_COUNT)), vid_caps))
n_frames_expected = sum(frame_counts)

failed_target_videos = []

n_stitched_frames = 0
if n_frames_expected > 0:
# get count of existing stitched frames in target_stitched_vid_frames_dir
n_stitched_frames = len(fileio.list_dir(target_video_frames_dir, d_pl_options))

b_restitch = n_stitched_frames < n_frames_expected
n_stitched_frames = 0 if b_restitch else n_stitched_frames

for i, seg_vid_cap in enumerate(vid_caps):
segment_dict = segment_dicts[i]
_n_frames_expected = frame_counts[i]

if b_restitch:
success, frame = seg_vid_cap.read()
n_frames = 0
while success:
write_frame_to_file(
frame,
n_stitched_frames,
target_video_frames_dir,
truly_local_target_video_frames_dir=truly_local_target_video_frames_dirs[i],
debug=debug
)

n_frames += 1
n_stitched_frames += 1
success, frame = seg_vid_cap.read()

seg_path = local_vid_segment_paths[i]
seg_fname = seg_path.split(os.path.sep)[-1]
if n_frames != _n_frames_expected:
print(f"{label+': ' if len(label)>0 else ''}{fidscs_globals.VALIDATION_FATAL_ERROR_TEXT} Cannot stitch together target video {target_video_fname} since {_n_frames_expected} frames were expected from segment {seg_fname} ({seg_path}) but only {n_frames} were successfully extracted")
failed_target_videos.append(target_video_fname)
fail = True
break
else:
print(f"{label+': ' if len(label)>0 else ''}Added {n_stitched_frames} frames from segment {seg_fname} for target video {target_video_fname} (stitched-frames dir {target_video_frames_dir})")

else:
n_frames = _n_frames_expected
print(f"{label+': ' if len(label)>0 else ''}Found existing stiched-frames for {target_stitched_vid_name} ({n_stitched_frames} frames in {target_video_frames_dir})")

segment_dict['n_frames_extracted'] = n_frames

else:
if fidscs_globals.OUTPUT_INFO_LEVEL <= fidscs_globals.OUTPUT_INFO_LEVEL__WARNING:
print(f"\t{fidscs_globals.VALIDATION_WARNING_TEXT} Cannot stitch together target video {target_video_fname} since cv2.CAP_PROP_FRAME_COUNT reports segments have zero frames")
failed_target_videos.append(target_video_fname)
fail = True

if truly_local_vid_dir is not None:
for truly_local_target_video_frames_dir in truly_local_target_video_frames_dirs:
fileio.delete_file(truly_local_target_video_frames_dir, d_pl_options, recursive=True, debug=True)

return [(tpl_target_video_extraction_info[0], n_stitched_frames, segment_dicts)]
disp_source(data_extractor__beam.capture_segment_video)
<title></title> <style type="text/css"> /* generated by Pygments Copyright 2006-2019 by the Pygments team. Licensed under the BSD license, see LICENSE for details. */ td.linenos { background-color: #f0f0f0; padding-right: 10px; } span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; } pre { line-height: 125%; } body .hll { background-color: #ffffcc } body { background: #f8f8f8; } body .c { color: #408080; font-style: italic } /* Comment */ body .err { border: 1px solid #FF0000 } /* Error */ body .k { color: #008000; font-weight: bold } /* Keyword */ body .o { color: #666666 } /* Operator */ body .ch { color: #408080; font-style: italic } /* Comment.Hashbang */ body .cm { color: #408080; font-style: italic } /* Comment.Multiline */ body .cp { color: #BC7A00 } /* Comment.Preproc */ body .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */ body .c1 { color: #408080; font-style: italic } /* Comment.Single */ body .cs { color: #408080; font-style: italic } /* Comment.Special */ body .gd { color: #A00000 } /* Generic.Deleted */ body .ge { font-style: italic } /* Generic.Emph */ body .gr { color: #FF0000 } /* Generic.Error */ body .gh { color: #000080; font-weight: bold } /* Generic.Heading */ body .gi { color: #00A000 } /* Generic.Inserted */ body .go { color: #888888 } /* Generic.Output */ body .gp { color: #000080; font-weight: bold } /* Generic.Prompt */ body .gs { font-weight: bold } /* Generic.Strong */ body .gu { color: #800080; font-weight: bold } /* Generic.Subheading */ body .gt { color: #0044DD } /* Generic.Traceback */ body .kc { color: #008000; font-weight: bold } /* Keyword.Constant */ body .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */ body .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */ body .kp { color: #008000 } /* Keyword.Pseudo */ body .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */ body .kt { color: #B00040 } /* Keyword.Type */ body .m { color: #666666 } /* Literal.Number */ body .s { color: #BA2121 } /* Literal.String */ body .na { color: #7D9029 } /* Name.Attribute */ body .nb { color: #008000 } /* Name.Builtin */ body .nc { color: #0000FF; font-weight: bold } /* Name.Class */ body .no { color: #880000 } /* Name.Constant */ body .nd { color: #AA22FF } /* Name.Decorator */ body .ni { color: #999999; font-weight: bold } /* Name.Entity */ body .ne { color: #D2413A; font-weight: bold } /* Name.Exception */ body .nf { color: #0000FF } /* Name.Function */ body .nl { color: #A0A000 } /* Name.Label */ body .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */ body .nt { color: #008000; font-weight: bold } /* Name.Tag */ body .nv { color: #19177C } /* Name.Variable */ body .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */ body .w { color: #bbbbbb } /* Text.Whitespace */ body .mb { color: #666666 } /* Literal.Number.Bin */ body .mf { color: #666666 } /* Literal.Number.Float */ body .mh { color: #666666 } /* Literal.Number.Hex */ body .mi { color: #666666 } /* Literal.Number.Integer */ body .mo { color: #666666 } /* Literal.Number.Oct */ body .sa { color: #BA2121 } /* Literal.String.Affix */ body .sb { color: #BA2121 } /* Literal.String.Backtick */ body .sc { color: #BA2121 } /* Literal.String.Char */ body .dl { color: #BA2121 } /* Literal.String.Delimiter */ body .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */ body .s2 { color: #BA2121 } /* Literal.String.Double */ body .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */ body .sh { color: #BA2121 } /* Literal.String.Heredoc */ body .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */ body .sx { color: #008000 } /* Literal.String.Other */ body .sr { color: #BB6688 } /* Literal.String.Regex */ body .s1 { color: #BA2121 } /* Literal.String.Single */ body .ss { color: #19177C } /* Literal.String.Symbol */ body .bp { color: #008000 } /* Name.Builtin.Pseudo */ body .fm { color: #0000FF } /* Name.Function.Magic */ body .vc { color: #19177C } /* Name.Variable.Class */ body .vg { color: #19177C } /* Name.Variable.Global */ body .vi { color: #19177C } /* Name.Variable.Instance */ body .vm { color: #19177C } /* Name.Variable.Magic */ body .il { color: #666666 } /* Literal.Number.Integer.Long */ </style>

def capture_segment_video(vid_segment_path, truly_local_vid_dir, d_pl_options, debug=False):
video_fname = vid_segment_path.split('/')[-1]

truly_local_target_video_frames_dir = None

fs = FileSystems.get_filesystem(vid_segment_path)
if type(fs) == GCSFileSystem:
if debug: print(f"\n\n\tattempting to open video {vid_segment_path} for reading...")
with fileio.open_file_read(vid_segment_path) as f:
if debug: print(f"\t\tSUCCESS")

# now read from local bytes and write to GCS
buffer = f.read()
truly_local_vid_segment_path = truly_local_vid_dir+'/'+video_fname
if debug: print(f"\t\tattempting to write {truly_local_vid_segment_path} (truly) locally...")
with fileio.open_file_write(truly_local_vid_segment_path) as f_local:
f_local.write(buffer)
f_local.close()
if debug: print(f"\t\t\tSUCCESS")
f.close()

vid_segment_path = truly_local_vid_segment_path

# (truly local) dir for saving frames
truly_local_target_video_frames_dir = truly_local_vid_dir+'/'+fidscs_globals.STICHED_VIDEO_FRAMES_DIR_NAME+'/'+video_fname.split('.')[0]
if debug: print(f"\t\t\tattempting to create directory {truly_local_target_video_frames_dir} (truly_local_target_video_frames_dir) for frames extracted from (truly local) video {truly_local_vid_segment_path}...")
if not fileio.dir_path_exists(truly_local_target_video_frames_dir, d_pl_options)[0]:
if debug: print(f"\t\t\t\tcreating {truly_local_target_video_frames_dir}...")
fileio.make_dirs(truly_local_target_video_frames_dir, d_pl_options)
truly_local_target_video_frames_dir_exists = fileio.dir_path_exists(truly_local_target_video_frames_dir, d_pl_options)[0]
if debug: print(f"\t\t\t\t\t{truly_local_target_video_frames_dir} exists: {truly_local_target_video_frames_dir_exists}")
if not truly_local_target_video_frames_dir_exists:
raise Exception(f"required directory truly_local_target_video_frames_dir {truly_local_target_video_frames_dir_exists} does not exist")

if debug: print(f"\t\t\tattempting to capture (cv2.VideoCapture) video {vid_segment_path})...")

# finally, capture the video bytes
return cv2.VideoCapture(vid_segment_path), truly_local_target_video_frames_dir
disp_source(data_extractor__beam.write_frame_to_file)
<title></title> <style type="text/css"> /* generated by Pygments Copyright 2006-2019 by the Pygments team. Licensed under the BSD license, see LICENSE for details. */ td.linenos { background-color: #f0f0f0; padding-right: 10px; } span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; } pre { line-height: 125%; } body .hll { background-color: #ffffcc } body { background: #f8f8f8; } body .c { color: #408080; font-style: italic } /* Comment */ body .err { border: 1px solid #FF0000 } /* Error */ body .k { color: #008000; font-weight: bold } /* Keyword */ body .o { color: #666666 } /* Operator */ body .ch { color: #408080; font-style: italic } /* Comment.Hashbang */ body .cm { color: #408080; font-style: italic } /* Comment.Multiline */ body .cp { color: #BC7A00 } /* Comment.Preproc */ body .cpf { color: #408080; font-style: italic } /* Comment.PreprocFile */ body .c1 { color: #408080; font-style: italic } /* Comment.Single */ body .cs { color: #408080; font-style: italic } /* Comment.Special */ body .gd { color: #A00000 } /* Generic.Deleted */ body .ge { font-style: italic } /* Generic.Emph */ body .gr { color: #FF0000 } /* Generic.Error */ body .gh { color: #000080; font-weight: bold } /* Generic.Heading */ body .gi { color: #00A000 } /* Generic.Inserted */ body .go { color: #888888 } /* Generic.Output */ body .gp { color: #000080; font-weight: bold } /* Generic.Prompt */ body .gs { font-weight: bold } /* Generic.Strong */ body .gu { color: #800080; font-weight: bold } /* Generic.Subheading */ body .gt { color: #0044DD } /* Generic.Traceback */ body .kc { color: #008000; font-weight: bold } /* Keyword.Constant */ body .kd { color: #008000; font-weight: bold } /* Keyword.Declaration */ body .kn { color: #008000; font-weight: bold } /* Keyword.Namespace */ body .kp { color: #008000 } /* Keyword.Pseudo */ body .kr { color: #008000; font-weight: bold } /* Keyword.Reserved */ body .kt { color: #B00040 } /* Keyword.Type */ body .m { color: #666666 } /* Literal.Number */ body .s { color: #BA2121 } /* Literal.String */ body .na { color: #7D9029 } /* Name.Attribute */ body .nb { color: #008000 } /* Name.Builtin */ body .nc { color: #0000FF; font-weight: bold } /* Name.Class */ body .no { color: #880000 } /* Name.Constant */ body .nd { color: #AA22FF } /* Name.Decorator */ body .ni { color: #999999; font-weight: bold } /* Name.Entity */ body .ne { color: #D2413A; font-weight: bold } /* Name.Exception */ body .nf { color: #0000FF } /* Name.Function */ body .nl { color: #A0A000 } /* Name.Label */ body .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */ body .nt { color: #008000; font-weight: bold } /* Name.Tag */ body .nv { color: #19177C } /* Name.Variable */ body .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */ body .w { color: #bbbbbb } /* Text.Whitespace */ body .mb { color: #666666 } /* Literal.Number.Bin */ body .mf { color: #666666 } /* Literal.Number.Float */ body .mh { color: #666666 } /* Literal.Number.Hex */ body .mi { color: #666666 } /* Literal.Number.Integer */ body .mo { color: #666666 } /* Literal.Number.Oct */ body .sa { color: #BA2121 } /* Literal.String.Affix */ body .sb { color: #BA2121 } /* Literal.String.Backtick */ body .sc { color: #BA2121 } /* Literal.String.Char */ body .dl { color: #BA2121 } /* Literal.String.Delimiter */ body .sd { color: #BA2121; font-style: italic } /* Literal.String.Doc */ body .s2 { color: #BA2121 } /* Literal.String.Double */ body .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */ body .sh { color: #BA2121 } /* Literal.String.Heredoc */ body .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */ body .sx { color: #008000 } /* Literal.String.Other */ body .sr { color: #BB6688 } /* Literal.String.Regex */ body .s1 { color: #BA2121 } /* Literal.String.Single */ body .ss { color: #19177C } /* Literal.String.Symbol */ body .bp { color: #008000 } /* Name.Builtin.Pseudo */ body .fm { color: #0000FF } /* Name.Function.Magic */ body .vc { color: #19177C } /* Name.Variable.Class */ body .vg { color: #19177C } /* Name.Variable.Global */ body .vi { color: #19177C } /* Name.Variable.Instance */ body .vm { color: #19177C } /* Name.Variable.Magic */ body .il { color: #666666 } /* Literal.Number.Integer.Long */ </style>

def write_frame_to_file(frame, index, target_video_frames_dir, truly_local_target_video_frames_dir=None, debug=False):
local_frame_path = fileio.path_join(target_video_frames_dir, f"{index}.jpg") # this is the final frame path

if truly_local_target_video_frames_dir is not None:
# write truly local frame file
truly_local_frame_path = truly_local_target_video_frames_dir+'/'+f"{index}.jpg"
if debug: print(f"\t\t\t\t\t\tattempting to write {truly_local_frame_path} frame...")
cv2.imwrite(truly_local_frame_path, frame)
if debug: print(f"\t\t\t\t\t\t\tSUCCESS")
if debug: print(f"\t\t\t\t\t\t\tattempting to open {truly_local_frame_path} for read...")
with fileio.open_file_read(truly_local_frame_path) as f_truly_local_frame:
buffer = f_truly_local_frame.read()
if debug: print(f"\t\t\t\t\t\t\t\tSUCCESS")
if debug: print(f"\t\t\t\t\t\t\t\t\tattempting to open {local_frame_path} for final write...")
with fileio.open_file_write(local_frame_path) as f_frame_final:
f_frame_final.write(buffer)
f_frame_final.close()
if debug: print(f"\t\t\t\t\t\t\t\t\t\tSUCCESS")
buffer = None
f_truly_local_frame.close()

else:
if debug: print(f"\t\t\t\t\t\t\t\t\tattempting to open {local_frame_path} for final write...")
cv2.imwrite(local_frame_path, frame)
if debug: print(f"\t\t\t\t\t\t\t\t\t\tSUCCESS")


We are now ready to execute the "download-videos-extract-frames" pipeline. But first we must...


Create the "download-videos-extract-frames" pipeline execution graph

job_suffix = 'download-videos-extract-frames'
job_name = f"{PIPELINE_BASE_JOB_NAME}--{job_suffix}"
options.update({
    'job_name': job_name
})
pipeline_options = PipelineOptions(flags=[], **options) # easier to pass in options from command-line this way
print(f"PipelineOptions:\n{pipeline_options.get_all_options()}\n")

pl = beam.Pipeline(options=pipeline_options)

full_target_vid_index_schemad_pcoll = beam__common.pl__1__read_target_vid_index_csv(pl)
filtered_target_vid_index_schemad_pcoll = data_extractor__beam.pl__2__filter_target_vid_index(full_target_vid_index_schemad_pcoll, pl._options._all_options)
merged_download_results = data_extractor__beam.pl__3__parallel_download_videos(filtered_target_vid_index_schemad_pcoll, pl._options._all_options, n_partitions)
merged_extraction_results = data_extractor__beam.pl__4__parallel_extract_target_video_frames(merged_download_results, pl._options._all_options, n_partitions)
PipelineOptions:
{'runner': 'InteractiveRunner', 'streaming': False, 'beam_services': {}, 'type_check_strictness': 'DEFAULT_TO_ANY', 'type_check_additional': '', 'pipeline_type_check': True, 'runtime_type_check': False, 'performance_runtime_type_check': False, 'direct_runner_use_stacked_bundle': True, 'direct_runner_bundle_repeat': 0, 'direct_num_workers': 0, 'direct_running_mode': 'multi_threading', 'dataflow_endpoint': 'https://dataflow.googleapis.com', 'project': 'sc-fids-capstone', 'job_name': 'sc-fids-capstone-etl-demo--download-videos-extract-frames', 'staging_location': None, 'temp_location': None, 'region': None, 'service_account_email': None, 'no_auth': False, 'template_location': None, 'labels': None, 'update': False, 'transform_name_mapping': None, 'enable_streaming_engine': False, 'dataflow_kms_key': None, 'flexrs_goal': None, 'hdfs_host': None, 'hdfs_port': None, 'hdfs_user': None, 'hdfs_full_urls': False, 'num_workers': None, 'max_num_workers': None, 'autoscaling_algorithm': None, 'machine_type': None, 'disk_size_gb': None, 'disk_type': None, 'worker_region': None, 'worker_zone': None, 'zone': None, 'network': None, 'subnetwork': None, 'worker_harness_container_image': None, 'sdk_harness_container_image_overrides': None, 'use_public_ips': None, 'min_cpu_platform': None, 'dataflow_worker_jar': None, 'dataflow_job_file': None, 'experiments': None, 'number_of_worker_harness_threads': None, 'profile_cpu': False, 'profile_memory': False, 'profile_location': None, 'profile_sample_rate': 1.0, 'requirements_file': None, 'requirements_cache': None, 'setup_file': None, 'beam_plugins': None, 'save_main_session': False, 'sdk_location': 'default', 'extra_packages': None, 'prebuild_sdk_container_engine': None, 'prebuild_sdk_container_base_image': None, 'docker_registry_push_url': None, 'job_endpoint': None, 'artifact_endpoint': None, 'job_server_timeout': 60, 'environment_type': 'DOCKER', 'environment_config': None, 'environment_options': None, 'sdk_worker_parallelism': 1, 'environment_cache_millis': 0, 'output_executable_path': None, 'artifacts_dir': None, 'job_port': 0, 'artifact_port': 0, 'expansion_port': 0, 'flink_master': '[auto]', 'flink_version': '1.10', 'flink_job_server_jar': None, 'flink_submit_uber_jar': False, 'spark_master_url': 'local[4]', 'spark_job_server_jar': None, 'spark_submit_uber_jar': False, 'spark_rest_url': None, 'on_success_matcher': None, 'dry_run': False, 'wait_until_finish_duration': None, 'pubsubRootUrl': None, 's3_access_key_id': None, 's3_secret_access_key': None, 's3_session_token': None, 's3_endpoint_url': None, 's3_region_name': None, 's3_api_version': None, 's3_verify': None, 's3_disable_ssl': False, 'fidscs_capstone_max_target_videos': 50, 'fidscs_capstone_work_dir': '/tmp', 'fidscs_capstone_data_dir': '/tmp/data', 'fidscs_capstone_tmp_dir': '/tmp/data/tmp', 'fidscs_capstone_videos_dir': '/tmp/data/videos', 'fidscs_capstone_stitched_video_frames_dir': '/tmp/data/stitched_video_frames', 'fidscs_capstone_corpus_dir': '/tmp/data/tmp/ncslgr-xml', 'fidscs_capstone_corpus_ds_path': '/tmp/data/ncslgr-corpus-index.csv', 'fidscs_capstone_document_asl_cconsultant_ds_path': '/tmp/data/document-consultant-index.csv', 'fidscs_capstone_asl_consultant_ds_path': '/tmp/data/consultant-index.csv', 'fidscs_capstone_video_indexes_dir': '/tmp/data/tmp/video_index-20120129', 'fidscs_capstone_selected_video_index_path': '/tmp/data/tmp/video_index-20120129/files_by_video_name.csv', 'fidscs_capstone_video_ds_path': '/tmp/data/document-consultant-targetvideo-index.csv', 'fidscs_capstone_video_segment_ds_path': '/tmp/data/document-consultant-targetvideo-segment-index.csv', 'fidscs_capstone_video_frame_ds_path': '/tmp/data/document-consultant-targetvideo-frame-index.csv', 'fidscs_capstone_utterance_ds_path': '/tmp/data/document-consultant-utterance-index.csv', 'fidscs_capstone_utterance_video_ds_path': '/tmp/data/document-consultant-utterance-targetvideo-index.csv', 'fidscs_capstone_utterance_token_ds_path': '/tmp/data/document-consultant-utterance-token-index.csv', 'fidscs_capstone_utterance_token_frame_ds_path': '/tmp/data/document-consultant-targetvideo-utterance-token-frame-index.csv', 'fidscs_capstone_vocabulary_ds_path': '/tmp/data/vocabulary-index.csv'}

This time we would like to observe the results (collected into Pandas DataFrames)...

# we require this in order to make use of ib.show() (which provides visualization of the pcolls specified) or ib.collect() (which creates a pandas dataframe from a pcoll)
    # but all pcolls we wish to visualize must be created prior to executing the following line
ib.watch(locals())

And calling ib.collect forces the pipeline to actually run...


Run the full "download-videos-extract-frames" pipeline

We do this by collecting Pcollections into Pandas DataFrames for viewing with Interactive Beam.

print(f"\n\n****************************** Starting pipeline job: {job_name} ******************************")
df_full_target_vid_index_schemad_pcoll = ib.collect(full_target_vid_index_schemad_pcoll)
df_filtered_target_vid_index_schemad_pcoll = ib.collect(filtered_target_vid_index_schemad_pcoll)
df_merged_download_results = ib.collect(merged_download_results)
df_merged_extraction_results = ib.collect(merged_extraction_results)
print(f"****************************** Finished pipeline job: {job_name} ******************************")
****************************** Starting pipeline job: sc-fids-capstone-etl-demo--download-videos-extract-frames ******************************
	p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1508_small_3.mov to /tmp/data/videos/_1508_small_3.mov
	p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1510_small_0.mov to /tmp/data/videos/_1510_small_0.mov
	p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1510_small_1.mov to /tmp/data/videos/_1510_small_1.mov
	p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1510_small_3.mov to /tmp/data/videos/_1510_small_3.mov
	p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1512_small_0.mov to /tmp/data/videos/_1512_small_0.mov
	p6: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1512_small_1.mov to /tmp/data/videos/_1512_small_1.mov
	p1: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1512_small_3.mov to /tmp/data/videos/_1512_small_3.mov
	p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1513_small_0.mov to /tmp/data/videos/_1513_small_0.mov
	p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1513_small_1.mov to /tmp/data/videos/_1513_small_1.mov
	p1: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1513_small_3.mov to /tmp/data/videos/_1513_small_3.mov
	p1: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1516_small_0.mov to /tmp/data/videos/_1516_small_0.mov
	p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1516_small_1.mov to /tmp/data/videos/_1516_small_1.mov
	p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1516_small_3.mov to /tmp/data/videos/_1516_small_3.mov
	p6: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1522_small_0.mov to /tmp/data/videos/_1522_small_0.mov
	p2: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1522_small_1.mov to /tmp/data/videos/_1522_small_1.mov
	p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1522_small_3.mov to /tmp/data/videos/_1522_small_3.mov
	p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1523_small_0.mov to /tmp/data/videos/_1523_small_0.mov
	p6: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1523_small_1.mov to /tmp/data/videos/_1523_small_1.mov
	p2: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1523_small_3.mov to /tmp/data/videos/_1523_small_3.mov
	p1: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1524_small_0.mov to /tmp/data/videos/_1524_small_0.mov
	p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1524_small_1.mov to /tmp/data/videos/_1524_small_1.mov
	p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1524_small_3.mov to /tmp/data/videos/_1524_small_3.mov
	p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1525_small_0.mov to /tmp/data/videos/_1525_small_0.mov
	p6: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1525_small_1.mov to /tmp/data/videos/_1525_small_1.mov
	p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1525_small_3.mov to /tmp/data/videos/_1525_small_3.mov
	p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1526_small_0.mov to /tmp/data/videos/_1526_small_0.mov
	p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1526_small_1.mov to /tmp/data/videos/_1526_small_1.mov
	p4: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1526_small_3.mov to /tmp/data/videos/_1526_small_3.mov
	p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1531_small_0.mov to /tmp/data/videos/_1531_small_0.mov
	p2: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1531_small_1.mov to /tmp/data/videos/_1531_small_1.mov
	p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1531_small_3.mov to /tmp/data/videos/_1531_small_3.mov
	p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1532_small_0.mov to /tmp/data/videos/_1532_small_0.mov
	p1: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1532_small_1.mov to /tmp/data/videos/_1532_small_1.mov
	p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1532_small_3.mov to /tmp/data/videos/_1532_small_3.mov
	p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1533_small_0.mov to /tmp/data/videos/_1533_small_0.mov
	p6: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1533_small_1.mov to /tmp/data/videos/_1533_small_1.mov
	p4: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1533_small_3.mov to /tmp/data/videos/_1533_small_3.mov
	p4: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1535_small_0.mov to /tmp/data/videos/_1535_small_0.mov
	p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1535_small_1.mov to /tmp/data/videos/_1535_small_1.mov
	p2: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1535_small_3.mov to /tmp/data/videos/_1535_small_3.mov
	p2: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1536_small_0.mov to /tmp/data/videos/_1536_small_0.mov
	p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1536_small_1.mov to /tmp/data/videos/_1536_small_1.mov
	p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1536_small_3.mov to /tmp/data/videos/_1536_small_3.mov
	p6: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1537_small_0.mov to /tmp/data/videos/_1537_small_0.mov
	p2: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1537_small_1.mov to /tmp/data/videos/_1537_small_1.mov
	p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1537_small_3.mov to /tmp/data/videos/_1537_small_3.mov
	p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1538_small_0.mov to /tmp/data/videos/_1538_small_0.mov
	p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1538_small_1.mov to /tmp/data/videos/_1538_small_1.mov
	p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1538_small_3.mov to /tmp/data/videos/_1538_small_3.mov
	p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1539_small_0.mov to /tmp/data/videos/_1539_small_0.mov
	p3: Added 81 frames from segment _1516_small_0.mov for target video _1516_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1516_small_0)
	p3: Added 81 frames from segment _1526_small_1.mov for target video _1526_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1526_small_1)
	p4: Added 81 frames from segment _1516_small_3.mov for target video _1516_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1516_small_3)
	p6: Added 81 frames from segment _1536_small_3.mov for target video _1536_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1536_small_3)
	p8: Added 85 frames from segment _1512_small_0.mov for target video _1512_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1512_small_0)
	p4: Added 89 frames from segment _1510_small_1.mov for target video _1510_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1510_small_1)
	p2: Added 93 frames from segment _1533_small_0.mov for target video _1533_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1533_small_0)
	p6: Added 129 frames from segment _1523_small_3.mov for target video _1523_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1523_small_3)
	p8: Added 71 frames from segment _1537_small_1.mov for target video _1537_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1537_small_1)
	p6: Added 65 frames from segment _1524_small_1.mov for target video _1524_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1524_small_1)
	p4: Added 81 frames from segment _1526_small_3.mov for target video _1526_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1526_small_3)
	p7: Added 65 frames from segment _1524_small_0.mov for target video _1524_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1524_small_0)
	p4: Added 81 frames from segment _1516_small_1.mov for target video _1516_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1516_small_1)
	p6: Added 89 frames from segment _1510_small_3.mov for target video _1510_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1510_small_3)
	p3: Added 121 frames from segment _1531_small_0.mov for target video _1531_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1531_small_0)
	p5: Added 93 frames from segment _1533_small_3.mov for target video _1533_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1533_small_3)
	p4: Added 63 frames from segment _1532_small_3.mov for target video _1532_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1532_small_3)
	p5: Added 83 frames from segment _1513_small_3.mov for target video _1513_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1513_small_3)
	p7: Added 81 frames from segment _1536_small_1.mov for target video _1536_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1536_small_1)
	p8: Added 93 frames from segment _1533_small_1.mov for target video _1533_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1533_small_1)
	p3: Added 81 frames from segment _1526_small_0.mov for target video _1526_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1526_small_0)
	p3: Added 71 frames from segment _1537_small_0.mov for target video _1537_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1537_small_0)
	p5: Added 71 frames from segment _1537_small_3.mov for target video _1537_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1537_small_3)
	p5: Added 65 frames from segment _1524_small_3.mov for target video _1524_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1524_small_3)
	p7: Added 89 frames from segment _1510_small_0.mov for target video _1510_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1510_small_0)
	p5: Added 129 frames from segment _1523_small_1.mov for target video _1523_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1523_small_1)
	p4: Added 89 frames from segment _1508_small_3.mov for target video _1508_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1508_small_3)
	p5: Added 111 frames from segment _1539_small_0.mov for target video _1539_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1539_small_0)
	p6: Added 63 frames from segment _1532_small_1.mov for target video _1532_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1532_small_1)
	p5: Added 85 frames from segment _1512_small_1.mov for target video _1512_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1512_small_1)
	p6: Added 111 frames from segment _1535_small_0.mov for target video _1535_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1535_small_0)
	p5: Added 85 frames from segment _1512_small_3.mov for target video _1512_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1512_small_3)
	p4: Added 81 frames from segment _1522_small_1.mov for target video _1522_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1522_small_1)
	p8: Added 81 frames from segment _1522_small_0.mov for target video _1522_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1522_small_0)
	p1: Added 121 frames from segment _1531_small_1.mov for target video _1531_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1531_small_1)
	p4: Added 83 frames from segment _1513_small_1.mov for target video _1513_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1513_small_1)
	p2: Added 71 frames from segment _1525_small_1.mov for target video _1525_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1525_small_1)
	p3: Added 89 frames from segment _1538_small_0.mov for target video _1538_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1538_small_0)
	p7: Added 89 frames from segment _1538_small_3.mov for target video _1538_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1538_small_3)
	p7: Added 63 frames from segment _1532_small_0.mov for target video _1532_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1532_small_0)
	p3: Added 71 frames from segment _1525_small_3.mov for target video _1525_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1525_small_3)
	p6: Added 129 frames from segment _1523_small_0.mov for target video _1523_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1523_small_0)
	p6: Added 71 frames from segment _1525_small_0.mov for target video _1525_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1525_small_0)
	p8: Added 81 frames from segment _1536_small_0.mov for target video _1536_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1536_small_0)
	p6: Added 81 frames from segment _1522_small_3.mov for target video _1522_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1522_small_3)
	p2: Added 111 frames from segment _1535_small_1.mov for target video _1535_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1535_small_1)
	p2: Added 83 frames from segment _1513_small_0.mov for target video _1513_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1513_small_0)
	p4: Added 121 frames from segment _1531_small_3.mov for target video _1531_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1531_small_3)
	p3: Added 111 frames from segment _1535_small_3.mov for target video _1535_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1535_small_3)
	p2: Added 89 frames from segment _1538_small_1.mov for target video _1538_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1538_small_1)




****************************** Finished pipeline job: sc-fids-capstone-etl-demo--download-videos-extract-frames ******************************
df_filtered_target_vid_index_schemad_pcoll
<style scoped> .dataframe tbody tr th:only-of-type { vertical-align: middle; }
.dataframe tbody tr th {
    vertical-align: top;
}

.dataframe thead th {
    text-align: right;
}
</style>
target_video_filename video_seq_id perspective_cam_id compressed_mov_url uncompressed_avi_url uncompressed_avi_mirror_1_url uncompressed_avi_mirror_2_url
0 _1508_small_3.mov 1508 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
1 _1510_small_0.mov 1510 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
2 _1510_small_1.mov 1510 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
3 _1510_small_3.mov 1510 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
4 _1512_small_0.mov 1512 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
5 _1512_small_1.mov 1512 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
6 _1512_small_3.mov 1512 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
7 _1513_small_0.mov 1513 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
8 _1513_small_1.mov 1513 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
9 _1513_small_3.mov 1513 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
10 _1516_small_0.mov 1516 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
11 _1516_small_1.mov 1516 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
12 _1516_small_3.mov 1516 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
13 _1522_small_0.mov 1522 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
14 _1522_small_1.mov 1522 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
15 _1522_small_3.mov 1522 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
16 _1523_small_0.mov 1523 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
17 _1523_small_1.mov 1523 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
18 _1523_small_3.mov 1523 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
19 _1524_small_0.mov 1524 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
20 _1524_small_1.mov 1524 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
21 _1524_small_3.mov 1524 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
22 _1525_small_0.mov 1525 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
23 _1525_small_1.mov 1525 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
24 _1525_small_3.mov 1525 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
25 _1526_small_0.mov 1526 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
26 _1526_small_1.mov 1526 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
27 _1526_small_3.mov 1526 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
28 _1531_small_0.mov 1531 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
29 _1531_small_1.mov 1531 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
30 _1531_small_3.mov 1531 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
31 _1532_small_0.mov 1532 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
32 _1532_small_1.mov 1532 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
33 _1532_small_3.mov 1532 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
34 _1533_small_0.mov 1533 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
35 _1533_small_1.mov 1533 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
36 _1533_small_3.mov 1533 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
37 _1535_small_0.mov 1535 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
38 _1535_small_1.mov 1535 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
39 _1535_small_3.mov 1535 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
40 _1536_small_0.mov 1536 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
41 _1536_small_1.mov 1536 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
42 _1536_small_3.mov 1536 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
43 _1537_small_0.mov 1537 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
44 _1537_small_1.mov 1537 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
45 _1537_small_3.mov 1537 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
46 _1538_small_0.mov 1538 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
47 _1538_small_1.mov 1538 1 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
48 _1538_small_3.mov 1538 3 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
49 _1539_small_0.mov 1539 0 http://csr.bu.edu/asl/private/downloads/2002_8... http://csr.bu.edu/asl0/working/tapes3/2002_08_...
df_merged_download_results
<style scoped> .dataframe tbody tr th:only-of-type { vertical-align: middle; }
.dataframe tbody tr th {
    vertical-align: top;
}

.dataframe thead th {
    text-align: right;
}
</style>
target_video_fname target_video_frames_dir segment_url segment_fname
0 _1508_small_3.mov /tmp/data/stitched_video_frames/_1508_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1508_small_3.mov
1 _1513_small_1.mov /tmp/data/stitched_video_frames/_1513_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1513_small_1.mov
2 _1523_small_0.mov /tmp/data/stitched_video_frames/_1523_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1523_small_0.mov
3 _1525_small_3.mov /tmp/data/stitched_video_frames/_1525_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1525_small_3.mov
4 _1532_small_1.mov /tmp/data/stitched_video_frames/_1532_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1532_small_1.mov
5 _1536_small_0.mov /tmp/data/stitched_video_frames/_1536_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1536_small_0.mov
6 _1538_small_3.mov /tmp/data/stitched_video_frames/_1538_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1538_small_3.mov
7 _1512_small_0.mov /tmp/data/stitched_video_frames/_1512_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1512_small_0.mov
8 _1516_small_3.mov /tmp/data/stitched_video_frames/_1516_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1516_small_3.mov
9 _1524_small_1.mov /tmp/data/stitched_video_frames/_1524_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1524_small_1.mov
10 _1531_small_0.mov /tmp/data/stitched_video_frames/_1531_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1531_small_0.mov
11 _1533_small_3.mov /tmp/data/stitched_video_frames/_1533_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1533_small_3.mov
12 _1537_small_1.mov /tmp/data/stitched_video_frames/_1537_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1537_small_1.mov
13 _1510_small_1.mov /tmp/data/stitched_video_frames/_1510_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1510_small_1.mov
14 _1516_small_0.mov /tmp/data/stitched_video_frames/_1516_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1516_small_0.mov
15 _1523_small_3.mov /tmp/data/stitched_video_frames/_1523_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1523_small_3.mov
16 _1526_small_1.mov /tmp/data/stitched_video_frames/_1526_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1526_small_1.mov
17 _1533_small_0.mov /tmp/data/stitched_video_frames/_1533_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1533_small_0.mov
18 _1536_small_3.mov /tmp/data/stitched_video_frames/_1536_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1536_small_3.mov
19 _1510_small_0.mov /tmp/data/stitched_video_frames/_1510_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1510_small_0.mov
20 _1513_small_3.mov /tmp/data/stitched_video_frames/_1513_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1513_small_3.mov
21 _1523_small_1.mov /tmp/data/stitched_video_frames/_1523_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1523_small_1.mov
22 _1526_small_0.mov /tmp/data/stitched_video_frames/_1526_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1526_small_0.mov
23 _1532_small_3.mov /tmp/data/stitched_video_frames/_1532_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1532_small_3.mov
24 _1536_small_1.mov /tmp/data/stitched_video_frames/_1536_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1536_small_1.mov
25 _1539_small_0.mov /tmp/data/stitched_video_frames/_1539_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1539_small_0.mov
26 _1512_small_3.mov /tmp/data/stitched_video_frames/_1512_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1512_small_3.mov
27 _1522_small_1.mov /tmp/data/stitched_video_frames/_1522_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1522_small_1.mov
28 _1525_small_0.mov /tmp/data/stitched_video_frames/_1525_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1525_small_0.mov
29 _1531_small_3.mov /tmp/data/stitched_video_frames/_1531_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1531_small_3.mov
30 _1535_small_1.mov /tmp/data/stitched_video_frames/_1535_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1535_small_1.mov
31 _1538_small_0.mov /tmp/data/stitched_video_frames/_1538_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1538_small_0.mov
32 _1510_small_3.mov /tmp/data/stitched_video_frames/_1510_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1510_small_3.mov
33 _1516_small_1.mov /tmp/data/stitched_video_frames/_1516_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1516_small_1.mov
34 _1524_small_0.mov /tmp/data/stitched_video_frames/_1524_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1524_small_0.mov
35 _1526_small_3.mov /tmp/data/stitched_video_frames/_1526_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1526_small_3.mov
36 _1533_small_1.mov /tmp/data/stitched_video_frames/_1533_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1533_small_1.mov
37 _1537_small_0.mov /tmp/data/stitched_video_frames/_1537_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1537_small_0.mov
38 _1512_small_1.mov /tmp/data/stitched_video_frames/_1512_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1512_small_1.mov
39 _1522_small_0.mov /tmp/data/stitched_video_frames/_1522_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1522_small_0.mov
40 _1524_small_3.mov /tmp/data/stitched_video_frames/_1524_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1524_small_3.mov
41 _1531_small_1.mov /tmp/data/stitched_video_frames/_1531_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1531_small_1.mov
42 _1535_small_0.mov /tmp/data/stitched_video_frames/_1535_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1535_small_0.mov
43 _1537_small_3.mov /tmp/data/stitched_video_frames/_1537_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1537_small_3.mov
44 _1513_small_0.mov /tmp/data/stitched_video_frames/_1513_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1513_small_0.mov
45 _1522_small_3.mov /tmp/data/stitched_video_frames/_1522_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1522_small_3.mov
46 _1525_small_1.mov /tmp/data/stitched_video_frames/_1525_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1525_small_1.mov
47 _1532_small_0.mov /tmp/data/stitched_video_frames/_1532_small_0 http://csr.bu.edu/asl/private/downloads/2002_8... _1532_small_0.mov
48 _1535_small_3.mov /tmp/data/stitched_video_frames/_1535_small_3 http://csr.bu.edu/asl/private/downloads/2002_8... _1535_small_3.mov
49 _1538_small_1.mov /tmp/data/stitched_video_frames/_1538_small_1 http://csr.bu.edu/asl/private/downloads/2002_8... _1538_small_1.mov
df_merged_extraction_results.columns = ['segment_fname', 'frames', 'segment_dicts']
df_merged_extraction_results
<style scoped> .dataframe tbody tr th:only-of-type { vertical-align: middle; }
.dataframe tbody tr th {
    vertical-align: top;
}

.dataframe thead th {
    text-align: right;
}
</style>
segment_fname frames segment_dicts
0 _1523_small_3.mov 129 [{'target_video_fname': '_1523_small_3.mov', '...
1 _1531_small_0.mov 121 [{'target_video_fname': '_1531_small_0.mov', '...
2 _1537_small_0.mov 71 [{'target_video_fname': '_1537_small_0.mov', '...
3 _1523_small_1.mov 129 [{'target_video_fname': '_1523_small_1.mov', '...
4 _1525_small_1.mov 71 [{'target_video_fname': '_1525_small_1.mov', '...
5 _1525_small_0.mov 71 [{'target_video_fname': '_1525_small_0.mov', '...
6 _1538_small_1.mov 89 [{'target_video_fname': '_1538_small_1.mov', '...
7 _1536_small_3.mov 81 [{'target_video_fname': '_1536_small_3.mov', '...
8 _1537_small_1.mov 71 [{'target_video_fname': '_1537_small_1.mov', '...
9 _1513_small_3.mov 83 [{'target_video_fname': '_1513_small_3.mov', '...
10 _1524_small_3.mov 65 [{'target_video_fname': '_1524_small_3.mov', '...
11 _1513_small_1.mov 83 [{'target_video_fname': '_1513_small_1.mov', '...
12 _1536_small_0.mov 81 [{'target_video_fname': '_1536_small_0.mov', '...
13 _1516_small_3.mov 81 [{'target_video_fname': '_1516_small_3.mov', '...
14 _1533_small_3.mov 93 [{'target_video_fname': '_1533_small_3.mov', '...
15 _1510_small_0.mov 89 [{'target_video_fname': '_1510_small_0.mov', '...
16 _1512_small_3.mov 85 [{'target_video_fname': '_1512_small_3.mov', '...
17 _1523_small_0.mov 129 [{'target_video_fname': '_1523_small_0.mov', '...
18 _1531_small_3.mov 121 [{'target_video_fname': '_1531_small_3.mov', '...
19 _1526_small_1.mov 81 [{'target_video_fname': '_1526_small_1.mov', '...
20 _1524_small_0.mov 65 [{'target_video_fname': '_1524_small_0.mov', '...
21 _1532_small_3.mov 63 [{'target_video_fname': '_1532_small_3.mov', '...
22 _1508_small_3.mov 89 [{'target_video_fname': '_1508_small_3.mov', '...
23 _1522_small_1.mov 81 [{'target_video_fname': '_1522_small_1.mov', '...
24 _1531_small_1.mov 121 [{'target_video_fname': '_1531_small_1.mov', '...
25 _1535_small_3.mov 111 [{'target_video_fname': '_1535_small_3.mov', '...
26 _1510_small_1.mov 89 [{'target_video_fname': '_1510_small_1.mov', '...
27 _1524_small_1.mov 65 [{'target_video_fname': '_1524_small_1.mov', '...
28 _1533_small_1.mov 93 [{'target_video_fname': '_1533_small_1.mov', '...
29 _1512_small_1.mov 85 [{'target_video_fname': '_1512_small_1.mov', '...
30 _1538_small_0.mov 89 [{'target_video_fname': '_1538_small_0.mov', '...
31 _1538_small_3.mov 89 [{'target_video_fname': '_1538_small_3.mov', '...
32 _1516_small_0.mov 81 [{'target_video_fname': '_1516_small_0.mov', '...
33 _1516_small_1.mov 81 [{'target_video_fname': '_1516_small_1.mov', '...
34 _1536_small_1.mov 81 [{'target_video_fname': '_1536_small_1.mov', '...
35 _1535_small_0.mov 111 [{'target_video_fname': '_1535_small_0.mov', '...
36 _1525_small_3.mov 71 [{'target_video_fname': '_1525_small_3.mov', '...
37 _1535_small_1.mov 111 [{'target_video_fname': '_1535_small_1.mov', '...
38 _1512_small_0.mov 85 [{'target_video_fname': '_1512_small_0.mov', '...
39 _1526_small_3.mov 81 [{'target_video_fname': '_1526_small_3.mov', '...
40 _1537_small_3.mov 71 [{'target_video_fname': '_1537_small_3.mov', '...
41 _1532_small_1.mov 63 [{'target_video_fname': '_1532_small_1.mov', '...
42 _1522_small_0.mov 81 [{'target_video_fname': '_1522_small_0.mov', '...
43 _1513_small_0.mov 83 [{'target_video_fname': '_1513_small_0.mov', '...
44 _1533_small_0.mov 93 [{'target_video_fname': '_1533_small_0.mov', '...
45 _1510_small_3.mov 89 [{'target_video_fname': '_1510_small_3.mov', '...
46 _1526_small_0.mov 81 [{'target_video_fname': '_1526_small_0.mov', '...
47 _1539_small_0.mov 111 [{'target_video_fname': '_1539_small_0.mov', '...
48 _1532_small_0.mov 63 [{'target_video_fname': '_1532_small_0.mov', '...
49 _1522_small_3.mov 81 [{'target_video_fname': '_1522_small_3.mov', '...
print(f"We extracted {df_merged_extraction_results.frames.sum()} frames from {df_merged_extraction_results.segment_fname.count()} downloaded segments.")
We extracted 4382 frames from 50 downloaded segments.


Conclusion

This notebook hopefully adequately demonstrated the power of data processing parallelism that can be accomplished by using Apache Beam. Remember, the full pipeline is executed in the GCP Dataflow environment. This project was really only feasible doing it in that environment.

The following statistics demonstrate this.


Trying to do it locally, without Apache Beam

As I said when I began this summary, when I first started my work on this project, I did some initial testing using the Pandas DataFrame approach.

First, I ran out of space and my MacBook Pro was very upset with me.

Second, the sequence didn't complete.

Third, I let the process run for more than 18 hours.

When I rebooted, I found the sequence had only progressed about 20% of the way through the entire list of videos.

Of course, since I required a way to relate other information about the videos and frames, such as the data about the ASL consultant, camera perspective, etc., I had the "brilliant" idea to stuff each and every image byte into a table (csv). No wonder I ran out of space.

I estimated that had I let it finish, assuming I had enough local disk space to do so, it would have taken close to a week to do so.


And with Apache Beam using GCP Dataflow as the runner, coupled with GCS for storage

Whereas, by executing the pipeline in GCP Dataflow, the entire pipeline took a little over 4 hours to complete, including the validation-train split (which will be discussed in another blog post.). See the snapshot below.


Dataflow-Jobs-All-ETL


Other Statistics

Total Frame Count: 561,000+
Total Disk Space: 17 GB+
19 Datasets

But only 2 are used to train the DNN.

Please keep in mind that I did this, basically at half resolution, at 30 FPS when the videos were produced at 60 FPS.

I had to compromise somewhere.


Final Report

The total calendar time from start to finish was a little over four months.

That seems like a long time. But this was a MASSIVE project and before I realized I bit off more than I could chew initially, I was already committed.

So I persevered, did A LOT of research, and learned how to do Big Data.

During that time frame I worked on this nearly every day, taking time off to sleep and when I needed a sanity check.

Most of the time I worked an average of probably at least 10 hours per day.

One thing I will say is that Apache Beam is not for the light-hearted.

It is fairly low-level. This is not the Pandas framework, folks.

One needs to build nearly everything oneself.

BUT... the catch is that one can do Big Data. One can do data processing the likes of which would not otherwise be possible on a single, local machine.

That's BIG! It's not called "Big Data" for nothing.

In spite of all the pain, I give Apache Beam 5 out of 5 stars.

It was well worth the blood, sweat, and tears.

So... thank you and goodnight!

About

Big Data ETL Pipeline for ASL-to-Text (Computer Vision), using Apache Beam on GCP Dataflow

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published