Skip to content

Commit

Permalink
Fix Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Darshan Prajapati committed Nov 2, 2023
1 parent eda169e commit 0e0a907
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 4 deletions.
2 changes: 1 addition & 1 deletion weather_mv/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ weather-mv bq --uris "gs://your-bucket/*.grib" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--xarray_open_dataset_kwargs '{"engine": "cfgrib", "indexpath": "", "backend_kwargs": {"filter_by_keys": {"typeOfLevel": "surface", "edition": 1}}}' \
--temp_location "gs://$BUCKET/tmp" \
--input_chunks '{ "time": 1 }'
--input_chunks '{ "time": 1 }' \
--direct_num_workers 2
```

Expand Down
2 changes: 1 addition & 1 deletion weather_mv/loader_pipeline/bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def df_to_rows(self, rows: pd.DataFrame, ds: xr.Dataset, uri: str) -> t.Iterator
yield row

def chunks_to_rows(self, _, ds: xr.Dataset) -> t.Iterator[t.Dict]:
logger.info(f"Processing for time: {ds['time'].values} and level: {ds['level'].values}")
logger.info(f"Processing for time: {ds['time'].values}")
uri = ds.attrs.get(DATA_URI_COLUMN, '')
# Re-calculate import time for streaming extractions.
if not self.import_time or self.zarr:
Expand Down
5 changes: 3 additions & 2 deletions weather_mv/loader_pipeline/bq_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class ExtractRowsTestBase(TestDataBase):
def extract(self, data_path, *, variables=None, area=None, open_dataset_kwargs=None,
import_time=DEFAULT_IMPORT_TIME, disable_grib_schema_normalization=False,
tif_metadata_for_start_time=None, tif_metadata_for_end_time=None, zarr: bool = False, zarr_kwargs=None,
skip_creating_polygon: bool = False) -> t.Iterator[t.Dict]:
skip_creating_polygon: bool = False, input_chunks={ 'time': 1}) -> t.Iterator[t.Dict]:
if zarr_kwargs is None:
zarr_kwargs = {}
op = ToBigQuery.from_kwargs(
Expand All @@ -215,7 +215,7 @@ def extract(self, data_path, *, variables=None, area=None, open_dataset_kwargs=N
tif_metadata_for_start_time=tif_metadata_for_start_time,
tif_metadata_for_end_time=tif_metadata_for_end_time, skip_region_validation=True,
disable_grib_schema_normalization=disable_grib_schema_normalization, coordinate_chunk_size=1000,
skip_creating_polygon=skip_creating_polygon)
skip_creating_polygon=skip_creating_polygon, input_chunks=input_chunks)
coords = op.prepare_coordinates(data_path)
for uri, chunk in coords:
yield from op.extract_rows(uri, chunk)
Expand Down Expand Up @@ -792,6 +792,7 @@ def test_extracts_rows(self):
variables=list(), area=list(), xarray_open_dataset_kwargs=dict(), import_time=None, infer_schema=False,
tif_metadata_for_start_time=None, tif_metadata_for_end_time=None, skip_region_validation=True,
disable_grib_schema_normalization=False,
input_chunks={'time': 1}
)

with TestPipeline() as p:
Expand Down
1 change: 1 addition & 0 deletions weather_mv/loader_pipeline/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def setUp(self) -> None:
'log_level': 2,
'use_local_code': False,
'skip_creating_polygon': False,
'input_chunks': { 'time': 1 }
}


Expand Down

0 comments on commit 0e0a907

Please sign in to comment.