Skip to content

Commit 2a179fd

Browse files
authored
Merge pull request #115 from browserstack/codex/bq-custom-partitioning-cdk
Fix BigQuery destination check with default partitioning field
2 parents f6ad8a3 + 3a42f7b commit 2a179fd

File tree

2 files changed

+51
-3
lines changed

2 files changed

+51
-3
lines changed

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/SafeDestinationCatalogFactory.kt

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,9 @@ import io.airbyte.cdk.load.config.CHECK_STREAM_NAMESPACE
1818
import io.airbyte.cdk.load.data.FieldType
1919
import io.airbyte.cdk.load.data.IntegerType
2020
import io.airbyte.cdk.load.data.ObjectType
21+
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
2122
import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType
23+
import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
2224
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
2325
import io.airbyte.protocol.models.v0.DestinationSyncMode
2426
import io.micronaut.context.annotation.Factory
@@ -80,19 +82,27 @@ class SafeDestinationCatalogFactory {
8082
fun checkCatalog(
8183
namespaceMapper: NamespaceMapper,
8284
@Named("checkNamespace") checkNamespace: String?,
85+
config: BigqueryConfiguration,
8386
): DestinationCatalog {
8487
// Copied from DefaultDestinationCatalogFactory to maintain behavior
8588
val date = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"))
8689
val random = RandomStringUtils.randomAlphabetic(5).lowercase()
8790
val namespace = checkNamespace ?: "${CHECK_STREAM_NAMESPACE}_$date$random"
91+
val schemaFields = linkedMapOf("test" to FieldType(IntegerType, nullable = true))
92+
if (
93+
!config.defaultPartitioningField.isNullOrBlank() &&
94+
config.defaultPartitioningField != "_airbyte_extracted_at"
95+
) {
96+
schemaFields[config.defaultPartitioningField] =
97+
FieldType(TimestampTypeWithTimezone, nullable = true)
98+
}
8899
return DestinationCatalog(
89100
listOf(
90101
DestinationStream(
91102
unmappedNamespace = namespace,
92103
unmappedName = "test$date$random",
93104
importType = Append,
94-
schema =
95-
ObjectType(linkedMapOf("test" to FieldType(IntegerType, nullable = true))),
105+
schema = ObjectType(schemaFields),
96106
generationId = 1,
97107
minimumGenerationId = 0,
98108
syncId = 1,

airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/SafeDestinationCatalogFactoryTest.kt

Lines changed: 39 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,12 @@ package io.airbyte.integrations.destination.bigquery
66

77
import io.airbyte.cdk.load.command.Dedupe
88
import io.airbyte.cdk.load.command.NamespaceMapper
9+
import io.airbyte.cdk.load.data.ObjectType
910
import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType
11+
import io.airbyte.integrations.destination.bigquery.spec.BatchedStandardInsertConfiguration
12+
import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
13+
import io.airbyte.integrations.destination.bigquery.spec.BigqueryRegion
14+
import io.airbyte.integrations.destination.bigquery.spec.CdcDeletionMode
1015
import io.airbyte.protocol.models.v0.AirbyteStream
1116
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
1217
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
@@ -62,10 +67,43 @@ class SafeDestinationCatalogFactoryTest {
6267
val factory = SafeDestinationCatalogFactory()
6368
val namespaceMapper = mockk<NamespaceMapper>(relaxed = true)
6469

65-
val destCatalog = factory.checkCatalog(namespaceMapper, "custom_check_ns")
70+
val destCatalog = factory.checkCatalog(namespaceMapper, "custom_check_ns", config())
6671

6772
assertEquals(1, destCatalog.streams.size)
6873
assertEquals("custom_check_ns", destCatalog.streams.first().unmappedNamespace)
6974
assert(destCatalog.streams.first().unmappedName.startsWith("test"))
7075
}
76+
77+
@Test
78+
fun `test checkCatalog adds default partitioning field to check stream schema`() {
79+
val factory = SafeDestinationCatalogFactory()
80+
val namespaceMapper = mockk<NamespaceMapper>(relaxed = true)
81+
82+
val destCatalog =
83+
factory.checkCatalog(
84+
namespaceMapper,
85+
"custom_check_ns",
86+
config(defaultPartitioningField = "created_at"),
87+
)
88+
89+
val schema = destCatalog.streams.first().schema as ObjectType
90+
assert(schema.properties.containsKey("created_at"))
91+
}
92+
93+
private fun config(defaultPartitioningField: String? = null): BigqueryConfiguration =
94+
BigqueryConfiguration(
95+
projectId = "test-project",
96+
datasetLocation = BigqueryRegion.US,
97+
datasetId = "test_dataset",
98+
loadingMethod = BatchedStandardInsertConfiguration,
99+
credentialsJson = null,
100+
cdcDeletionMode = CdcDeletionMode.HARD_DELETE,
101+
internalTableDataset = "airbyte_internal",
102+
legacyRawTablesOnly = false,
103+
defaultPartitioningField = defaultPartitioningField,
104+
defaultClusteringField = null,
105+
defaultTableSuffix = null,
106+
defaultPartitioningGranularity = null,
107+
streamConfigMap = emptyMap(),
108+
)
71109
}

0 commit comments

Comments
 (0)