Updated the docs (#2225)
liferoad authored Mar 3, 2025
1 parent 8d4c9f7 commit ec5d5d2
Showing 9 changed files with 113 additions and 28 deletions.
v2/jdbc-to-googlecloud/README_Jdbc_to_BigQuery_Flex.md (13 changes: 9 additions & 4 deletions)
@@ -41,11 +41,12 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **KMSEncryptionKey**: The Cloud KMS encryption key to use to decrypt the username, password, and connection string. If you pass in a Cloud KMS key, you must also encrypt the username, password, and connection string. For example, `projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key`.
* **useColumnAlias**: If set to `true`, the pipeline uses the column alias (`AS`) instead of the column name to map the rows to BigQuery. Defaults to `false`.
* **isTruncate**: If set to `true`, the pipeline truncates the BigQuery table before loading data into it. Defaults to `false`, which causes the pipeline to append data.
* **partitionColumn**: If this parameter is provided with the name of the `table` defined as an optional parameter, JdbcIO reads the table in parallel by executing multiple instances of the query on the same table (subquery) using ranges. Currently, only supports `Long` partition columns.
* **partitionColumn**: If `partitionColumn` is specified along with `table`, JdbcIO reads the table in parallel by executing multiple instances of the query on the same table (subquery) using ranges. Currently, `Long` and `DateTime` partition columns are supported. Pass the column type through `partitionColumnType`. See the example after this list.
* **partitionColumnType**: The type of the `partitionColumn`. Accepts `long` or `datetime`. Defaults to: long.
* **table**: The table to read from when using partitions. This parameter also accepts a subquery in parentheses. For example, `(select id, name from Person) as subq`.
* **numPartitions**: The number of partitions. With the lower and upper bound, this value forms partition strides for generated `WHERE` clause expressions that are used to split the partition column evenly. When the input is less than `1`, the number is set to `1`.
* **lowerBound**: The lower bound to use in the partition scheme. If not provided, this value is automatically inferred by Apache Beam for the supported types.
* **upperBound**: The upper bound to use in the partition scheme. If not provided, this value is automatically inferred by Apache Beam for the supported types.
* **lowerBound**: The lower bound to use in the partition scheme. If not provided, this value is automatically inferred by Apache Beam for the supported types. When `partitionColumnType` is `datetime`, the lower bound must use the format `yyyy-MM-dd HH:mm:ss.SSSZ`. For example, `2024-02-20 07:55:45.000+03:30`.
* **upperBound**: The upper bound to use in the partition scheme. If not provided, this value is automatically inferred by Apache Beam for the supported types. When `partitionColumnType` is `datetime`, the upper bound must use the format `yyyy-MM-dd HH:mm:ss.SSSZ`. For example, `2024-02-20 07:55:45.000+03:30`.
* **fetchSize**: The number of rows to fetch from the database at a time. Not used for partitioned reads. Defaults to: 50000.
* **createDisposition**: The BigQuery CreateDisposition to use. For example, `CREATE_IF_NEEDED` or `CREATE_NEVER`. Defaults to: CREATE_NEVER.
* **bigQuerySchemaPath**: The Cloud Storage path for the BigQuery JSON schema. If `createDisposition` is set to `CREATE_IF_NEEDED`, this parameter must be specified. For example, `gs://your-bucket/your-schema.json`.
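
For illustration, a partitioned read over a `DateTime` column might be configured as in the minimal sketch below. The column name, subquery, partition count, and bound values are placeholders, not values taken from this template's documentation.

```
# Hypothetical partitioned-read settings for a DateTime column.
export PARTITION_COLUMN=updated_at                                    # placeholder column name
export PARTITION_COLUMN_TYPE=datetime
export TABLE="(select id, name, updated_at from Person) as subq"      # subquery form, per the table parameter
export NUM_PARTITIONS=10                                              # placeholder partition count
# Bounds use the documented format yyyy-MM-dd HH:mm:ss.SSSZ; if omitted,
# Apache Beam infers them for supported types.
export LOWER_BOUND="2024-01-01 00:00:00.000+00:00"
export UPPER_BOUND="2024-12-31 23:59:59.999+00:00"
```

With these values, the read splits the range between the bounds into `numPartitions` strides and issues one query per stride with a generated `WHERE` clause on the partition column, as described above.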
@@ -147,6 +148,7 @@ export KMSENCRYPTION_KEY=<KMSEncryptionKey>
export USE_COLUMN_ALIAS=false
export IS_TRUNCATE=false
export PARTITION_COLUMN=<partitionColumn>
export PARTITION_COLUMN_TYPE=long
export TABLE=<table>
export NUM_PARTITIONS=<numPartitions>
export LOWER_BOUND=<lowerBound>
@@ -177,6 +179,7 @@ gcloud dataflow flex-template run "jdbc-to-bigquery-flex-job" \
--parameters "useColumnAlias=$USE_COLUMN_ALIAS" \
--parameters "isTruncate=$IS_TRUNCATE" \
--parameters "partitionColumn=$PARTITION_COLUMN" \
--parameters "partitionColumnType=$PARTITION_COLUMN_TYPE" \
--parameters "table=$TABLE" \
--parameters "numPartitions=$NUM_PARTITIONS" \
--parameters "lowerBound=$LOWER_BOUND" \
@@ -222,6 +225,7 @@ export KMSENCRYPTION_KEY=<KMSEncryptionKey>
export USE_COLUMN_ALIAS=false
export IS_TRUNCATE=false
export PARTITION_COLUMN=<partitionColumn>
export PARTITION_COLUMN_TYPE=long
export TABLE=<table>
export NUM_PARTITIONS=<numPartitions>
export LOWER_BOUND=<lowerBound>
@@ -242,7 +246,7 @@ mvn clean package -PtemplatesRun \
-Dregion="$REGION" \
-DjobName="jdbc-to-bigquery-flex-job" \
-DtemplateName="Jdbc_to_BigQuery_Flex" \
-Dparameters="driverJars=$DRIVER_JARS,driverClassName=$DRIVER_CLASS_NAME,connectionURL=$CONNECTION_URL,connectionProperties=$CONNECTION_PROPERTIES,username=$USERNAME,password=$PASSWORD,query=$QUERY,outputTable=$OUTPUT_TABLE,bigQueryLoadingTemporaryDirectory=$BIG_QUERY_LOADING_TEMPORARY_DIRECTORY,KMSEncryptionKey=$KMSENCRYPTION_KEY,useColumnAlias=$USE_COLUMN_ALIAS,isTruncate=$IS_TRUNCATE,partitionColumn=$PARTITION_COLUMN,table=$TABLE,numPartitions=$NUM_PARTITIONS,lowerBound=$LOWER_BOUND,upperBound=$UPPER_BOUND,fetchSize=$FETCH_SIZE,createDisposition=$CREATE_DISPOSITION,bigQuerySchemaPath=$BIG_QUERY_SCHEMA_PATH,outputDeadletterTable=$OUTPUT_DEADLETTER_TABLE,disabledAlgorithms=$DISABLED_ALGORITHMS,extraFilesToStage=$EXTRA_FILES_TO_STAGE,useStorageWriteApi=$USE_STORAGE_WRITE_API,useStorageWriteApiAtLeastOnce=$USE_STORAGE_WRITE_API_AT_LEAST_ONCE" \
-Dparameters="driverJars=$DRIVER_JARS,driverClassName=$DRIVER_CLASS_NAME,connectionURL=$CONNECTION_URL,connectionProperties=$CONNECTION_PROPERTIES,username=$USERNAME,password=$PASSWORD,query=$QUERY,outputTable=$OUTPUT_TABLE,bigQueryLoadingTemporaryDirectory=$BIG_QUERY_LOADING_TEMPORARY_DIRECTORY,KMSEncryptionKey=$KMSENCRYPTION_KEY,useColumnAlias=$USE_COLUMN_ALIAS,isTruncate=$IS_TRUNCATE,partitionColumn=$PARTITION_COLUMN,partitionColumnType=$PARTITION_COLUMN_TYPE,table=$TABLE,numPartitions=$NUM_PARTITIONS,lowerBound=$LOWER_BOUND,upperBound=$UPPER_BOUND,fetchSize=$FETCH_SIZE,createDisposition=$CREATE_DISPOSITION,bigQuerySchemaPath=$BIG_QUERY_SCHEMA_PATH,outputDeadletterTable=$OUTPUT_DEADLETTER_TABLE,disabledAlgorithms=$DISABLED_ALGORITHMS,extraFilesToStage=$EXTRA_FILES_TO_STAGE,useStorageWriteApi=$USE_STORAGE_WRITE_API,useStorageWriteApiAtLeastOnce=$USE_STORAGE_WRITE_API_AT_LEAST_ONCE" \
-f v2/jdbc-to-googlecloud
```

@@ -300,6 +304,7 @@ resource "google_dataflow_flex_template_job" "jdbc_to_bigquery_flex" {
# useColumnAlias = "false"
# isTruncate = "false"
# partitionColumn = "<partitionColumn>"
# partitionColumnType = "long"
# table = "<table>"
# numPartitions = "<numPartitions>"
# lowerBound = "<lowerBound>"
v2/kafka-to-bigquery/README_Kafka_to_BigQuery_Flex.md (24 changes: 22 additions & 2 deletions)
@@ -25,7 +25,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat

* **readBootstrapServerAndTopic**: Kafka Topic to read the input from.
* **writeMode**: Write records to one table or multiple tables (based on schema). The `DYNAMIC_TABLE_NAMES` mode is supported only for the `AVRO_CONFLUENT_WIRE_FORMAT` source message format and the `SCHEMA_REGISTRY` schema source. The target table name is auto-generated from the Avro schema name of each message; messages can share a single schema (creating a single table) or use multiple schemas (creating multiple tables). The `SINGLE_TABLE_NAME` mode writes to a single table (single schema) specified by the user. Defaults to `SINGLE_TABLE_NAME`. See the sketch after this list.
* **kafkaReadAuthenticationMode**: The mode of authentication to use with the Kafka cluster. Use `KafkaAuthenticationMethod.NONE` for no authentication, `KafkaAuthenticationMethod.SASL_PLAIN` for SASL/PLAIN username and password, and `KafkaAuthenticationMethod.TLS` for certificate-based authentication. `KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS` should be used only for Google Cloud Apache Kafka for BigQuery cluster, it allows to authenticate using application default credentials.
* **kafkaReadAuthenticationMode**: The mode of authentication to use with the Kafka cluster. Use `KafkaAuthenticationMethod.NONE` for no authentication, `KafkaAuthenticationMethod.SASL_PLAIN` for SASL/PLAIN username and password, `KafkaAuthenticationMethod.SASL_SCRAM_512` for SASL_SCRAM_512 authentication, and `KafkaAuthenticationMethod.TLS` for certificate-based authentication. `KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS` should be used only for a Google Cloud Apache Kafka for BigQuery cluster; it allows authentication using Application Default Credentials.
* **messageFormat**: The format of the Kafka messages to read. The supported values are `AVRO_CONFLUENT_WIRE_FORMAT` (Confluent Schema Registry encoded Avro), `AVRO_BINARY_ENCODING` (Plain binary Avro), and `JSON`. Defaults to: AVRO_CONFLUENT_WIRE_FORMAT.
* **useBigQueryDLQ**: If true, failed messages will be written to BigQuery with extra error information. Defaults to: false.

@@ -52,6 +52,10 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **kafkaReadTruststorePasswordSecretId**: The Google Cloud Secret Manager secret ID that contains the password to use to access the Java TrustStore (JKS) file for Kafka TLS authentication. For example, `projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>`.
* **kafkaReadKeystorePasswordSecretId**: The Google Cloud Secret Manager secret ID that contains the password to use to access the Java KeyStore (JKS) file for Kafka TLS authentication. For example, `projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>`.
* **kafkaReadKeyPasswordSecretId**: The Google Cloud Secret Manager secret ID that contains the password to use to access the private key within the Java KeyStore (JKS) file for Kafka TLS authentication. For example, `projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>`.
* **kafkaReadSaslScramUsernameSecretId**: The Google Cloud Secret Manager secret ID that contains the Kafka username to use with `SASL_SCRAM` authentication. For example, `projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>`.
* **kafkaReadSaslScramPasswordSecretId**: The Google Cloud Secret Manager secret ID that contains the Kafka password to use with `SASL_SCRAM` authentication. For example, `projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>`.
* **kafkaReadSaslScramTruststoreLocation**: The Google Cloud Storage path to the Java TrustStore (JKS) file that contains the trusted certificates to use to verify the identity of the Kafka broker.
* **kafkaReadSaslScramTruststorePasswordSecretId**: The Google Cloud Secret Manager secret ID that contains the password to use to access the Java TrustStore (JKS) file for Kafka SASL_SCRAM authentication. For example, `projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>`. See the sketch after this list.
* **schemaFormat**: The Kafka schema format. Can be provided as `SINGLE_SCHEMA_FILE` or `SCHEMA_REGISTRY`. If `SINGLE_SCHEMA_FILE` is specified, the schema defined in the Avro schema file is used for all messages. If `SCHEMA_REGISTRY` is specified, the messages can have either a single schema or multiple schemas. Defaults to: SINGLE_SCHEMA_FILE.
* **confluentAvroSchemaPath**: The Google Cloud Storage path to the single Avro schema file used to decode all of the messages in a topic. Defaults to empty.
* **schemaRegistryConnectionUrl**: The URL for the Confluent Schema Registry instance used to manage Avro schemas for message decoding. Defaults to empty.
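
A minimal sketch of the SASL/SCRAM-related settings is shown below. The secret resource names and the truststore path are placeholders, and the assumption that `kafkaReadAuthenticationMode` takes the bare value `SASL_SCRAM_512` (rather than the fully qualified enum name) is mine, not stated in this excerpt.

```
# SASL_SCRAM_512: credentials come from Secret Manager, and the broker is
# verified against a truststore stored in Cloud Storage.
export KAFKA_READ_AUTHENTICATION_MODE=SASL_SCRAM_512   # assumed value; see note above
export KAFKA_READ_SASL_SCRAM_USERNAME_SECRET_ID=projects/my-project/secrets/kafka-user/versions/1            # placeholder
export KAFKA_READ_SASL_SCRAM_PASSWORD_SECRET_ID=projects/my-project/secrets/kafka-password/versions/1        # placeholder
export KAFKA_READ_SASL_SCRAM_TRUSTSTORE_LOCATION=gs://my-bucket/kafka/truststore.jks                         # placeholder
export KAFKA_READ_SASL_SCRAM_TRUSTSTORE_PASSWORD_SECRET_ID=projects/my-project/secrets/truststore-pass/versions/1  # placeholder
```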
@@ -186,6 +190,10 @@ export KAFKA_READ_TRUSTSTORE_LOCATION=<kafkaReadTruststoreLocation>
export KAFKA_READ_TRUSTSTORE_PASSWORD_SECRET_ID=<kafkaReadTruststorePasswordSecretId>
export KAFKA_READ_KEYSTORE_PASSWORD_SECRET_ID=<kafkaReadKeystorePasswordSecretId>
export KAFKA_READ_KEY_PASSWORD_SECRET_ID=<kafkaReadKeyPasswordSecretId>
export KAFKA_READ_SASL_SCRAM_USERNAME_SECRET_ID=<kafkaReadSaslScramUsernameSecretId>
export KAFKA_READ_SASL_SCRAM_PASSWORD_SECRET_ID=<kafkaReadSaslScramPasswordSecretId>
export KAFKA_READ_SASL_SCRAM_TRUSTSTORE_LOCATION=<kafkaReadSaslScramTruststoreLocation>
export KAFKA_READ_SASL_SCRAM_TRUSTSTORE_PASSWORD_SECRET_ID=<kafkaReadSaslScramTruststorePasswordSecretId>
export SCHEMA_FORMAT=SINGLE_SCHEMA_FILE
export CONFLUENT_AVRO_SCHEMA_PATH=""
export SCHEMA_REGISTRY_CONNECTION_URL=""
@@ -233,6 +241,10 @@ gcloud dataflow flex-template run "kafka-to-bigquery-flex-job" \
--parameters "kafkaReadTruststorePasswordSecretId=$KAFKA_READ_TRUSTSTORE_PASSWORD_SECRET_ID" \
--parameters "kafkaReadKeystorePasswordSecretId=$KAFKA_READ_KEYSTORE_PASSWORD_SECRET_ID" \
--parameters "kafkaReadKeyPasswordSecretId=$KAFKA_READ_KEY_PASSWORD_SECRET_ID" \
--parameters "kafkaReadSaslScramUsernameSecretId=$KAFKA_READ_SASL_SCRAM_USERNAME_SECRET_ID" \
--parameters "kafkaReadSaslScramPasswordSecretId=$KAFKA_READ_SASL_SCRAM_PASSWORD_SECRET_ID" \
--parameters "kafkaReadSaslScramTruststoreLocation=$KAFKA_READ_SASL_SCRAM_TRUSTSTORE_LOCATION" \
--parameters "kafkaReadSaslScramTruststorePasswordSecretId=$KAFKA_READ_SASL_SCRAM_TRUSTSTORE_PASSWORD_SECRET_ID" \
--parameters "messageFormat=$MESSAGE_FORMAT" \
--parameters "schemaFormat=$SCHEMA_FORMAT" \
--parameters "confluentAvroSchemaPath=$CONFLUENT_AVRO_SCHEMA_PATH" \
@@ -299,6 +311,10 @@ export KAFKA_READ_TRUSTSTORE_LOCATION=<kafkaReadTruststoreLocation>
export KAFKA_READ_TRUSTSTORE_PASSWORD_SECRET_ID=<kafkaReadTruststorePasswordSecretId>
export KAFKA_READ_KEYSTORE_PASSWORD_SECRET_ID=<kafkaReadKeystorePasswordSecretId>
export KAFKA_READ_KEY_PASSWORD_SECRET_ID=<kafkaReadKeyPasswordSecretId>
export KAFKA_READ_SASL_SCRAM_USERNAME_SECRET_ID=<kafkaReadSaslScramUsernameSecretId>
export KAFKA_READ_SASL_SCRAM_PASSWORD_SECRET_ID=<kafkaReadSaslScramPasswordSecretId>
export KAFKA_READ_SASL_SCRAM_TRUSTSTORE_LOCATION=<kafkaReadSaslScramTruststoreLocation>
export KAFKA_READ_SASL_SCRAM_TRUSTSTORE_PASSWORD_SECRET_ID=<kafkaReadSaslScramTruststorePasswordSecretId>
export SCHEMA_FORMAT=SINGLE_SCHEMA_FILE
export CONFLUENT_AVRO_SCHEMA_PATH=""
export SCHEMA_REGISTRY_CONNECTION_URL=""
@@ -325,7 +341,7 @@ mvn clean package -PtemplatesRun \
-Dregion="$REGION" \
-DjobName="kafka-to-bigquery-flex-job" \
-DtemplateName="Kafka_to_BigQuery_Flex" \
-Dparameters="readBootstrapServerAndTopic=$READ_BOOTSTRAP_SERVER_AND_TOPIC,outputTableSpec=$OUTPUT_TABLE_SPEC,persistKafkaKey=$PERSIST_KAFKA_KEY,writeMode=$WRITE_MODE,outputProject=$OUTPUT_PROJECT,outputDataset=$OUTPUT_DATASET,bqTableNamePrefix=$BQ_TABLE_NAME_PREFIX,createDisposition=$CREATE_DISPOSITION,writeDisposition=$WRITE_DISPOSITION,useAutoSharding=$USE_AUTO_SHARDING,numStorageWriteApiStreams=$NUM_STORAGE_WRITE_API_STREAMS,storageWriteApiTriggeringFrequencySec=$STORAGE_WRITE_API_TRIGGERING_FREQUENCY_SEC,useStorageWriteApiAtLeastOnce=$USE_STORAGE_WRITE_API_AT_LEAST_ONCE,enableCommitOffsets=$ENABLE_COMMIT_OFFSETS,consumerGroupId=$CONSUMER_GROUP_ID,kafkaReadOffset=$KAFKA_READ_OFFSET,kafkaReadAuthenticationMode=$KAFKA_READ_AUTHENTICATION_MODE,kafkaReadUsernameSecretId=$KAFKA_READ_USERNAME_SECRET_ID,kafkaReadPasswordSecretId=$KAFKA_READ_PASSWORD_SECRET_ID,kafkaReadKeystoreLocation=$KAFKA_READ_KEYSTORE_LOCATION,kafkaReadTruststoreLocation=$KAFKA_READ_TRUSTSTORE_LOCATION,kafkaReadTruststorePasswordSecretId=$KAFKA_READ_TRUSTSTORE_PASSWORD_SECRET_ID,kafkaReadKeystorePasswordSecretId=$KAFKA_READ_KEYSTORE_PASSWORD_SECRET_ID,kafkaReadKeyPasswordSecretId=$KAFKA_READ_KEY_PASSWORD_SECRET_ID,messageFormat=$MESSAGE_FORMAT,schemaFormat=$SCHEMA_FORMAT,confluentAvroSchemaPath=$CONFLUENT_AVRO_SCHEMA_PATH,schemaRegistryConnectionUrl=$SCHEMA_REGISTRY_CONNECTION_URL,binaryAvroSchemaPath=$BINARY_AVRO_SCHEMA_PATH,schemaRegistryAuthenticationMode=$SCHEMA_REGISTRY_AUTHENTICATION_MODE,schemaRegistryTruststoreLocation=$SCHEMA_REGISTRY_TRUSTSTORE_LOCATION,schemaRegistryTruststorePasswordSecretId=$SCHEMA_REGISTRY_TRUSTSTORE_PASSWORD_SECRET_ID,schemaRegistryKeystoreLocation=$SCHEMA_REGISTRY_KEYSTORE_LOCATION,schemaRegistryKeystorePasswordSecretId=$SCHEMA_REGISTRY_KEYSTORE_PASSWORD_SECRET_ID,schemaRegistryKeyPasswordSecretId=$SCHEMA_REGISTRY_KEY_PASSWORD_SECRET_ID,schemaRegistryOauthClientId=$SCHEMA_REGISTRY_OAUTH_CLIENT_ID,schemaRegistryOauthClientSecretId=$SCHEMA_REGISTRY_OAUTH_CLIENT_SECRET_ID,schemaRegistryOauthScope=$SCHEMA_REGISTRY_OAUTH_SCOPE,schemaRegistryOauthTokenEndpointUrl=$SCHEMA_REGISTRY_OAUTH_TOKEN_ENDPOINT_URL,outputDeadletterTable=$OUTPUT_DEADLETTER_TABLE,useBigQueryDLQ=$USE_BIG_QUERY_DLQ,javascriptTextTransformGcsPath=$JAVASCRIPT_TEXT_TRANSFORM_GCS_PATH,javascriptTextTransformFunctionName=$JAVASCRIPT_TEXT_TRANSFORM_FUNCTION_NAME,javascriptTextTransformReloadIntervalMinutes=$JAVASCRIPT_TEXT_TRANSFORM_RELOAD_INTERVAL_MINUTES" \
-Dparameters="readBootstrapServerAndTopic=$READ_BOOTSTRAP_SERVER_AND_TOPIC,outputTableSpec=$OUTPUT_TABLE_SPEC,persistKafkaKey=$PERSIST_KAFKA_KEY,writeMode=$WRITE_MODE,outputProject=$OUTPUT_PROJECT,outputDataset=$OUTPUT_DATASET,bqTableNamePrefix=$BQ_TABLE_NAME_PREFIX,createDisposition=$CREATE_DISPOSITION,writeDisposition=$WRITE_DISPOSITION,useAutoSharding=$USE_AUTO_SHARDING,numStorageWriteApiStreams=$NUM_STORAGE_WRITE_API_STREAMS,storageWriteApiTriggeringFrequencySec=$STORAGE_WRITE_API_TRIGGERING_FREQUENCY_SEC,useStorageWriteApiAtLeastOnce=$USE_STORAGE_WRITE_API_AT_LEAST_ONCE,enableCommitOffsets=$ENABLE_COMMIT_OFFSETS,consumerGroupId=$CONSUMER_GROUP_ID,kafkaReadOffset=$KAFKA_READ_OFFSET,kafkaReadAuthenticationMode=$KAFKA_READ_AUTHENTICATION_MODE,kafkaReadUsernameSecretId=$KAFKA_READ_USERNAME_SECRET_ID,kafkaReadPasswordSecretId=$KAFKA_READ_PASSWORD_SECRET_ID,kafkaReadKeystoreLocation=$KAFKA_READ_KEYSTORE_LOCATION,kafkaReadTruststoreLocation=$KAFKA_READ_TRUSTSTORE_LOCATION,kafkaReadTruststorePasswordSecretId=$KAFKA_READ_TRUSTSTORE_PASSWORD_SECRET_ID,kafkaReadKeystorePasswordSecretId=$KAFKA_READ_KEYSTORE_PASSWORD_SECRET_ID,kafkaReadKeyPasswordSecretId=$KAFKA_READ_KEY_PASSWORD_SECRET_ID,kafkaReadSaslScramUsernameSecretId=$KAFKA_READ_SASL_SCRAM_USERNAME_SECRET_ID,kafkaReadSaslScramPasswordSecretId=$KAFKA_READ_SASL_SCRAM_PASSWORD_SECRET_ID,kafkaReadSaslScramTruststoreLocation=$KAFKA_READ_SASL_SCRAM_TRUSTSTORE_LOCATION,kafkaReadSaslScramTruststorePasswordSecretId=$KAFKA_READ_SASL_SCRAM_TRUSTSTORE_PASSWORD_SECRET_ID,messageFormat=$MESSAGE_FORMAT,schemaFormat=$SCHEMA_FORMAT,confluentAvroSchemaPath=$CONFLUENT_AVRO_SCHEMA_PATH,schemaRegistryConnectionUrl=$SCHEMA_REGISTRY_CONNECTION_URL,binaryAvroSchemaPath=$BINARY_AVRO_SCHEMA_PATH,schemaRegistryAuthenticationMode=$SCHEMA_REGISTRY_AUTHENTICATION_MODE,schemaRegistryTruststoreLocation=$SCHEMA_REGISTRY_TRUSTSTORE_LOCATION,schemaRegistryTruststorePasswordSecretId=$SCHEMA_REGISTRY_TRUSTSTORE_PASSWORD_SECRET_ID,schemaRegistryKeystoreLocation=$SCHEMA_REGISTRY_KEYSTORE_LOCATION,schemaRegistryKeystorePasswordSecretId=$SCHEMA_REGISTRY_KEYSTORE_PASSWORD_SECRET_ID,schemaRegistryKeyPasswordSecretId=$SCHEMA_REGISTRY_KEY_PASSWORD_SECRET_ID,schemaRegistryOauthClientId=$SCHEMA_REGISTRY_OAUTH_CLIENT_ID,schemaRegistryOauthClientSecretId=$SCHEMA_REGISTRY_OAUTH_CLIENT_SECRET_ID,schemaRegistryOauthScope=$SCHEMA_REGISTRY_OAUTH_SCOPE,schemaRegistryOauthTokenEndpointUrl=$SCHEMA_REGISTRY_OAUTH_TOKEN_ENDPOINT_URL,outputDeadletterTable=$OUTPUT_DEADLETTER_TABLE,useBigQueryDLQ=$USE_BIG_QUERY_DLQ,javascriptTextTransformGcsPath=$JAVASCRIPT_TEXT_TRANSFORM_GCS_PATH,javascriptTextTransformFunctionName=$JAVASCRIPT_TEXT_TRANSFORM_FUNCTION_NAME,javascriptTextTransformReloadIntervalMinutes=$JAVASCRIPT_TEXT_TRANSFORM_RELOAD_INTERVAL_MINUTES" \
-f v2/kafka-to-bigquery
```

@@ -396,6 +412,10 @@ resource "google_dataflow_flex_template_job" "kafka_to_bigquery_flex" {
# kafkaReadTruststorePasswordSecretId = "<kafkaReadTruststorePasswordSecretId>"
# kafkaReadKeystorePasswordSecretId = "<kafkaReadKeystorePasswordSecretId>"
# kafkaReadKeyPasswordSecretId = "<kafkaReadKeyPasswordSecretId>"
# kafkaReadSaslScramUsernameSecretId = "<kafkaReadSaslScramUsernameSecretId>"
# kafkaReadSaslScramPasswordSecretId = "<kafkaReadSaslScramPasswordSecretId>"
# kafkaReadSaslScramTruststoreLocation = "<kafkaReadSaslScramTruststoreLocation>"
# kafkaReadSaslScramTruststorePasswordSecretId = "<kafkaReadSaslScramTruststorePasswordSecretId>"
# schemaFormat = "SINGLE_SCHEMA_FILE"
# confluentAvroSchemaPath = ""
# schemaRegistryConnectionUrl = ""