Merge pull request #89 from acmiyaguchi/tuples
Add support for tuples in the form of anonymous structs
acmiyaguchi committed Sep 26, 2019
2 parents 553a3dd + 204bb7d commit 3c0b33b
Showing 16 changed files with 711 additions and 46 deletions.
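For context before the file-by-file diff: tuple validation in JSON Schema (the form where "items" is an array of schemas, one per position) is now translated into an anonymous struct whose fields are named positionally f0_, f1_, and so on, where previously such schemas collapsed to a single atom. A minimal Rust sketch of the intended mapping, adapted from the src/bigquery.rs test further down; the schema literals are illustrative:

use serde_json::json;

fn main() {
    // Tuple form of JSON Schema: "items" is an array of schemas,
    // so each position carries its own type.
    let input = json!({
        "type": "array",
        "items": [
            {"type": "boolean"},
            {"type": "integer"}
        ]
    });

    // The transpiler now emits a struct with positional field
    // names; for BigQuery the expected output is (see the test
    // in src/bigquery.rs below):
    let expected = json!({
        "type": "RECORD",
        "mode": "REQUIRED",
        "fields": [
            {"name": "f0_", "type": "BOOL", "mode": "REQUIRED"},
            {"name": "f1_", "type": "INT64", "mode": "REQUIRED"}
        ]
    });

    println!("{} => {}", input, expected);
}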
1 change: 1 addition & 0 deletions .gitignore
@@ -5,3 +5,4 @@ __pycache__/
**/*.rs.bk

/schemas
/test_tuple_results
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "jsonschema-transpiler"
version = "1.4.1"
version = "1.5.0"
authors = ["Anthony Miyaguchi <amiyaguchi@mozilla.com>"]
description = "A tool to transpile JSON Schema into schemas for data processing"
license = "MPL-2.0"
4 changes: 2 additions & 2 deletions scripts/mps-download-schemas.sh
@@ -2,6 +2,6 @@
# Download production mozilla-pipeline-schemas into a schema folder

cd "$(dirname "$0")/.." || exit
curl -o schemas.tar.gz -L https://github.com/mozilla-services/mozilla-pipeline-schemas/archive/dev.tar.gz
tar --strip-components=1 -xvf schemas.tar.gz mozilla-pipeline-schemas-dev/schemas
curl -o schemas.tar.gz -L https://github.com/mozilla-services/mozilla-pipeline-schemas/archive/master.tar.gz
tar --strip-components=1 -xvf schemas.tar.gz mozilla-pipeline-schemas-master/schemas
rm schemas.tar.gz
8 changes: 6 additions & 2 deletions scripts/mps-generate-avro-data-helper.py
@@ -15,12 +15,13 @@
else:
sys.exit("Error: missing argument for document")

assert os.path.isdir("data")
assert any(
[document in name for name in os.listdir("data")]
), "document not found in data"
), f"{document} not found in data"
assert any(
[document in name for name in os.listdir("avro")]
), "document not found in avro schemas"
), f"{document} not found in avro schemas"


def format_key(key):
@@ -43,6 +44,9 @@ def convert(data, schema):
out = {}
if not data:
return out
# cast tuple into an object before continuing
if isinstance(data, list):
data = {f"f{i}_": v for i, v in enumerate(data)}
for key, value in data.items():
# apply the appropriate transformations on the key
key = format_key(key)
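The list-to-object cast above is the data-side counterpart of the schema change: a JSON array such as [true, 42] has to be reshaped into {"f0_": true, "f1_": 42} before it can be written against the anonymous-struct Avro schema. A hypothetical Rust equivalent of the same cast (not part of this commit):

use serde_json::{Map, Value};

// Rewrite a JSON array into an object keyed by positional
// "f{i}_" names so it matches the anonymous-struct schema;
// any other value passes through unchanged.
fn cast_tuple(data: Value) -> Value {
    match data {
        Value::Array(items) => Value::Object(
            items
                .into_iter()
                .enumerate()
                .map(|(i, v)| (format!("f{}_", i), v))
                .collect::<Map<String, Value>>(),
        ),
        other => other,
    }
}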
2 changes: 1 addition & 1 deletion scripts/mps-generate-avro-data.sh
@@ -7,7 +7,7 @@ documents=$(ls data | sed 's/.ndjson//')
total=0
failed=0
for document in $documents; do
if ! python3 scripts/mps-generate-avro-data-helper.py $document 2> /dev/null; then
if ! python3 scripts/mps-generate-avro-data-helper.py $document; then
echo "failed to write $document"
rm "avro-data/$document.avro"
((failed++))
18 changes: 7 additions & 11 deletions scripts/mps-load-avro-bq.sh
@@ -2,30 +2,26 @@

cd "$(dirname "$0")/.." || exit

project_id="test-avro-ingest"
project_id=$(gcloud config get-value project)
dataset_id="test_avro"

gsutil -m cp avro-data/* gs://${project_id}/data

documents=$(ls avro-data | sed 's/.avro//')
bq rm -rf $dataset_id
bq mk $dataset_id

total=0
error=0
skip=0

trap "exit" INT
for document in ${documents}; do
for document in $(ls avro-data | sed 's/.avro//'); do
# downcase hyphens to underscores before generating names
bq_document=$(echo $document | sed 's/-/_/g')
namespace=$(echo $bq_document | cut -d. -f1)
doctype=$(echo $bq_document | cut -d. -f2)
docver=$(echo $bq_document | cut -d. -f3)

if ! bq ls | grep ${namespace} >/dev/null ; then
echo "creating dataset: ${namespace}"
bq mk ${namespace}
fi

table_exists=$(bq ls ${namespace} | grep ${doctype}_v${docver})
table_exists=$(bq ls ${dataset_id} | grep ${namespace}__${doctype}_v${docver})

if [[ ! -z ${SKIP_EXISTING+x} ]] && [[ ! -z ${table_exists} ]]; then
echo "skipping bq load for ${document}"
@@ -36,7 +32,7 @@ for document in ${documents}; do
echo "running bq load for ${document}"
bq load --source_format=AVRO \
--replace \
${namespace}.${doctype}_v${docver} \
${dataset_id}.${namespace}__${doctype}_v${docver} \
gs://${project_id}/data/${document}.avro

if [[ $? -ne 0 ]]; then
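With this change every document loads into the single test_avro dataset instead of one dataset per namespace: a document named, say, telemetry.main.4 (illustrative) now lands in the table test_avro.telemetry__main_v4 rather than telemetry.main_v4.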
43 changes: 43 additions & 0 deletions scripts/mps-verify-tuple-struct.sh
@@ -0,0 +1,43 @@
#!/bin/bash

cd "$(dirname "$0").."

datadir=$(mktemp -d -t tmp.XXXXXXXXXX)
function cleanup {
echo "Running cleanup!"
rm -rf "$datadir"
}
trap cleanup EXIT

scripts/mps-download-schemas.sh

avro_control=$datadir/avro-control
avro_no_tuple=$datadir/avro-no-tuple
avro_tuple=$datadir/avro-tuple

bq_control=$datadir/bq-control
bq_no_tuple=$datadir/bq-no-tuple
bq_tuple=$datadir/bq-tuple


# get control values
git checkout v1.4.1

scripts/mps-generate-schemas.sh $avro_control --type avro --resolve drop
scripts/mps-generate-schemas.sh $bq_control --type bigquery --resolve drop

git checkout -

# get values for tuple/no-tuple
scripts/mps-generate-schemas.sh $avro_no_tuple --type avro --resolve drop
scripts/mps-generate-schemas.sh $avro_tuple --type avro --resolve drop --tuple-struct
scripts/mps-generate-schemas.sh $bq_no_tuple --type bigquery --resolve drop
scripts/mps-generate-schemas.sh $bq_tuple --type bigquery --resolve drop --tuple-struct

outdir="test_tuple_results"
mkdir -p $outdir

diff -r $avro_control $avro_no_tuple > $outdir/avro-no-tuple.diff
diff -r $bq_control $bq_no_tuple > $outdir/bq-no-tuple.diff
diff -r $avro_no_tuple $avro_tuple > $outdir/avro-tuple.diff
diff -r $bq_no_tuple $bq_tuple > $outdir/bq-tuple.diff
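The expectation, presumably: the two *-no-tuple.diff files should come out empty (the new version with tuples disabled matches the v1.4.1 control), while the *-tuple.diff files isolate exactly the schemas that --tuple-struct changes.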
8 changes: 7 additions & 1 deletion src/ast.rs
@@ -478,6 +478,12 @@ impl Tag {
set_and_recurse(item, "__union__");
}
}
Type::Tuple(tuple) => {
for (i, item) in tuple.items.iter_mut().enumerate() {
let name = format!("f{}_", i);
set_and_recurse(item, &name);
}
}
_ => (),
}
}
@@ -567,7 +573,7 @@ impl TranslateFrom<jsonschema::Tag> for Tag {
type Error = &'static str;

fn translate_from(tag: jsonschema::Tag, context: Context) -> Result<Self, Self::Error> {
let mut tag = tag.type_into_ast();
let mut tag = tag.type_into_ast(context)?;
tag.infer_name(context.normalize_case);
tag.infer_nullability(context.force_nullable);
tag.is_root = true;
43 changes: 42 additions & 1 deletion src/avro.rs
@@ -192,6 +192,40 @@ impl TranslateFrom<ast::Tag> for Type {
Type::Complex(Complex::Record(record))
}
}
ast::Type::Tuple(tuple) => {
let fields = tuple
.items
.iter()
.enumerate()
.map(|(i, v)| {
let default = if v.nullable { Some(json!(null)) } else { None };
(
format!("f{}_", i),
Type::translate_from(v.clone(), context),
default,
)
})
.filter(|(_, v, _)| v.is_ok())
.map(|(name, data_type, default)| Field {
name,
data_type: data_type.unwrap(),
default,
..Default::default()
})
.collect();
let record = Record {
common: CommonAttributes {
name: tag.name.clone().unwrap_or_else(|| "__UNNAMED__".into()),
namespace: tag.namespace.clone(),
..Default::default()
},
fields,
};
if record.common.name == "__UNNAMED__" {
warn!("{} - Unnamed field", tag.fully_qualified_name());
}
Type::Complex(Complex::Record(record))
}
ast::Type::Array(array) => match Type::translate_from(*array.items.clone(), context) {
Ok(data_type) => Type::Complex(Complex::Array(Array {
items: Box::new(data_type),
@@ -596,7 +630,14 @@ mod tests {
}
}
});
let avro = json!({"type": "string"});
let avro = json!({
"type": "record",
"name": "__UNNAMED__",
"fields": [
{"name": "f0_", "type": {"type": "boolean"}},
{"name": "f1_", "type": {"type": "long"}}
]
});
assert_from_ast_eq(ast, avro);
}

29 changes: 28 additions & 1 deletion src/bigquery.rs
@@ -110,6 +110,29 @@ impl TranslateFrom<ast::Tag> for Tag {
Type::Record(Record { fields })
}
}
ast::Type::Tuple(tuple) => {
let fields: Result<Vec<_>, _> = tuple
.items
.iter()
.map(|v| Tag::translate_from(v.clone(), context))
.collect();

let named_fields: HashMap<String, Box<Tag>> = fields?
.into_iter()
.enumerate()
.map(|(i, mut v)| {
// The name is actually derived from the value, not from
// the associated key. Modify the name in place.
let name = format!("f{}_", i);
v.name = Some(name.clone());
(name, Box::new(v))
})
.collect();

Type::Record(Record {
fields: named_fields,
})
}
ast::Type::Array(array) => match Tag::translate_from(*array.items.clone(), context) {
Ok(tag) => *tag.data_type,
Err(_) => return Err(fmt_reason("untyped array")),
@@ -531,8 +554,12 @@ mod tests {
}
});
let expect = json!({
"type": "STRING",
"type": "RECORD",
"mode": "REQUIRED",
"fields": [
{"name": "f0_", "type": "BOOL", "mode": "REQUIRED"},
{"name": "f1_", "type": "INT64", "mode": "REQUIRED"}
]
});
assert_eq!(expect, transform_tag(data));
}
