From adf5210ef8c5283135150d099408ea98002e4fe3 Mon Sep 17 00:00:00 2001
From: Ryo Kitagawa
Date: Thu, 13 Feb 2025 21:14:48 +0900
Subject: [PATCH 1/2] feat: Add range partitioning support

---
 README.md                                      | 24 +++++++++++++
 ..._replace_field_range_partitioned_table.yml  | 36 +++++++++++++++++++
 lib/embulk/output/bigquery.rb                  | 23 +++++++++++-
 lib/embulk/output/bigquery/bigquery_client.rb  | 14 +++++++-
 test/test_configure.rb                         | 26 ++++++++++++++
 5 files changed, 121 insertions(+), 2 deletions(-)
 create mode 100644 example/config_replace_field_range_partitioned_table.yml

diff --git a/README.md b/README.md
index 016b27d..216c364 100644
--- a/README.md
+++ b/README.md
@@ -110,6 +110,12 @@ Following options are same as [bq command-line tools](https://cloud.google.com/b
 | time_partitioning.type            | string | required | nil | The only type supported is DAY, which will generate one partition per day based on data loading time. |
 | time_partitioning.expiration_ms   | int    | optional | nil | Number of milliseconds for which to keep the storage for a partition. |
 | time_partitioning.field           | string | optional | nil | `DATE` or `TIMESTAMP` column used for partitioning |
+| range_partitioning                | hash   | optional | nil | See [Range Partitioning](#range-partitioning) |
+| range_partitioning.field          | string | required | nil | `INT64` column used for partitioning |
+| range_partitioning.range          | hash   | required | nil | Defines the ranges for range partitioning |
+| range_partitioning.range.start    | string | required | nil | The start of range partitioning, inclusive. This field is an INT64 value represented as a string. |
+| range_partitioning.range.end      | string | required | nil | The end of range partitioning, exclusive. This field is an INT64 value represented as a string. |
+| range_partitioning.range.interval | string | required | nil | The width of each interval. This field is an INT64 value represented as a string. |
 | clustering                        | hash   | optional | nil | Currently, clustering is supported for partitioned tables, so must be used with `time_partitioning` option. See [clustered tables](https://cloud.google.com/bigquery/docs/clustered-tables) |
 | clustering.fields                 | array  | required | nil | One or more fields on which data should be clustered. The order of the specified columns determines the sort order of the data. |
 | schema_update_options             | array  | optional | nil | (Experimental) List of `ALLOW_FIELD_ADDITION` or `ALLOW_FIELD_RELAXATION` or both. See [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions). NOTE for the current status: `schema_update_options` does not work for `copy` job, that is, is not effective for most of modes such as `append`, `replace` and `replace_backup`. `delete_in_advance` deletes origin table so does not need to update schema. Only `append_direct` can utilize schema update. |
@@ -448,6 +454,24 @@ MEMO: [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/big
 to update the schema of the desitination table as a side effect of the load job, but it is not available for copy job.
 Thus, it was not suitable for embulk-output-bigquery idempotence modes, `append`, `replace`, and `replace_backup`, sigh.
 
+### Range Partitioning
+
+See also [Creating and Updating Range-Partitioned Tables](https://cloud.google.com/bigquery/docs/creating-partitioned-tables).
+
+To load into a partition, specify `range_partitioning` and the `table` parameter with a partition decorator, as in:
+
+```yaml
+out:
+  type: bigquery
+  table: table_name$1
+  range_partitioning:
+    field: customer_id
+    range:
+      start: '1'
+      end: '99999'
+      interval: '1'
+```
+
 ## Development
 
 ### Run example:
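For intuition about the three `range` fields documented above (`start` inclusive, `end` exclusive, `interval` as bucket width), here is a small standalone Ruby sketch; the helper name and the sample numbers are illustrative only and are not part of this patch:

```ruby
# Illustrative only: map an INT64 key to the partition it would land in,
# given a range spec like the one documented in the README table above.
def partition_bounds(value, start:, end_:, interval:)
  return nil if value < start || value >= end_   # keys outside [start, end) fall in no defined range
  lower = start + ((value - start) / interval) * interval
  [lower, [lower + interval, end_].min]          # each partition covers [lower, upper)
end

p partition_bounds(42,  start: 0, end_: 100, interval: 10)  # => [40, 50]
p partition_bounds(105, start: 0, end_: 100, interval: 10)  # => nil
```

With the README example (`start: '1'`, `end: '99999'`, `interval: '1'`), every key from 1 through 99998 gets its own single-value partition.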
diff --git a/example/config_replace_field_range_partitioned_table.yml b/example/config_replace_field_range_partitioned_table.yml
new file mode 100644
index 0000000..0eceff5
--- /dev/null
+++ b/example/config_replace_field_range_partitioned_table.yml
@@ -0,0 +1,36 @@
+in:
+  type: file
+  path_prefix: example/example.csv
+  parser:
+    type: csv
+    charset: UTF-8
+    newline: CRLF
+    null_string: 'NULL'
+    skip_header_lines: 1
+    comment_line_marker: '#'
+    columns:
+      - {name: date, type: string}
+      - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
+      - {name: "null", type: string}
+      - {name: long, type: long}
+      - {name: string, type: string}
+      - {name: double, type: double}
+      - {name: boolean, type: boolean}
+out:
+  type: bigquery
+  mode: replace
+  auth_method: service_account
+  json_keyfile: example/your-project-000.json
+  dataset: your_dataset_name
+  table: your_field_partitioned_table_name
+  source_format: NEWLINE_DELIMITED_JSON
+  compression: NONE
+  auto_create_dataset: true
+  auto_create_table: true
+  schema_file: example/schema.json
+  range_partitioning:
+    field: 'long'
+    range:
+      start: '90'
+      end: '100'
+      interval: '1'

diff --git a/lib/embulk/output/bigquery.rb b/lib/embulk/output/bigquery.rb
index 10cbd00..76bd5ad 100644
--- a/lib/embulk/output/bigquery.rb
+++ b/lib/embulk/output/bigquery.rb
@@ -89,6 +89,7 @@ def self.configure(config, schema, task_count)
        'ignore_unknown_values' => config.param('ignore_unknown_values', :bool, :default => false),
        'allow_quoted_newlines' => config.param('allow_quoted_newlines', :bool, :default => false),
        'time_partitioning'     => config.param('time_partitioning',     :hash, :default => nil),
+       'range_partitioning'    => config.param('range_partitioning',    :hash, :default => nil),
        'clustering'            => config.param('clustering',            :hash, :default => nil), # google-api-ruby-client >= v0.21.0
        'schema_update_options' => config.param('schema_update_options', :array, :default => nil),
@@ -231,10 +232,30 @@ def self.configure(config, schema, task_count)
         unless task['time_partitioning']['type']
           raise ConfigError.new "`time_partitioning` must have `type` key"
         end
-      elsif Helper.has_partition_decorator?(task['table'])
+      # If the user specifies range_partitioning, use it as is
+      elsif Helper.has_partition_decorator?(task['table']) && task['range_partitioning'].nil?
         task['time_partitioning'] = {'type' => 'DAY'}
       end
 
+      if task['range_partitioning']
+        unless task['range_partitioning']['field']
+          raise ConfigError.new "`range_partitioning` must have `field` key"
+        end
+        unless task['range_partitioning']['range']
+          raise ConfigError.new "`range_partitioning` must have `range` key"
+        end
+
+        unless task['range_partitioning']['range']['start']
+          raise ConfigError.new "`range_partitioning` must have `range.start` key"
+        end
+        unless task['range_partitioning']['range']['end']
+          raise ConfigError.new "`range_partitioning` must have `range.end` key"
+        end
+        unless task['range_partitioning']['range']['interval']
+          raise ConfigError.new "`range_partitioning` must have `range.interval` key"
+        end
+      end
+
       if task['clustering']
         unless task['clustering']['fields']
           raise ConfigError.new "`clustering` must have `fields` key"
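Note the tightened `elsif` above: a partition decorator in `table` no longer pulls in the `DAY` time-partitioning default once `range_partitioning` is given. A condensed, standalone restatement of that rule follows; the method name and the `include?('$')` check are simplified stand-ins for the plugin's `Helper.has_partition_decorator?`, not the actual implementation:

```ruby
# Simplified restatement of the defaulting rule added above (illustrative only).
def default_time_partitioning(task)
  return task['time_partitioning'] if task['time_partitioning']
  has_decorator = task['table'].to_s.include?('$')  # stand-in for Helper.has_partition_decorator?
  if has_decorator && task['range_partitioning'].nil?
    {'type' => 'DAY'}   # legacy behaviour: decorator alone implies daily time partitioning
  end                   # otherwise leave time_partitioning unset
end

p default_time_partitioning('table' => 'logs$20250213')
# => {"type"=>"DAY"}
p default_time_partitioning('table' => 'sales$1', 'range_partitioning' => {'field' => 'customer_id'})
# => nil
```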
diff --git a/lib/embulk/output/bigquery/bigquery_client.rb b/lib/embulk/output/bigquery/bigquery_client.rb
index e443785..be35c4d 100644
--- a/lib/embulk/output/bigquery/bigquery_client.rb
+++ b/lib/embulk/output/bigquery/bigquery_client.rb
@@ -411,7 +411,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
         dataset ||= @dataset
         options ||= {}
         options['time_partitioning'] ||= @task['time_partitioning']
-        if Helper.has_partition_decorator?(table)
+        if Helper.has_partition_decorator?(table) && @task['range_partitioning'].nil?
           options['time_partitioning'] ||= {'type' => 'DAY'}
           table = Helper.chomp_partition_decorator(table)
         end
@@ -435,6 +435,18 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
           }
         end
 
+        options['range_partitioning'] ||= @task['range_partitioning']
+        if options['range_partitioning']
+          body[:range_partitioning] = {
+            field: options['range_partitioning']['field'],
+            range: {
+              start: options['range_partitioning']['range']['start'],
+              end: options['range_partitioning']['range']['end'],
+              interval: options['range_partitioning']['range']['interval'],
+            },
+          }
+        end
+
         options['clustering'] ||= @task['clustering']
         if options['clustering']
           body[:clustering] = {
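Given the example config added in this patch (`field: 'long'`, `range: {start: '90', end: '100', interval: '1'}`), the new branch in `create_table_if_not_exists` contributes a fragment like the following to the `tables.insert` request body. Only the range-partitioning portion is shown; the surrounding hash (`table_reference`, `schema`, and so on) is omitted, and the literal below is a sketch rather than the plugin's exact output:

```ruby
# Sketch: the fragment merged into the tables.insert request body for the example config.
require 'json'

body_fragment = {
  range_partitioning: {
    field: 'long',
    range: { start: '90', end: '100', interval: '1' },  # the REST API carries INT64 values as strings
  },
}

puts JSON.pretty_generate(body_fragment)
```

Patch 2 below keeps this wire shape by calling `.to_s` on the now-integer config values.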
diff --git a/test/test_configure.rb b/test/test_configure.rb
index c4f16aa..38b2b39 100644
--- a/test/test_configure.rb
+++ b/test/test_configure.rb
@@ -270,6 +270,32 @@ def test_time_partitioning
       assert_equal 'DAY', task['time_partitioning']['type']
     end
 
+    def test_range_partitioning
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => '1', 'end' => '2', 'interval' => '1' }})
+      assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
+
+      # field is required
+      config = least_config.merge('range_partitioning' => {'range' => { 'start' => '1', 'end' => '2', 'interval' => '1' }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+
+      # range is required
+      config = least_config.merge('range_partitioning' => {'field' => 'foo'})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range.start is required
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'end' => '2', 'interval' => '1' }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range.end is required
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => '1', 'interval' => '1' }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range.interval is required
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => '1', 'end' => '2' }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+    end
+
     def test_clustering
       config = least_config.merge('clustering' => {'fields' => ['field_a']})
       assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
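One interaction the first patch changes in `lib/embulk/output/bigquery.rb` but does not test is that a partition decorator combined with `range_partitioning` must not fall back to `DAY` time partitioning. A sketch of such a case, in the style of `test_time_partitioning` and reusing the `least_config`/`schema`/`processor_count` helpers from `test/test_configure.rb`, could look like this (illustrative only, not part of either patch):

```ruby
# Illustrative sketch -- not included in either patch.
def test_range_partitioning_with_partition_decorator
  config = least_config.merge(
    'table' => 'table_name$1',
    'range_partitioning' => {'field' => 'foo', 'range' => { 'start' => '1', 'end' => '2', 'interval' => '1' }}
  )
  task = Bigquery.configure(config, schema, processor_count)
  assert_nil task['time_partitioning']  # the DAY default must not kick in here
end
```

With the second patch below applied, the `start`/`end`/`interval` values in this sketch would be integers.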
From 5398f695aab2bb56d1360ab260400eb097e3a5cd Mon Sep 17 00:00:00 2001
From: Ryo Kitagawa
Date: Thu, 6 Mar 2025 21:53:55 +0900
Subject: [PATCH 2/2] fix: change range-partitioning fields to be int

---
 README.md                                      | 12 ++++++------
 ..._replace_field_range_partitioned_table.yml  |  6 +++---
 lib/embulk/output/bigquery.rb                  | 19 ++++++++++++++++---
 lib/embulk/output/bigquery/bigquery_client.rb  |  6 +++---
 test/test_configure.rb                         | 14 +++++++++-----
 5 files changed, 37 insertions(+), 20 deletions(-)

diff --git a/README.md b/README.md
index 216c364..47ebc78 100644
--- a/README.md
+++ b/README.md
@@ -113,9 +113,9 @@ Following options are same as [bq command-line tools](https://cloud.google.com/b
 | range_partitioning                | hash   | optional | nil | See [Range Partitioning](#range-partitioning) |
 | range_partitioning.field          | string | required | nil | `INT64` column used for partitioning |
 | range_partitioning.range          | hash   | required | nil | Defines the ranges for range partitioning |
-| range_partitioning.range.start    | string | required | nil | The start of range partitioning, inclusive. This field is an INT64 value represented as a string. |
-| range_partitioning.range.end      | string | required | nil | The end of range partitioning, exclusive. This field is an INT64 value represented as a string. |
-| range_partitioning.range.interval | string | required | nil | The width of each interval. This field is an INT64 value represented as a string. |
+| range_partitioning.range.start    | int    | required | nil | The start of range partitioning, inclusive. |
+| range_partitioning.range.end      | int    | required | nil | The end of range partitioning, exclusive. |
+| range_partitioning.range.interval | int    | required | nil | The width of each interval. |
 | clustering                        | hash   | optional | nil | Currently, clustering is supported for partitioned tables, so must be used with `time_partitioning` option. See [clustered tables](https://cloud.google.com/bigquery/docs/clustered-tables) |
 | clustering.fields                 | array  | required | nil | One or more fields on which data should be clustered. The order of the specified columns determines the sort order of the data. |
 | schema_update_options             | array  | optional | nil | (Experimental) List of `ALLOW_FIELD_ADDITION` or `ALLOW_FIELD_RELAXATION` or both. See [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions). NOTE for the current status: `schema_update_options` does not work for `copy` job, that is, is not effective for most of modes such as `append`, `replace` and `replace_backup`. `delete_in_advance` deletes origin table so does not need to update schema. Only `append_direct` can utilize schema update. |
@@ -467,9 +467,9 @@ out:
   range_partitioning:
     field: customer_id
     range:
-      start: '1'
-      end: '99999'
-      interval: '1'
+      start: 1
+      end: 99999
+      interval: 1
 ```
 
 ## Development

diff --git a/example/config_replace_field_range_partitioned_table.yml b/example/config_replace_field_range_partitioned_table.yml
index 0eceff5..5e4383e 100644
--- a/example/config_replace_field_range_partitioned_table.yml
+++ b/example/config_replace_field_range_partitioned_table.yml
@@ -31,6 +31,6 @@ out:
   range_partitioning:
     field: 'long'
     range:
-      start: '90'
-      end: '100'
-      interval: '1'
+      start: 90
+      end: 100
+      interval: 1

diff --git a/lib/embulk/output/bigquery.rb b/lib/embulk/output/bigquery.rb
index 76bd5ad..159bd39 100644
--- a/lib/embulk/output/bigquery.rb
+++ b/lib/embulk/output/bigquery.rb
@@ -245,15 +245,28 @@ def self.configure(config, schema, task_count)
           raise ConfigError.new "`range_partitioning` must have `range` key"
         end
 
-        unless task['range_partitioning']['range']['start']
+        range = task['range_partitioning']['range']
+        unless range['start']
           raise ConfigError.new "`range_partitioning` must have `range.start` key"
         end
-        unless task['range_partitioning']['range']['end']
+        unless range['start'].is_a?(Integer)
+          raise ConfigError.new "`range_partitioning.range.start` must be an integer"
+        end
+        unless range['end']
           raise ConfigError.new "`range_partitioning` must have `range.end` key"
         end
-        unless task['range_partitioning']['range']['interval']
+        unless range['end'].is_a?(Integer)
+          raise ConfigError.new "`range_partitioning.range.end` must be an integer"
+        end
+        unless range['interval']
           raise ConfigError.new "`range_partitioning` must have `range.interval` key"
         end
+        unless range['interval'].is_a?(Integer)
+          raise ConfigError.new "`range_partitioning.range.interval` must be an integer"
+        end
+        if range['start'] + range['interval'] > range['end']
+          raise ConfigError.new "`range_partitioning.range.start` + `range_partitioning.range.interval` must be less than or equal to `range_partitioning.range.end`"
+        end
       end
 
       if task['clustering']

diff --git a/lib/embulk/output/bigquery/bigquery_client.rb b/lib/embulk/output/bigquery/bigquery_client.rb
index be35c4d..22d803a 100644
--- a/lib/embulk/output/bigquery/bigquery_client.rb
+++ b/lib/embulk/output/bigquery/bigquery_client.rb
@@ -440,9 +440,9 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
           body[:range_partitioning] = {
             field: options['range_partitioning']['field'],
             range: {
-              start: options['range_partitioning']['range']['start'],
-              end: options['range_partitioning']['range']['end'],
-              interval: options['range_partitioning']['range']['interval'],
+              start: options['range_partitioning']['range']['start'].to_s,
+              end: options['range_partitioning']['range']['end'].to_s,
+              interval: options['range_partitioning']['range']['interval'].to_s,
             },
           }
         end
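The new `start + interval > end` guard added to `lib/embulk/output/bigquery.rb` in this patch simply requires that at least one full interval fits below `end`; equivalently, the number of complete partitions `(end - start) / interval` must be at least 1. A tiny standalone illustration (the function name and sample values are not from the plugin):

```ruby
# Illustrative only: complete partitions that fit between start (inclusive) and end (exclusive).
def full_intervals(start, end_, interval)
  (end_ - start) / interval
end

p full_intervals(90, 100, 1)  # => 10  -- the example config: ten single-value partitions
p full_intervals(1, 2, 1)     # => 1   -- smallest layout the check accepts
p full_intervals(1, 2, 2)     # => 0   -- rejected: start + interval (3) exceeds end (2)
```

The last case is exactly the one exercised by the new assertion in the test diff below.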
diff --git a/test/test_configure.rb b/test/test_configure.rb
index 38b2b39..e97c3ff 100644
--- a/test/test_configure.rb
+++ b/test/test_configure.rb
@@ -271,11 +271,11 @@ def test_time_partitioning
     end
 
     def test_range_partitioning
-      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => '1', 'end' => '2', 'interval' => '1' }})
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
       assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }
 
       # field is required
-      config = least_config.merge('range_partitioning' => {'range' => { 'start' => '1', 'end' => '2', 'interval' => '1' }})
+      config = least_config.merge('range_partitioning' => {'range' => { 'start' => 1, 'end' => 2, 'interval' => 1 }})
       assert_raise { Bigquery.configure(config, schema, processor_count) }
 
 
@@ -284,15 +284,19 @@ def test_range_partitioning
       assert_raise { Bigquery.configure(config, schema, processor_count) }
 
       # range.start is required
-      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'end' => '2', 'interval' => '1' }})
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'end' => 2, 'interval' => 1 }})
       assert_raise { Bigquery.configure(config, schema, processor_count) }
 
       # range.end is required
-      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => '1', 'interval' => '1' }})
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'interval' => 1 }})
       assert_raise { Bigquery.configure(config, schema, processor_count) }
 
       # range.interval is required
-      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => '1', 'end' => '2' }})
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2 }})
+      assert_raise { Bigquery.configure(config, schema, processor_count) }
+
+      # range.start + range.interval must not exceed range.end
+      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => 1, 'end' => 2, 'interval' => 2 }})
       assert_raise { Bigquery.configure(config, schema, processor_count) }
     end