Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validating schema in discovery mode #56

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
23b316c
Adding time_extracted and bookmark_properties
Nov 22, 2017
1d0416c
Adding commas to fix error
Nov 22, 2017
3524f40
Fixing space errors
Nov 22, 2017
90cc986
Fixing pylint warnings
Nov 22, 2017
9cbdfa2
validating schema in discovery mode
Nov 22, 2017
25b19d2
Check for time_extracted being an aware datetime and ensure timezone …
Nov 27, 2017
d297a0e
Fixing pylint errors
Nov 27, 2017
247d4d3
Wrapping up
Nov 28, 2017
e56e488
Pylint
Nov 28, 2017
c53912f
import pytz
ccapurso Nov 28, 2017
7db81df
handle string or array bookmark_properties
ccapurso Nov 28, 2017
42fe69a
allow bookmark_properties to be None
ccapurso Nov 28, 2017
ea4b5ba
Merge pull request #55 from singer-io/time_extracted_and_bookmark_pro…
ccapurso Nov 28, 2017
1b1f32c
bump version to 5.0.0
ccapurso Nov 28, 2017
5a89d5d
move bookmark property handling from write_schema into SchemaMessage …
Nov 29, 2017
555c6ca
Merge pull request #57 from singer-io/fix/initialize-bookmark-propert…
nick-mccoy Nov 29, 2017
d3dd5ba
bump version to 5.0.1
Nov 29, 2017
231dd45
Forcing SchemaMessage so that bookmark_properties is always an array
Nov 30, 2017
23c389d
Pylint fix
Nov 30, 2017
69f56bf
Merge pull request #58 from singer-io/make_bookmark_properties_list
nick-mccoy Nov 30, 2017
bdbd776
Bumping singer-python version to 5.0.2 and deploying
Nov 30, 2017
65bda5b
Add metadata assignment to Catalog.from_dict. Update test_catalog wit…
flash716 Dec 10, 2017
219b8eb
Add stream_alias to Catalog.to_dict. Add to unit test.
flash716 Dec 10, 2017
c5182e9
Merge pull request #61 from flash716/stream-alias
nick-mccoy Dec 13, 2017
13275d8
Merge pull request #60 from flash716/catalog-metadata
nick-mccoy Dec 14, 2017
be8b565
changing strftime format string
Dec 14, 2017
215171b
fixing strftime format string
Dec 14, 2017
16b53f3
Merge pull request #63 from singer-io/fix_strftime_format
nick-mccoy Dec 15, 2017
cdbf7ed
bumping version
Dec 15, 2017
89c6466
validating schema in discovery mode
Nov 22, 2017
4411df5
Rebasing validating_schema_in_discovery_mode with master
Dec 15, 2017
c8c507b
pylint
Dec 15, 2017
b82f0f1
pylint fixes
Dec 18, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,6 @@ ENV/
.ropeproject

.pypirc

# PyCharm
.idea/
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import subprocess

setup(name="singer-python",
version='4.1.0',
version='5.0.3',
description="Singer.io utility library",
author="Stitch",
classifiers=['Programming Language :: Python :: 3 :: Only'],
Expand Down
31 changes: 31 additions & 0 deletions singer/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys

from singer.schema import Schema
from jsonschema import Draft4Validator, FormatChecker

# pylint: disable=too-many-instance-attributes
class CatalogEntry(object):
Expand Down Expand Up @@ -55,6 +56,8 @@ def to_dict(self):
result['stream'] = self.stream
if self.row_count is not None:
result['row_count'] = self.row_count
if self.stream_alias is not None:
result['stream_alias'] = self.stream_alias
if self.metadata is not None:
result['metadata'] = self.metadata
return result
Expand Down Expand Up @@ -95,6 +98,7 @@ def from_dict(cls, data):
entry.schema = Schema.from_dict(stream.get('schema'))
entry.is_view = stream.get('is_view')
entry.stream_alias = stream.get('stream_alias')
entry.metadata = stream.get('metadata')
streams.append(entry)
return Catalog(streams)

Expand All @@ -109,3 +113,30 @@ def get_stream(self, tap_stream_id):
if stream.tap_stream_id == tap_stream_id:
return stream
return None


CATALOG_SCHEMA = {
'type': 'object',
'required': ['streams'],
'properties': {
'streams' : {
'type': 'array',
'items': {
'type': 'object',
'required': ['stream', 'tap_stream_id', 'schema'],
'properties': {
'stream': {'type': 'string'},
'tap_stream_id': {'type': 'string'},
'schema': {'type': 'object'}
}
}
}
}
}

CATALOG_VALIDATOR = Draft4Validator(CATALOG_SCHEMA,
format_checker=FormatChecker())

def write_catalog(streams):
CATALOG_VALIDATOR.validate(streams)
json.dump(streams, sys.stdout, indent=2)
49 changes: 39 additions & 10 deletions singer/messages.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import sys
import simplejson as json

import singer.utils as u
import dateutil
import pytz

class Message(object):
'''Base class for messages.'''
Expand Down Expand Up @@ -37,10 +39,14 @@ class RecordMessage(Message):

'''

def __init__(self, stream, record, version=None):
def __init__(self, stream, record, version=None, time_extracted=None):
self.stream = stream
self.record = record
self.version = version
self.time_extracted = time_extracted
if time_extracted and not time_extracted.tzinfo:
raise ValueError("'time_extracted' must be either None " +
"or an aware datetime (with a time zone)")

def asdict(self):
result = {
Expand All @@ -50,6 +56,9 @@ def asdict(self):
}
if self.version is not None:
result['version'] = self.version
if self.time_extracted:
as_utc = self.time_extracted.astimezone(pytz.utc)
result['time_extracted'] = as_utc.strftime(u.DATETIME_FMT)
return result

def __str__(self):
Expand All @@ -76,18 +85,28 @@ class SchemaMessage(Message):
>>> key_properties=['id'])

'''
def __init__(self, stream, schema, key_properties):
def __init__(self, stream, schema, key_properties, bookmark_properties=None):
self.stream = stream
self.schema = schema
self.key_properties = key_properties

if isinstance(bookmark_properties, (str, bytes)):
bookmark_properties = [bookmark_properties]
if bookmark_properties and not isinstance(bookmark_properties, list):
raise Exception("bookmark_properties must be a string or list of strings")

self.bookmark_properties = bookmark_properties

def asdict(self):
return {
result = {
'type': 'SCHEMA',
'stream': self.stream,
'schema': self.schema,
'key_properties': self.key_properties
}
if self.bookmark_properties:
result['bookmark_properties'] = self.bookmark_properties
return result


class StateMessage(Message):
Expand Down Expand Up @@ -157,14 +176,20 @@ def parse_message(msg):
msg_type = _required_key(obj, 'type')

if msg_type == 'RECORD':
time_extracted = obj.get('time_extracted')
if time_extracted:
time_extracted = dateutil.parser.parse(time_extracted)
return RecordMessage(stream=_required_key(obj, 'stream'),
record=_required_key(obj, 'record'),
version=obj.get('version'))
version=obj.get('version'),
time_extracted=time_extracted)


elif msg_type == 'SCHEMA':
return SchemaMessage(stream=_required_key(obj, 'stream'),
schema=_required_key(obj, 'schema'),
key_properties=_required_key(obj, 'key_properties'))
key_properties=_required_key(obj, 'key_properties'),
bookmark_properties=obj.get('bookmark_properties'))

elif msg_type == 'STATE':
return StateMessage(value=_required_key(obj, 'value'))
Expand All @@ -183,12 +208,14 @@ def write_message(message):
sys.stdout.flush()


def write_record(stream_name, record, stream_alias=None):
def write_record(stream_name, record, stream_alias=None, time_extracted=None):
"""Write a single record for the given stream.

>>> write_record("users", {"id": 2, "email": "mike@stitchdata.com"})
"""
write_message(RecordMessage(stream=(stream_alias or stream_name), record=record))
write_message(RecordMessage(stream=(stream_alias or stream_name),
record=record,
time_extracted=time_extracted))


def write_records(stream_name, records):
Expand All @@ -202,7 +229,7 @@ def write_records(stream_name, records):
write_record(stream_name, record)


def write_schema(stream_name, schema, key_properties, stream_alias=None):
def write_schema(stream_name, schema, key_properties, bookmark_properties=None, stream_alias=None):
"""Write a schema message.

>>> stream = 'test'
Expand All @@ -214,11 +241,13 @@ def write_schema(stream_name, schema, key_properties, stream_alias=None):
key_properties = [key_properties]
if not isinstance(key_properties, list):
raise Exception("key_properties must be a string or list of strings")

write_message(
SchemaMessage(
stream=(stream_alias or stream_name),
schema=schema,
key_properties=key_properties))
key_properties=key_properties,
bookmark_properties=bookmark_properties))


def write_state(value):
Expand Down
6 changes: 3 additions & 3 deletions singer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from singer.catalog import Catalog

DATETIME_PARSE = "%Y-%m-%dT%H:%M:%SZ"
DATETIME_FMT = "%04Y-%m-%dT%H:%M:%S.%fZ"
DATETIME_FMT = "%Y-%m-%dT%H:%M:%S.%fZ"

def now():
return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
Expand All @@ -29,10 +29,10 @@ def strptime(dtime):
except Exception:
return datetime.datetime.strptime(dtime, DATETIME_PARSE)

def strftime(dtime):
def strftime(dtime, format_str=DATETIME_FMT):
if dtime.utcoffset() != datetime.timedelta(0):
raise Exception("datetime must be pegged at UTC tzoneinfo")
return dtime.strftime(DATETIME_FMT)
return dtime.strftime(format_str)

def ratelimit(limit, every):
def limitdecorator(func):
Expand Down
43 changes: 36 additions & 7 deletions tests/test_catalog.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import unittest
import singer.catalog

from singer.schema import Schema
from singer.catalog import Catalog, CatalogEntry

class TestToDictAndFromDict(unittest.TestCase):

dict_form = {
dict_form = {
'streams': [
{
'stream': 'users',
'tap_stream_id': 'prod_users',
'stream_alias': 'users_alias',
'database_name': 'prod',
'table_name': 'users',
'schema': {
Expand All @@ -20,6 +20,17 @@ class TestToDictAndFromDict(unittest.TestCase):
'name': {'type': 'string', 'selected': True}
}
},
'metadata': [
{
'metadata': {
'metadata-key': 'metadata-value'
},
'breadcrumb': [
'properties',
'name',
],
},
],
},
{
'stream': 'orders',
Expand All @@ -38,18 +49,28 @@ class TestToDictAndFromDict(unittest.TestCase):
]
}

obj_form = Catalog(streams=[
obj_form = Catalog(streams=[
CatalogEntry(
stream='users',
tap_stream_id='prod_users',
stream_alias='users_alias',
database='prod',
table='users',
schema=Schema(
type='object',
selected=True,
properties={
'id': Schema(type='integer', selected=True),
'name': Schema(type='string', selected=True)})),
'name': Schema(type='string', selected=True)}),
metadata=[{
'metadata': {
'metadata-key': 'metadata-value'
},
'breadcrumb': [
'properties',
'name',
],
}]),
CatalogEntry(
stream='orders',
tap_stream_id='prod_orders',
Expand All @@ -62,11 +83,15 @@ class TestToDictAndFromDict(unittest.TestCase):
'id': Schema(type='integer', selected=True),
'amount': Schema(type='number', selected=True)}))])




class TestToDictAndFromDict(unittest.TestCase):
def test_from_dict(self):
self.assertEqual(self.obj_form, Catalog.from_dict(self.dict_form))
self.assertEqual(obj_form, Catalog.from_dict(dict_form))

def test_to_dict(self):
self.assertEqual(self.dict_form, self.obj_form.to_dict())
self.assertEqual(dict_form, obj_form.to_dict())


class TestGetStream(unittest.TestCase):
Expand All @@ -77,3 +102,7 @@ def test(self):
CatalogEntry(tap_stream_id='c')])
entry = catalog.get_stream('b')
self.assertEquals('b', entry.tap_stream_id)

class TestWriteCatalog(unittest.TestCase):
def test(self):
singer.catalog.write_catalog(dict_form)
20 changes: 19 additions & 1 deletion tests/test_singer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import singer
import unittest

import datetime
import dateutil

class TestSinger(unittest.TestCase):
def test_parse_message_record_good(self):
Expand All @@ -17,6 +18,23 @@ def test_parse_message_record_with_version_good(self):
message,
singer.RecordMessage(record={'name': 'foo'}, stream='users', version=2))

def test_parse_message_record_naive_extraction_time(self):
with self.assertRaisesRegex(ValueError, "must be either None or an aware datetime"):
message = singer.parse_message(
'{"type": "RECORD", "record": {"name": "foo"}, "stream": "users", "version": 2, "time_extracted": "1970-01-02T00:00:00"}')

def test_parse_message_record_aware_extraction_time(self):
message = singer.parse_message(
'{"type": "RECORD", "record": {"name": "foo"}, "stream": "users", "version": 2, "time_extracted": "1970-01-02T00:00:00.000Z"}')
expected = singer.RecordMessage(
record={'name': 'foo'},
stream='users',
version=2,
time_extracted=dateutil.parser.parse("1970-01-02T00:00:00.000Z"))
print(message)
print(expected)
self.assertEqual(message, expected)

def test_parse_message_record_missing_record(self):
with self.assertRaises(Exception):
singer.parse_message('{"type": "RECORD", "stream": "users"}')
Expand Down
2 changes: 1 addition & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@

class TestFormat(unittest.TestCase):
def test_small_years(self):
self.assertEqual(u.strftime(dt(90, 1, 1, tzinfo=tz.utc)),
self.assertEqual(u.strftime(dt(90, 1, 1, tzinfo=tz.utc), '%04Y-%m-%dT%H:%M:%S.%fZ'),
"0090-01-01T00:00:00.000000Z")