diff --git a/weather_mv/README.md b/weather_mv/README.md
index 74513f16..4fa3d9c3 100644
--- a/weather_mv/README.md
+++ b/weather_mv/README.md
@@ -205,7 +205,7 @@
 weather-mv bq --uris "gs://your-bucket/*.grib" \
    --output_table $PROJECT.$DATASET_ID.$TABLE_ID \
    --xarray_open_dataset_kwargs '{"engine": "cfgrib", "indexpath": "", "backend_kwargs": {"filter_by_keys": {"typeOfLevel": "surface", "edition": 1}}}' \
    --temp_location "gs://$BUCKET/tmp" \
-   --input_chunks '{ "time": 1 }'
+   --input_chunks '{ "time": 1 }' \
    --direct_num_workers 2
 ```
diff --git a/weather_mv/loader_pipeline/bq.py b/weather_mv/loader_pipeline/bq.py
index 6495a262..3643327c 100644
--- a/weather_mv/loader_pipeline/bq.py
+++ b/weather_mv/loader_pipeline/bq.py
@@ -327,7 +327,7 @@ def df_to_rows(self, rows: pd.DataFrame, ds: xr.Dataset, uri: str) -> t.Iterator
             yield row
 
     def chunks_to_rows(self, _, ds: xr.Dataset) -> t.Iterator[t.Dict]:
-        logger.info(f"Processing for time: {ds['time'].values} and level: {ds['level'].values}")
+        logger.info(f"Processing for time: {ds['time'].values}")
         uri = ds.attrs.get(DATA_URI_COLUMN, '')
         # Re-calculate import time for streaming extractions.
         if not self.import_time or self.zarr:
diff --git a/weather_mv/loader_pipeline/bq_test.py b/weather_mv/loader_pipeline/bq_test.py
index fae7ab31..d64b1a67 100644
--- a/weather_mv/loader_pipeline/bq_test.py
+++ b/weather_mv/loader_pipeline/bq_test.py
@@ -205,7 +205,7 @@ class ExtractRowsTestBase(TestDataBase):
     def extract(self, data_path, *, variables=None, area=None, open_dataset_kwargs=None,
                 import_time=DEFAULT_IMPORT_TIME, disable_grib_schema_normalization=False,
                 tif_metadata_for_start_time=None, tif_metadata_for_end_time=None, zarr: bool = False, zarr_kwargs=None,
-                skip_creating_polygon: bool = False) -> t.Iterator[t.Dict]:
+                skip_creating_polygon: bool = False, input_chunks={'time': 1}) -> t.Iterator[t.Dict]:
         if zarr_kwargs is None:
             zarr_kwargs = {}
         op = ToBigQuery.from_kwargs(
@@ -215,7 +215,7 @@ def extract(self, data_path, *, variables=None, area=None, open_dataset_kwargs=N
             tif_metadata_for_start_time=tif_metadata_for_start_time,
             tif_metadata_for_end_time=tif_metadata_for_end_time, skip_region_validation=True,
             disable_grib_schema_normalization=disable_grib_schema_normalization, coordinate_chunk_size=1000,
-            skip_creating_polygon=skip_creating_polygon)
+            skip_creating_polygon=skip_creating_polygon, input_chunks=input_chunks)
         coords = op.prepare_coordinates(data_path)
         for uri, chunk in coords:
             yield from op.extract_rows(uri, chunk)
@@ -792,6 +792,7 @@ def test_extracts_rows(self):
             variables=list(), area=list(), xarray_open_dataset_kwargs=dict(), import_time=None,
             infer_schema=False, tif_metadata_for_start_time=None, tif_metadata_for_end_time=None,
             skip_region_validation=True, disable_grib_schema_normalization=False,
+            input_chunks={'time': 1}
         )
 
         with TestPipeline() as p:
diff --git a/weather_mv/loader_pipeline/pipeline_test.py b/weather_mv/loader_pipeline/pipeline_test.py
index 3834b537..8340a923 100644
--- a/weather_mv/loader_pipeline/pipeline_test.py
+++ b/weather_mv/loader_pipeline/pipeline_test.py
@@ -69,6 +69,7 @@ def setUp(self) -> None:
             'log_level': 2,
             'use_local_code': False,
             'skip_creating_polygon': False,
+            'input_chunks': {'time': 1}
         }
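
The patch above threads a new `input_chunks` option through the pipeline, defaulting to one time step per chunk. As a rough illustration of what `input_chunks='{ "time": 1 }'` means in xarray terms, here is a minimal sketch; this is not weather-tools code, the file name is hypothetical, and it assumes `dask` is installed so `chunks=` produces lazy per-step arrays:

```python
import xarray as xr

# Open with one time step per dask chunk, mirroring input_chunks={'time': 1}.
ds = xr.open_dataset("example.nc", chunks={"time": 1})  # hypothetical file

# Each time step can then be processed independently without loading
# the whole dataset into memory, similar to how chunks_to_rows logs
# and extracts rows one chunk at a time.
for t in ds["time"].values:
    step = ds.sel(time=t)  # lazy single-time-step view
    print(f"Processing for time: {step['time'].values}")
```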