diff --git a/README.md b/README.md index f1b08b7..93e58fb 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ Note: This tap currently does not support incremental state. | files | False | None | An array of csv file stream settings. | | csv_files_definition| False | None | A path to the JSON file holding an array of file settings. | | add_metadata_columns| False | False | When True, add the metadata columns (`_sdc_source_file`, `_sdc_source_file_mtime`, `_sdc_source_lineno`) to output. | +| add_metadata_dict| False | False | When True, adds the metadata object (`source`, `time_extracted`) to output. | A full list of supported settings and capabilities is available by running: `tap-csv --about` diff --git a/tap_csv/client.py b/tap_csv/client.py index 096cf61..8359e22 100644 --- a/tap_csv/client.py +++ b/tap_csv/client.py @@ -3,7 +3,7 @@ import csv import os from datetime import datetime, timezone -from typing import Iterable, List, Optional +from typing import Iterable, List, Optional, Dict from singer_sdk import typing as th from singer_sdk.streams import Stream @@ -11,7 +11,7 @@ SDC_SOURCE_FILE_COLUMN = "_sdc_source_file" SDC_SOURCE_LINENO_COLUMN = "_sdc_source_lineno" SDC_SOURCE_FILE_MTIME_COLUMN = "_sdc_source_file_mtime" - +METADATA_COLUMN = "metadata" class CSVStream(Stream): """Stream class for CSV streams.""" @@ -48,6 +48,10 @@ def get_records(self, context: Optional[dict]) -> Iterable[dict]: if self.config.get("add_metadata_columns", False): row = [file_path, file_last_modified, file_lineno] + row + if self.config.get("add_metadata_dict", False): + metadata_dict={"source": file_path, "time_extracted": datetime.utcnow()} + row = [metadata_dict] + row + yield dict(zip(self.header, row)) def _get_recursive_file_paths(self, file_path: str) -> list: @@ -152,7 +156,19 @@ def schema(self) -> dict: th.Property(SDC_SOURCE_FILE_MTIME_COLUMN, th.DateTimeType) ) properties.append(th.Property(SDC_SOURCE_LINENO_COLUMN, th.IntegerType)) + + # If enabled, add file's metadata to output + if self.config.get("add_metadata_dict", False): + header = [ + METADATA_COLUMN, + ] + header + t = th.ObjectType( + th.Property("source", th.StringType), + th.Property("time_extracted", th.StringType), + additional_properties=False, + ) + properties.append(th.Property(METADATA_COLUMN, t)) # Cache header for future use self.header = header diff --git a/tap_csv/tap.py b/tap_csv/tap.py index ab43d31..99e28aa 100644 --- a/tap_csv/tap.py +++ b/tap_csv/tap.py @@ -53,6 +53,15 @@ class TapCSV(Tap): "`_sdc_source_file_mtime`, `_sdc_source_lineno`) to output." ), ), + th.Property( + "add_metadata_dict", + th.BooleanType, + required=False, + default=False, + description=( + "When True, adds basic metadata as dict" + ), + ), ).to_dict() @classproperty