diff --git a/setup.py b/setup.py index 9f73a0c..4ee1832 100755 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ py_modules=["target_postgres"], install_requires=[ "singer-python==5.1.1", - "psycopg2==2.7.5", + "psycopg2==2.8.4", "inflection==0.3.1" ], entry_points=""" diff --git a/target_postgres/__init__.py b/target_postgres/__init__.py index 15a12b8..8e7ab43 100644 --- a/target_postgres/__init__.py +++ b/target_postgres/__init__.py @@ -13,13 +13,25 @@ from tempfile import TemporaryFile import pkg_resources -from jsonschema.validators import Draft4Validator +from jsonschema import validators, Draft4Validator, FormatChecker +from decimal import Decimal import singer from target_postgres.db_sync import DbSync logger = singer.get_logger() +def float_to_decimal(value): + '''Walk the given data structure and turn all instances of float into + double.''' + if isinstance(value, float): + return Decimal(str(value)) + if isinstance(value, list): + return [float_to_decimal(child) for child in value] + if isinstance(value, dict): + return {k: float_to_decimal(v) for k, v in value.items()} + return value + def emit_state(state): if state is not None: line = json.dumps(state) @@ -65,7 +77,7 @@ def persist_lines(config, lines): stream = o['stream'] # Validate record - validators[stream].validate(o['record']) + validators[stream].validate(float_to_decimal(o['record'])) sync = stream_to_sync[stream] @@ -93,7 +105,8 @@ def persist_lines(config, lines): raise Exception("Line is missing required key 'stream': {}".format(line)) stream = o['stream'] schemas[stream] = o - validators[stream] = Draft4Validator(o['schema']) + schema = float_to_decimal(o['schema']) + validators[stream] = Draft4Validator(schema, format_checker=FormatChecker()) if 'key_properties' not in o: raise Exception("key_properties field is required") key_properties[stream] = o['key_properties'] diff --git a/target_postgres/db_sync.py b/target_postgres/db_sync.py index 4fba051..6e07c90 100644 --- a/target_postgres/db_sync.py +++ b/target_postgres/db_sync.py @@ -150,7 +150,7 @@ def record_to_csv_line(self, record): flatten = flatten_record(record) return ','.join( [ - json.dumps(flatten[name]) if name in flatten and flatten[name] else '' + json.dumps(flatten[name], ensure_ascii=False) if name in flatten and (flatten[name] == 0 or flatten[name]) else '' for name in self.flatten_schema ] )