Philip (flip) Kromer edited this page Jun 5, 2012 · 4 revisions

wukong -- Core Concepts

FIXME: metadata handling

FIXME: `inputs`? `input`? `source`? // `outputs`? `output`? `sink`?
TODO: How to cleanly do a 'read off header lines, pass through the rest'?
TODO: Can I replace myself with an identity streamer?

FIXED: `process` not `call`
FIXED: `Processor`, not Transform or Transformer?
FIXED: stick with `.receive` not `.make`

    File.open('xxes.tsv').each do |rec1|
      rec2 = db.get()
      o1, o2 = act(rec1, rec2)
    end

Data

Record

A wukong record responds to:

  • to_wire -- conversion of the record and all its children to wireable types (Integer/Float/String/Hash/Array/true/false/nil). When stringified, corresponds to the body of a flume event, the body of an HTTP request/response, or the value of a mapreduce key-value pair. foo.class.receive(foo.to_wire) should behave identically to the original object.
  • _metadata -- bucket of key-value pairs shipped around with this record. Non-nested; use dot-addressed keys. Corresponds to flume metadata; to HTTP headers (where . becomes -, prefixed with Wu-); serialized in-record for mapreduces. Keys must be lowercased dot-separated identifiers (dash - is not OK). Values must be primitive types (Integer/Float/String/true/false/nil -- no arrays or hashes) and should be robust against stringification mid-flight (that is, be prepared for true => "true", false => "false", and nil => ""). The following are reserved item names within the metadata bucket:
    • id -- arbitrary stringlike value identifying this record among all records. Records with the same [partition, group, ID] are considered interchangeable.
    • partition -- named dataflow path. All records in a partition should meet the uniform contract of its consumers. Lowercase identifier string.
    • channel -- named subgrouping of records. Lowercase identifier string.
    • group -- alias for channel.
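
The metadata rules above can be sketched roughly as follows. The helper names (`metadata_to_headers`, `stringify_value`) and the sample keys are assumptions made for this illustration, not part of wukong; the only behavior taken from the text is that `.` becomes `-`, that headers are prefixed with `Wu-`, and that values may be stringified mid-flight.

```ruby
# Hypothetical helpers; these names do not come from wukong itself.
METADATA_KEY_RE = /\A([a-z][a-z0-9_]*)(\.[a-z][a-z0-9_]*)*\z/

# Stringify a primitive the way the text warns may happen mid-flight:
# true => "true", false => "false", nil => "".
def stringify_value(val)
  case val
  when nil then ''
  when true, false, Integer, Float, String then val.to_s
  else raise ArgumentError, "metadata values must be primitive, got #{val.class}"
  end
end

# Turn a flat metadata bucket into HTTP headers: '.' becomes '-', prefixed with 'Wu-'.
def metadata_to_headers(metadata)
  metadata.each_with_object({}) do |(key, val), headers|
    key = key.to_s
    raise ArgumentError, "bad metadata key: #{key.inspect}" unless key =~ METADATA_KEY_RE
    headers["Wu-#{key.tr('.', '-')}"] = stringify_value(val)
  end
end

headers = metadata_to_headers(
  'id' => 'abc123', 'source.retries' => 3, 'flagged' => true, 'note' => nil
)
```

Arrays and hashes are rejected outright here, since the text allows only primitive values; a consumer reading these headers back would have to treat every value as a string.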

Event

An event is a record that additionally responds to:

  • timestamp -- time the event originated, assigned by the origin (as anything they like) and unchanged afterwards. A UTC ruby time, serialized as a unix timestamp.
  • origin -- name for the source of this record; in flume, the dispatching host. This influences delivery guarantees. A downcased, dasherized, dot-separated identifier.
  • nano_ctr -- nanosecond timestamp, monotonically-increasing within each origin. The [origin, nano_ctr] pair may be considered globally unique. Serialized as whatever flume uses.
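
One way an origin might hand out `nano_ctr` values is sketched below. The `NanoCounter` class and the sample hostname are hypothetical, and bumping the counter by one when the clock has not advanced is an assumption made here so that the `[origin, nano_ctr]` pair never repeats; the text does not say how flume itself does this.

```ruby
# Hypothetical helper: hands out nanosecond counters that never repeat
# or go backwards within this process (standing in for one origin).
class NanoCounter
  def initialize
    @last = 0
    @lock = Mutex.new
  end

  # Wall-clock nanoseconds, nudged forward by one if the clock has
  # not advanced (or has stepped backwards) since the previous call.
  def next_value
    @lock.synchronize do
      now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond)
      @last = [now, @last + 1].max
    end
  end
end

ctr        = NanoCounter.new
stamps     = Array.new(1000) { ctr.next_value }
event_keys = stamps.map { |nano| ['web-01.example.com', nano] }
```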

Partitioning

  • Universe -- mapred: hdfs ; flume: master ; mongoid: install ; mysql: install
  • Flow -- mapred: partition ; flume: stream ; mongoid: collection ; mysql: db
  • Channel -- mapred: group key ; flume: channel ; mongoid: _type ; mysql: table
  • Id -- mapred: record id ; flume: ev id ; mongoid: _id ; mysql: id

Stages

A stage is a node in a directed graph. It has a

  • name
  • inputs -- (note the 's'). collection of stages that are inputs to the node. If the node is a source, this will be empty.
  • output -- an item that responds to process.

and responds to

  • initialize(options{name, flow})
  • setup(hsh{input, output})
  • stop
  • report -- returns a bucket summarizing the state of the stage itself.
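
A minimal sketch of that contract, assuming a plain Ruby class. The method names come from the lists above; the `Stage` class name, the contents of the `report` hash, and the choice to append each `input` passed to `setup` are assumptions made for this illustration.

```ruby
# Hypothetical base class for a node in the graph.
class Stage
  attr_reader :name, :flow, :inputs, :output

  def initialize(options = {})
    @name    = options[:name]
    @flow    = options[:flow]
    @inputs  = []      # stays empty for a source
    @output  = nil
    @running = false
  end

  # Wire the stage to its neighbours.
  def setup(hsh = {})
    @inputs << hsh[:input] if hsh[:input]
    @output  = hsh[:output] if hsh[:output]
    @running = true
    self
  end

  def stop
    @running = false
  end

  # A bucket summarizing the state of the stage itself.
  def report
    {
      :name       => name,
      :running    => @running,
      :inputs     => inputs.map(&:name),
      :has_output => !output.nil?
    }
  end
end

reader = Stage.new(:name => 'reader', :flow => 'logs')
parser = Stage.new(:name => 'parser', :flow => 'logs')
reader.setup(:output => parser)
parser.setup(:input => reader)
```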

Source (driven)

An event-driven ('push') source.

  • each -- gives source a block; does block.process on each record. The block in this case serves as a callback.
  • empty?
  • output

Source (iterable)

An iterable ('pull') source of data.

  • pop (?get?) (?get_next?) (next is a ruby keyword)
  • empty?

At some point, one might implement a BufferedDriven wrapper that turns any driven (push) source into an iterable (pull) one. Similarly, we can turn an iterable into a driven source by polling it in a loop; in flume, this is done by source runners, and in vayacondios, by biographers.
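
Both conversions can be sketched in a few lines. Everything here is an assumption for illustration: `ArrayDrivenSource` is a toy push source, the callback is an ordinary Ruby block invoked with `call`, and this `BufferedDriven` drains its source eagerly into an in-memory array, where a real wrapper would more likely run the source on its own thread behind a bounded queue.

```ruby
# Toy push source, assumed for this example: calls the block once per record.
class ArrayDrivenSource
  def initialize(records)
    @records = records
  end

  def each(&block)
    @records.each(&block)
  end
end

# Push -> pull: buffer whatever the driven source hands us, then serve it via pop.
class BufferedDriven
  def initialize(driven)
    @buffer = []
    driven.each { |rec| @buffer << rec }
  end

  def pop
    @buffer.shift
  end

  def empty?
    @buffer.empty?
  end
end

# Pull -> push: poll the iterable in a loop, handing each record to the callback.
def drive(iterable, &callback)
  callback.call(iterable.pop) until iterable.empty?
end

pulled = BufferedDriven.new(ArrayDrivenSource.new(%w[a b c]))
seen   = []
drive(pulled) { |rec| seen << rec }
```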

Transform

  • input --

  • output

  • process(rec) -- handle a record. Calls output.process(rec) to emit a record downstream; the return value of process is discarded.

  • ?? error

Sink

  • input
  • process(rec) -- handle a record
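
Because a transform emits through `output.process` rather than through its return value, it can in principle emit zero, one, or several records per input. The sketch below wires a transform to a sink by hand; `WordSplitter` and `ArraySink` are hypothetical classes invented for this example.

```ruby
# Hypothetical transform: emits one record per whitespace-separated word.
class WordSplitter
  attr_accessor :output

  def process(line)
    line.split.each { |word| output.process(word) }
    nil # the return value of process is discarded by callers
  end
end

# Hypothetical sink: collects everything it receives.
class ArraySink
  attr_reader :records

  def initialize
    @records = []
  end

  def process(rec)
    @records << rec
  end
end

sink            = ArraySink.new
splitter        = WordSplitter.new
splitter.output = sink

['hello wukong', '', 'one two three'].each { |line| splitter.process(line) }
```

The empty line produces no downstream records at all, which would be awkward to express if `process` had to return exactly one value.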

Proc

  • call(rec) -- given one record, returns one record
  • invoking `to_transform(output)` decorates the proc with a process method: `def process(rec) output.process(self.call(rec)) ; end`
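
A rough sketch of that adapter follows. Note the swap: rather than decorating the proc in place, this version returns a small wrapper object (`ProcTransform`, a hypothetical name) whose `process` method has the body given above. Reopening `Proc` to add `to_transform`, and the `CollectingSink` class, are likewise assumptions for this example.

```ruby
# Hypothetical wrapper: gives any callable a transform-style process method.
class ProcTransform
  attr_reader :output

  def initialize(callable, output)
    @callable = callable
    @output   = output
  end

  # One record in, one record out, emitted downstream.
  def process(rec)
    output.process(@callable.call(rec))
  end
end

class Proc
  def to_transform(output)
    ProcTransform.new(self, output)
  end
end

# Hypothetical sink used only to observe the result.
class CollectingSink
  attr_reader :records

  def initialize
    @records = []
  end

  def process(rec)
    @records << rec
  end
end

sink    = CollectingSink.new
doubler = lambda { |n| n * 2 }.to_transform(sink)
[1, 2, 3].each { |n| doubler.process(n) }
```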

Transforms

Filter

Stringifier

Types

Primitive types:

  • Integer (int, long)
  • Float (float, double)
  • String (string, bytes, fixed, enum)
  • true/false (boolean)
  • nil

Wireable types:

The primitive types plus

  • Array (array)
  • Hash (map)

Structured types:

Transported as a key-value bucket, with _type naming the factory capable of reconstituting it.
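
As an illustration, the sketch below sends a structured object through JSON and rebuilds it from its `_type`. The `FACTORIES` registry, the `reconstitute` helper, and the `GeoPoint` class are all hypothetical; only `to_wire`, `receive`, and the `_type` key come from the text.

```ruby
require 'json'

# Hypothetical registry mapping _type names to factories.
FACTORIES = {}

class GeoPoint
  attr_reader :lat, :lng

  def initialize(lat, lng)
    @lat = lat
    @lng = lng
  end

  # Build an instance from a wired key-value bucket.
  def self.receive(hsh)
    new(hsh['lat'], hsh['lng'])
  end

  # Only wireable types appear here: a Hash of Strings and Floats.
  def to_wire
    { '_type' => 'geo_point', 'lat' => lat, 'lng' => lng }
  end

  def ==(other)
    other.is_a?(GeoPoint) && to_wire == other.to_wire
  end
end

FACTORIES['geo_point'] = GeoPoint

# Look up the factory named by _type and let it rebuild the object.
def reconstitute(wired)
  FACTORIES.fetch(wired['_type']).receive(wired)
end

point = GeoPoint.new(30.27, -97.74)
blob  = JSON.generate(point.to_wire) # stringified, as in an event or request body
copy  = reconstitute(JSON.parse(blob))
```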

Identifier formats:

An identifier starts with a lowercase letter, and contains only lowercased letters, numbers and underscore characters. In some cases, we transport a predictable tuple of identifiers; typically separated with . dots. Lastly, we use the dasherized form for internet hostnames.

  • downcased identifier: /\A([a-z][a-z0-9\_]*)\z/
  • downcased, dot-separated identifier: /\A([a-z][a-z0-9\_]*)(\.[a-z][a-z0-9\_]*)*\z/
  • downcased, dasherized, dot-separated identifier: /\A([a-z][a-z0-9\-]*)(\.[a-z][a-z0-9\-]*)*\z/
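
To see how the three formats differ, the patterns can be run against a few sample strings. The constant names and the `identifier_kinds` helper are assumptions for this sketch; the patterns are the ones listed above, written without the redundant escapes.

```ruby
IDENTIFIER          = /\A([a-z][a-z0-9_]*)\z/
DOTTED_IDENTIFIER   = /\A([a-z][a-z0-9_]*)(\.[a-z][a-z0-9_]*)*\z/
HOSTLIKE_IDENTIFIER = /\A([a-z][a-z0-9-]*)(\.[a-z][a-z0-9-]*)*\z/

# Hypothetical helper: which of the three formats does a string satisfy?
def identifier_kinds(str)
  kinds = []
  kinds << :identifier if str =~ IDENTIFIER
  kinds << :dotted     if str =~ DOTTED_IDENTIFIER
  kinds << :hostlike   if str =~ HOSTLIKE_IDENTIFIER
  kinds
end

identifier_kinds('user_id')            # => [:identifier, :dotted]
identifier_kinds('geo.lat_lng')        # => [:dotted]
identifier_kinds('web-01.example.com') # => [:hostlike]
identifier_kinds('Bad')                # => []
```

A plain lowercase word such as `abc` satisfies all three, so the format alone does not say which kind of identifier a value is meant to be.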

Error

There are "known" errors -- think of the difference between a 4xx error (NotFoundError, BadRequestError) and a 5xx error (InternalServerError). In the former case, everything was processed correctly but the request was invalid (i.e. it's the sender's fault). In the latter case, the record was not processed correctly.

Exceptions have the following class_attributes:

  • description, a string

  • status_code, an integer

The exception classes form a hierarchy:

  • ProcessingError

    • BadRecordError
      • BadPayloadError -- can't generate raw structure from blob; un-JSONize, bad UTF, etc
      • TypeMismatchError -- factory can't generate object from raw structure
    • OversizeBodyError -- too large

Execution

Agent

Process hosting this graph segment.


Notifications

Announce/Discover

"Traits" of stages

  • Fileish (path, dir)
  • Serverish (capability, address (port+addr), realm (cluster))
  • Queueish (size)

Event -- other programs

Cube: Cube event

  • body -- data; structured.
  • metadata -- no special support, so serialized in as meta.
  • timestamp -- time (ISO 8601 UTC).
  • channel -- type.
  • key -- id.

Flume: Flume user guide

  • body -- text blob, maximum of 32KB per event (flume.event.max.size.bytes)
  • metadata:
    • table with an arbitrary number of attribute value pairs.
    • priority (trace, debug, info, warn, error, or fatal)
    • Source host
  • stream -- none natively, use extended metadata
  • channel -- none natively, use extended metadata
  • time:
    • unix timestamp
    • nanosecond timestamp (considered monotonic per machine)