feat: Add range partitioning support #174

Open · wants to merge 2 commits into master
Changes from 1 commit
24 changes: 24 additions & 0 deletions README.md
@@ -110,6 +110,12 @@ Following options are same as [bq command-line tools](https://cloud.google.com/b
| time_partitioning.type | string | required | nil | The only type supported is DAY, which will generate one partition per day based on data loading time. |
| time_partitioning.expiration_ms | int | optional | nil | Number of milliseconds for which to keep the storage for a partition. |
| time_partitioning.field | string | optional | nil | `DATE` or `TIMESTAMP` column used for partitioning |
| range_partitioning | hash | optional | nil | See [Range Partitioning](#range-partitioning) |
| range_partitioning.field | string | required | nil | `INT64` column used for partitioning |
| range_partitioning.range | hash | required | nil | Defines the ranges for range partitioning |
| range_partitioning.range.start | string | required | nil | The start of range partitioning, inclusive. This field is an INT64 value represented as a string. |
| range_partitioning.range.end | string | required | nil | The end of range partitioning, exclusive. This field is an INT64 value represented as a string. |
| range_partitioning.range.interval | string | required | nil | The width of each interval. This field is an INT64 value represented as a string. |
| clustering | hash | optional | nil | Currently, clustering is supported for partitioned tables, so must be used with `time_partitioning` option. See [clustered tables](https://cloud.google.com/bigquery/docs/clustered-tables) |
| clustering.fields | array | required | nil | One or more fields on which data should be clustered. The order of the specified columns determines the sort order of the data. |
| schema_update_options | array | optional | nil | (Experimental) List of `ALLOW_FIELD_ADDITION`, `ALLOW_FIELD_RELAXATION`, or both. See [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load.schemaUpdateOptions). NOTE for the current status: `schema_update_options` does not work for the `copy` job, that is, it is not effective for most modes such as `append`, `replace`, and `replace_backup`. `delete_in_advance` deletes the origin table, so it does not need to update the schema. Only `append_direct` can utilize schema update. |
@@ -448,6 +454,24 @@ MEMO: [jobs#configuration.load.schemaUpdateOptions](https://cloud.google.com/big
to update the schema of the destination table as a side effect of the load job, but it is not available for the copy job.
Thus, it was not suitable for embulk-output-bigquery's idempotence modes, `append`, `replace`, and `replace_backup`, sigh.

### Range Partitioning

See also [Creating and Updating Range-Partitioned Tables](https://cloud.google.com/bigquery/docs/creating-partitioned-tables).

To load into a partition, specify `range_partitioning` and a `table` parameter with a partition decorator, as follows:

```yaml
out:
  type: bigquery
  table: table_name$1
  range_partitioning:
    field: customer_id
    range:
      start: '1'
      end: '99999'
      interval: '1'
```
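For reference, this is roughly the `range_partitioning` portion of the tables API body that the plugin builds from the configuration above; a sketch that mirrors the `bigquery_client.rb` change later in this PR, with the values taken from the example:

```ruby
# Sketch only: the range_partitioning portion of the tables API body that
# bigquery_client.rb (later in this PR) assembles from the configuration above.
body = {
  range_partitioning: {
    field: 'customer_id',
    range: {
      start: '1',      # INT64 values are represented as strings in the tables API
      end: '99999',
      interval: '1',
    },
  },
}
```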
**Member:**

Why does this part use a string instead of an integer? As far as I know, range partitioning uses numbers, and we could check that the range is valid (start < end) if we used integers.

What do you think of this configuration layout?
(Do we need a `range` block?)

range_partitioning:
  field: customer_id # the BigQuery documentation uses `column`, but this plugin uses `field`. [1]
  start: 1
  end: 1000
  interval: 10
  # [1] https://cloud.google.com/bigquery/docs/creating-partitioned-tables#create_an_integer-range_partitioned_table

(This is just my opinion; I want to ask the co-maintainers about this.)
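For illustration only (not code from this PR): the point about integers is that a numeric sanity check becomes trivial, whereas the current string values compare lexicographically.

```ruby
# Illustrative sketch, assuming integer values in the parsed task hash.
'100' < '99'  # => true  (lexicographic string comparison, misleading)
100 < 99      # => false (numeric comparison, what we actually want)

range = task['range_partitioning']['range']
unless range['start'] < range['end']
  raise ConfigError.new "`range_partitioning.range.start` must be less than `range.end`"
end
```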

**Author:**

Thank you for the comment! I followed the API documentation. If you prefer integers, I'll change this!

https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#RangePartitioning

**Author:**

> Do we need a `range` block?

I fixed it with 49d2c36.

**Member:**

Hello, @kitagry. Thank you for waiting.

Could you use this layout? (Sorry, we decided to use the original design, except using integers instead of strings for the range values.)

range_partitioning:
  field: customer_id
  range:
    start: 1    # integer not string.
    end: 99999  # integer not string.
    interval: 1 # integer not string.

And could you check that the range satisfies start + interval < end?
If you have any concerns, please let me know.

I referenced the `time_partitioning` configuration.

The BigQuery API uses:

{
  "type": string,
  "expirationMs": string,
  "field": string,
  "requirePartitionFilter": boolean
}

Embulk configuration:

  type: bigquery
  table: table_name$20160929
  time_partitioning:
    type: DAY
    expiration_ms: 259200000 # integer not string, use snake_case

We discussed this in this design document (written in Japanese).

After the modification, I'll check the partitioning feature.
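A sketch of the requested check in `configure`, assuming the integer-based layout above (hypothetical, not part of this commit):

```ruby
# Hypothetical validation for configure, assuming integer range values as requested.
range = task['range_partitioning']['range']
if range['start'] + range['interval'] >= range['end']
  raise ConfigError.new "`range_partitioning.range.start` + `range.interval` must be less than `range.end`"
end
```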


## Development

### Run example:
36 changes: 36 additions & 0 deletions example/config_replace_field_range_partitioned_table.yml
@@ -0,0 +1,36 @@
in:
  type: file
  path_prefix: example/example.csv
  parser:
    type: csv
    charset: UTF-8
    newline: CRLF
    null_string: 'NULL'
    skip_header_lines: 1
    comment_line_marker: '#'
    columns:
      - {name: date, type: string}
      - {name: timestamp, type: timestamp, format: "%Y-%m-%d %H:%M:%S.%N", timezone: "+09:00"}
      - {name: "null", type: string}
      - {name: long, type: long}
      - {name: string, type: string}
      - {name: double, type: double}
      - {name: boolean, type: boolean}
out:
  type: bigquery
  mode: replace
  auth_method: service_account
  json_keyfile: example/your-project-000.json
  dataset: your_dataset_name
  table: your_field_partitioned_table_name
  source_format: NEWLINE_DELIMITED_JSON
  compression: NONE
  auto_create_dataset: true
  auto_create_table: true
  schema_file: example/schema.json
  range_partitioning:
    field: 'long'
    range:
      start: '90'
      end: '100'
      interval: '1'
23 changes: 22 additions & 1 deletion lib/embulk/output/bigquery.rb
@@ -89,6 +89,7 @@ def self.configure(config, schema, task_count)
      'ignore_unknown_values' => config.param('ignore_unknown_values', :bool, :default => false),
      'allow_quoted_newlines' => config.param('allow_quoted_newlines', :bool, :default => false),
      'time_partitioning' => config.param('time_partitioning', :hash, :default => nil),
      'range_partitioning' => config.param('range_partitioning', :hash, :default => nil),
      'clustering' => config.param('clustering', :hash, :default => nil), # google-api-ruby-client >= v0.21.0
      'schema_update_options' => config.param('schema_update_options', :array, :default => nil),

@@ -231,10 +232,30 @@ def self.configure(config, schema, task_count)
        unless task['time_partitioning']['type']
          raise ConfigError.new "`time_partitioning` must have `type` key"
        end
      elsif Helper.has_partition_decorator?(task['table'])
      # If the user specifies range_partitioning, it should be used as is
      elsif Helper.has_partition_decorator?(task['table']) && task['range_partitioning'].nil?
        task['time_partitioning'] = {'type' => 'DAY'}
      end

      if task['range_partitioning']
        unless task['range_partitioning']['field']
          raise ConfigError.new "`range_partitioning` must have `field` key"
        end
        unless task['range_partitioning']['range']
          raise ConfigError.new "`range_partitioning` must have `range` key"
        end

        unless task['range_partitioning']['range']['start']
          raise ConfigError.new "`range_partitioning` must have `range.start` key"
        end
        unless task['range_partitioning']['range']['end']
          raise ConfigError.new "`range_partitioning` must have `range.end` key"
        end
        unless task['range_partitioning']['range']['interval']
          raise ConfigError.new "`range_partitioning` must have `range.interval` key"
        end
      end

      if task['clustering']
        unless task['clustering']['fields']
          raise ConfigError.new "`clustering` must have `fields` key"
14 changes: 13 additions & 1 deletion lib/embulk/output/bigquery/bigquery_client.rb
@@ -411,7 +411,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
      dataset ||= @dataset
      options ||= {}
      options['time_partitioning'] ||= @task['time_partitioning']
      if Helper.has_partition_decorator?(table)
      if Helper.has_partition_decorator?(table) && @task['range_partitioning'].nil?
        options['time_partitioning'] ||= {'type' => 'DAY'}
        table = Helper.chomp_partition_decorator(table)
      end
@@ -435,6 +435,18 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
        }
      end

      options['range_partitioning'] ||= @task['range_partitioning']
      if options['range_partitioning']
        body[:range_partitioning] = {
          field: options['range_partitioning']['field'],
          range: {
            start: options['range_partitioning']['range']['start'],
            end: options['range_partitioning']['range']['end'],
            interval: options['range_partitioning']['range']['interval'],
          },
        }
      end

      options['clustering'] ||= @task['clustering']
      if options['clustering']
        body[:clustering] = {
26 changes: 26 additions & 0 deletions test/test_configure.rb
Expand Up @@ -270,6 +270,32 @@ def test_time_partitioning
      assert_equal 'DAY', task['time_partitioning']['type']
    end

    def test_range_partitioning
      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => '1', 'end' => '2', 'interval' => '1' }})
      assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }

      # field is required
      config = least_config.merge('range_partitioning' => {'range' => { 'start' => '1', 'end' => '2', 'interval' => '1' }})
      assert_raise { Bigquery.configure(config, schema, processor_count) }

      # range is required
      config = least_config.merge('range_partitioning' => {'field' => 'foo'})
      assert_raise { Bigquery.configure(config, schema, processor_count) }

      # range.start is required
      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'end' => '2', 'interval' => '1' }})
      assert_raise { Bigquery.configure(config, schema, processor_count) }

      # range.end is required
      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => '1', 'interval' => '1' }})
      assert_raise { Bigquery.configure(config, schema, processor_count) }

      # range.interval is required
      config = least_config.merge('range_partitioning' => {'field' => 'foo', 'range' => { 'start' => '1', 'end' => '2' }})
      assert_raise { Bigquery.configure(config, schema, processor_count) }
    end

    def test_clustering
      config = least_config.merge('clustering' => {'fields' => ['field_a']})
      assert_nothing_raised { Bigquery.configure(config, schema, processor_count) }