
Commit 9e91a20 (parent: fe41bb3)

Fix integration tests after revert (#5601)

Co-authored-by: Claire McGinty <[email protected]>

File tree: 15 files changed, +173 -196 lines

build.sbt (+3)

@@ -494,6 +494,9 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
   ),
   ProblemFilters.exclude[IncompatibleResultTypeProblem](
     "com.spotify.scio.bigquery.types.package#Json.parse"
+  ),
+  ProblemFilters.exclude[DirectMissingMethodProblem](
+    "com.spotify.scio.bigquery.types.package#BigNumeric.bytes"
   )
 )
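The new filter is needed because this commit deletes the public `BigNumeric.bytes` helper (see types/package.scala below), a binary-incompatible change that MiMa would otherwise flag. A minimal sketch of the same exclusion in isolation, assuming the sbt-mima-plugin is already enabled:

import com.typesafe.tools.mima.core._

// Tell MiMa to ignore the method removed in this commit when
// checking binary compatibility against the previous release.
ThisBuild / mimaBinaryIssueFilters ++= Seq(
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "com.spotify.scio.bigquery.types.package#BigNumeric.bytes"
  )
)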

integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala (+12 -37)

@@ -48,11 +48,7 @@ object TypedBigQueryIT {
     timestamp: Instant,
     date: LocalDate,
     time: LocalTime,
-    // BQ DATETIME is problematic with avro as BQ api uses different representations:
-    // - BQ export uses 'string(datetime)'
-    // - BQ load uses 'long(local-timestamp-micros)'
-    // BigQueryType avroSchema favors read with string type
-    // datetime: LocalDateTime,
+    datetime: LocalDateTime,
     geography: Geography,
     json: Json,
     bigNumeric: BigNumeric
@@ -81,23 +77,12 @@ object TypedBigQueryIT {
       y <- Gen.numChar
     } yield Geography(s"POINT($x $y)")
   )
-  implicit val arbJson: Arbitrary[Json] = Arbitrary {
-    import Arbitrary._
-    import Gen._
-    Gen
-      .oneOf(
-        // json object
-        alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""{"$str":$num}""")),
-        // json array
-        alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""["$str",$num]""")),
-        // json literals
-        alphaLowerStr.map(str => s""""$str""""),
-        arbInt.arbitrary.map(_.toString),
-        arbBool.arbitrary.map(_.toString),
-        Gen.const("null")
-      )
-      .map(wkt => Json(wkt))
-  }
+  implicit val arbJson: Arbitrary[Json] = Arbitrary(
+    for {
+      key <- Gen.alphaStr
+      value <- Gen.alphaStr
+    } yield Json(s"""{"$key":"$value"}""")
+  )

   implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
     // Precision: 76.76 (the 77th digit is partial)
@@ -119,7 +104,7 @@ object TypedBigQueryIT {
   private val tableRowTable = table("records_tablerow")
   private val avroTable = table("records_avro")

-  private val records = Gen.listOfN(5, recordGen).sample.get
+  private val records = Gen.listOfN(100, recordGen).sample.get
   private val options = PipelineOptionsFactory
     .fromArgs(
       "--project=data-integration-test",
@@ -131,9 +116,8 @@ object TypedBigQueryIT {
 class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
   import TypedBigQueryIT._

-  private val bq = BigQuery.defaultInstance()
-
   override protected def afterAll(): Unit = {
+    val bq = BigQuery.defaultInstance()
     // best effort cleanup
     Try(bq.tables.delete(typedTable.ref))
     Try(bq.tables.delete(tableRowTable.ref))
@@ -156,14 +140,6 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
     runWithRealContext(options) { sc =>
       sc.parallelize(records)
         .map(Record.toTableRow)
-        .map { row =>
-          // TableRow BQ save API uses json
-          // TO disambiguate from literal json string,
-          // field MUST be converted to parsed JSON
-          val jsonLoadRow = new TableRow()
-          jsonLoadRow.putAll(row.asInstanceOf[java.util.Map[String, _]]) // cast for 2.12
-          jsonLoadRow.set("json", Json.parse(row.getJson("json")))
-        }
         .saveAsBigQueryTable(
           tableRowTable,
           schema = Record.schema,
@@ -177,9 +153,9 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
     }
   }

-  it should "handle records as avro format" in {
+  // TODO fix if in beam 2.61
+  ignore should "handle records as avro format" in {
     implicit val coder: Coder[GenericRecord] = avroGenericRecordCoder(Record.avroSchema)
-
     runWithRealContext(options) { sc =>
       sc.parallelize(records)
         .map(Record.toAvro)
@@ -191,8 +167,7 @@ class TypedBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
     }.waitUntilFinish()

     runWithRealContext(options) { sc =>
-      val data =
-        sc.bigQueryTable(avroTable, Format.GenericRecordWithLogicalTypes).map(Record.fromAvro)
+      val data = sc.bigQueryTable(avroTable, Format.GenericRecord).map(Record.fromAvro)
       data should containInAnyOrder(records)
     }
   }
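Note the `it` to `ignore` swap above: in ScalaTest's FlatSpec style, `ignore` registers the test but skips it at run time while keeping the body compiled. A minimal standalone sketch of the pattern (the spec and assertions here are illustrative only):

import org.scalatest.flatspec.AnyFlatSpec

class ExampleSpec extends AnyFlatSpec {
  "Example" should "run normally" in {
    assert(1 + 1 == 2)
  }

  // reported as ignored, but the body must still type-check
  ignore should "be skipped until the upstream fix lands" in {
    assert(false)
  }
}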

scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/TableOps.scala (+52 -1)

@@ -228,6 +228,57 @@ final private[client] class TableOps(client: Client) {
   def exists(tableSpec: String): Boolean =
     exists(bq.BigQueryHelpers.parseTableSpec(tableSpec))

+  /**
+   * This is annoying, but the GCP BQ client v2 does not accept BQ json rows in the same format
+   * as BQ load: JSON columns are expected as strings instead of parsed JSON.
+   */
+  private def normalizeRows(schema: TableSchema)(tableRow: TableRow): TableRow =
+    normalizeRows(schema.getFields.asScala.toList)(tableRow)
+
+  private def normalizeRows(fields: List[TableFieldSchema])(tableRow: TableRow): TableRow = {
+    import com.spotify.scio.bigquery._
+
+    fields.foldLeft(tableRow) { (row, f) =>
+      f.getType match {
+        case "JSON" =>
+          val name = f.getName
+          f.getMode match {
+            case "REQUIRED" =>
+              row.set(name, row.getJson(name).wkt)
+            case "NULLABLE" =>
+              row.getJsonOpt(name).fold(row) { json =>
+                row.set(name, json.wkt)
+              }
+            case "REPEATED" =>
+              row.set(name, row.getJsonList(name).map(_.wkt).asJava)
+          }
+        case "RECORD" | "STRUCT" =>
+          val name = f.getName
+          val nestedFields = f.getFields.asScala.toList
+          f.getMode match {
+            case "REQUIRED" =>
+              row.set(name, normalizeRows(nestedFields)(row.getRecord(name)))
+            case "NULLABLE" =>
+              row.getRecordOpt(name).fold(row) { nestedRow =>
+                row.set(name, normalizeRows(nestedFields)(nestedRow))
+              }
+            case "REPEATED" =>
+              row.set(
+                name,
+                row
+                  .getRecordList(name)
+                  .map { nestedRow =>
+                    normalizeRows(nestedFields)(nestedRow)
+                  }
+                  .asJava
+              )
+          }
+        case _ =>
+          row
+      }
+    }
+  }
+
   /** Write rows to a table. */
   def writeRows(
     tableReference: TableReference,
@@ -262,7 +313,7 @@ final private[client] class TableOps(client: Client) {
       case WriteDisposition.WRITE_APPEND =>
     }

-    service.insertAll(tableReference, rows.asJava)
+    service.insertAll(tableReference, rows.map(normalizeRows(schema)).asJava)
   }

   /** Write rows to a table. */
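Stripped of the BigQuery client types, the shape of `normalizeRows` is a fold over the schema's fields that stringifies parsed JSON values and recurses into records. A minimal self-contained sketch of that idea; the `Json`, `Field`, and `Row` types below are invented stand-ins, not Scio or BigQuery APIs:

object NormalizeSketch {
  final case class Json(wkt: String) // stand-in for Scio's Json wrapper
  sealed trait FieldType
  case object JsonType extends FieldType
  final case class RecordType(nested: List[Field]) extends FieldType
  case object OtherType extends FieldType
  final case class Field(name: String, tpe: FieldType)

  type Row = Map[String, Any]

  // Walk the schema, replace parsed JSON values with their string form,
  // and recurse into nested records, as normalizeRows does before insertAll.
  def normalize(fields: List[Field])(row: Row): Row =
    fields.foldLeft(row) { (r, f) =>
      (f.tpe, r.get(f.name)) match {
        case (JsonType, Some(j: Json)) =>
          r.updated(f.name, j.wkt)
        case (RecordType(nested), Some(sub: Map[_, _])) =>
          r.updated(f.name, normalize(nested)(sub.asInstanceOf[Row]))
        case _ => r
      }
    }
}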

scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/TableRowSyntax.scala (+4 -3)

@@ -121,9 +121,10 @@ object TableRowOps {
   }

   def json(value: AnyRef): Json = value match {
-    case x: Json   => x
-    case x: String => Json(x)
-    case _         => throw new UnsupportedOperationException("Cannot convert to json: " + value)
+    case x: Json     => x
+    case x: TableRow => Json(x)
+    case x: String   => Json(x)
+    case _           => throw new UnsupportedOperationException("Cannot convert to json: " + value)
   }

   def bignumeric(value: AnyRef): BigNumeric = value match {
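The new `TableRow` case exists because `Json.parse` now returns a parsed `TableRow` (see types/package.scala below), so a JSON column can surface as an object rather than a raw string. A hedged usage sketch, assuming `TableRowOps` is reachable from the calling scope:

import com.google.api.services.bigquery.model.TableRow
import com.spotify.scio.bigquery.types.Json

object JsonCoercionSketch {
  // both representations of a JSON column now coerce to Json
  val fromString: Json = TableRowOps.json("""{"k":"v"}""")              // wraps the raw string
  val fromRecord: Json = TableRowOps.json(new TableRow().set("k", "v")) // re-serialized via Json(x)
}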

scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/ConverterProvider.scala (+8 -10)

@@ -180,30 +180,27 @@ private[types] object ConverterProvider {
       case t if t =:= typeOf[String] => tree

       case t if t =:= typeOf[BigDecimal] =>
-        q"_root_.com.spotify.scio.bigquery.Numeric.bytes($tree)"
+        q"_root_.com.spotify.scio.bigquery.Numeric($tree).toString"
       case t if t =:= typeOf[ByteString] =>
         q"_root_.java.nio.ByteBuffer.wrap($tree.toByteArray)"
       case t if t =:= typeOf[Array[Byte]] =>
         q"_root_.java.nio.ByteBuffer.wrap($tree)"

-      case t if t =:= typeOf[Instant] =>
-        q"_root_.com.spotify.scio.bigquery.Timestamp.micros($tree)"
+      case t if t =:= typeOf[Instant] => q"$tree.getMillis * 1000"
       case t if t =:= typeOf[LocalDate] =>
-        q"_root_.com.spotify.scio.bigquery.Date.days($tree)"
+        q"_root_.com.spotify.scio.bigquery.Date($tree)"
       case t if t =:= typeOf[LocalTime] =>
-        q"_root_.com.spotify.scio.bigquery.Time.micros($tree)"
+        q"_root_.com.spotify.scio.bigquery.Time($tree)"
       case t if t =:= typeOf[LocalDateTime] =>
-        // LocalDateTime is read as avro string
-        // on write we should use `local-timestamp-micros`
-        q"_root_.com.spotify.scio.bigquery.DateTime.format($tree)"
+        q"_root_.com.spotify.scio.bigquery.DateTime($tree)"

       // different than the nested record match below, even though those are case classes
       case t if t =:= typeOf[Geography] =>
         q"$tree.wkt"
       case t if t =:= typeOf[Json] =>
         q"$tree.wkt"
       case t if t =:= typeOf[BigNumeric] =>
-        q"_root_.com.spotify.scio.bigquery.types.BigNumeric.bytes($tree)"
+        q"_root_.com.spotify.scio.bigquery.types.BigNumeric($tree.wkt).toString"

       // nested records
       case t if isCaseClass(c)(t) =>
@@ -377,7 +374,8 @@ private[types] object ConverterProvider {
       case t if t =:= typeOf[Geography] =>
         q"$tree.wkt"
       case t if t =:= typeOf[Json] =>
-        q"$tree.wkt"
+        // for TableRow/json, use parsed JSON to prevent escaping
+        q"_root_.com.spotify.scio.bigquery.types.Json.parse($tree)"
       case t if t =:= typeOf[BigNumeric] =>
         // for TableRow/json, use string to avoid precision loss (like numeric)
         q"$tree.wkt.toString"

scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaProvider.scala (+1 -1)

@@ -37,7 +37,7 @@ private[types] object SchemaProvider {
   def avroSchemaOf[T: TypeTag]: Schema =
     AvroSchemaCache.get(
       typeTag[T].tpe.toString,
-      BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields, true)
+      BigQueryUtils.toGenericAvroSchema(typeTag[T].tpe.toString, schemaOf[T].getFields)
     )

   def schemaOf[T: TypeTag]: TableSchema =

scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/SchemaUtil.scala (-3)

@@ -43,9 +43,6 @@ object SchemaUtil {
     case "DATE" => "LocalDate"
     case "TIME" => "LocalTime"
     case "DATETIME" => "LocalDateTime"
-    case "GEOGRAPHY" => "Geography"
-    case "JSON" => "Json"
-    case "BIGNUMERIC" => "BigNumeric"
     case "RECORD" | "STRUCT" => NameProvider.getUniqueName(tfs.getName)
     case t => throw new IllegalArgumentException(s"Type: $t not supported")
   }

scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/TypeProvider.scala (+1 -1)

@@ -335,7 +335,7 @@ private[types] object TypeProvider {
       q"override def schema: ${p(c, GModel)}.TableSchema = ${p(c, SUtil)}.parseSchema(${schema.toString})"
     }
     val defAvroSchema =
-      q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(this.schema, true)"
+      q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(${cName.toString}, this.schema.getFields)"
     val defToPrettyString =
       q"override def toPrettyString(indent: Int = 0): String = ${p(c, s"$SBQ.types.SchemaUtil")}.toPrettyString(this.schema, ${cName.toString}, indent)"

scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala (+2 -4)

@@ -72,7 +72,8 @@ package object types {
       .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
       .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

-    def parse(json: Json): AnyRef = mapper.readValue(json.wkt, classOf[Object])
+    def apply(row: TableRow): Json = Json(mapper.writeValueAsString(row))
+    def parse(json: Json): TableRow = mapper.readValue(json.wkt, classOf[TableRow])
   }

   /**
@@ -116,8 +117,5 @@ package object types {
       case b: ByteBuffer => new BigNumeric(DecimalConverter.fromBytes(b, null, DecimalLogicalType))
       case _ => apply(value.toString)
     }
-
-    def bytes(value: BigNumeric): ByteBuffer =
-      DecimalConverter.toBytes(value.wkt.bigDecimal, null, DecimalLogicalType)
   }
 }
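The two new helpers are a straight Jackson round trip between `TableRow` and its JSON string. A minimal sketch of the same round trip with a plain `ObjectMapper` (the real code configures the mapper with the serialization features shown above):

import com.fasterxml.jackson.databind.ObjectMapper
import com.google.api.services.bigquery.model.TableRow

object JsonRoundTripSketch {
  val mapper = new ObjectMapper()

  val row  = new TableRow().set("name", "Alice")
  val json = mapper.writeValueAsString(row)            // what Json.apply wraps
  val back = mapper.readValue(json, classOf[TableRow]) // what Json.parse returns
}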

scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala (+11 -35)

@@ -38,11 +38,6 @@ final class ConverterProviderSpec

   import Schemas._

-  def arbBigDecimal(precision: Int, scale: Int): Arbitrary[BigDecimal] = Arbitrary {
-    val max = BigInt(10).pow(precision) - 1
-    Gen.choose(-max, max).map(BigDecimal(_, scale))
-  }
-
   implicit val arbByteArray: Arbitrary[Array[Byte]] = Arbitrary(Gen.alphaStr.map(_.getBytes))
   implicit val arbByteString: Arbitrary[ByteString] = Arbitrary(
     Gen.alphaStr.map(ByteString.copyFromUtf8)
@@ -51,37 +46,19 @@ final class ConverterProviderSpec
   implicit val arbDate: Arbitrary[LocalDate] = Arbitrary(Gen.const(LocalDate.now()))
   implicit val arbTime: Arbitrary[LocalTime] = Arbitrary(Gen.const(LocalTime.now()))
   implicit val arbDatetime: Arbitrary[LocalDateTime] = Arbitrary(Gen.const(LocalDateTime.now()))
-  implicit val arbNumericBigDecimal: Arbitrary[BigDecimal] =
-    arbBigDecimal(Numeric.MaxNumericPrecision, Numeric.MaxNumericScale)
-  implicit val arbGeography: Arbitrary[Geography] = Arbitrary(
+  implicit val arbNumericBigDecimal: Arbitrary[BigDecimal] = Arbitrary {
+    Arbitrary.arbBigDecimal.arbitrary
+      .retryUntil(_.precision <= Numeric.MaxNumericPrecision)
+      .map(Numeric.apply)
+  }
+  implicit val arbJson: Arbitrary[Json] = Arbitrary(
     for {
-      x <- Gen.numChar
-      y <- Gen.numChar
-    } yield Geography(s"POINT($x $y)")
+      // "f" is a key field of TableRow and cannot be used as a column name,
+      // see https://github.com/apache/beam/issues/33531
+      key <- Gen.alphaStr.retryUntil(_ != "f")
+      value <- Gen.alphaStr
+    } yield Json(s"""{"$key":"$value"}""")
   )
-  implicit val arbJson: Arbitrary[Json] = Arbitrary {
-    import Arbitrary._
-    import Gen._
-    Gen
-      .oneOf(
-        // json object
-        alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""{"$str":$num}""")),
-        // json array
-        alphaLowerStr.flatMap(str => arbInt.arbitrary.map(num => s"""["$str",$num]""")),
-        // json literals
-        alphaLowerStr.map(str => s""""$str""""),
-        arbInt.arbitrary.map(_.toString),
-        arbBool.arbitrary.map(_.toString),
-        Gen.const("null")
-      )
-      .map(wkt => Json(wkt))
-  }
-
-  implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
-    // Precision: 76.76 (the 77th digit is partial)
-    arbBigDecimal(BigNumeric.MaxNumericPrecision - 1, BigNumeric.MaxNumericScale).arbitrary
-      .map(BigNumeric.apply)
-  }

   implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList)
   implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _)
@@ -137,7 +114,6 @@ final class ConverterProviderSpec
     o.bigDecimalF.isDefined shouldBe r.containsKey("bigDecimalF")
     o.geographyF.isDefined shouldBe r.containsKey("geographyF")
     o.jsonF.isDefined shouldBe r.containsKey("jsonF")
-    o.bigNumericF.isDefined shouldBe r.containsKey("bigNumericF")
   }
 }
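Both new generators lean on ScalaCheck's `Gen.retryUntil`, which re-samples until the predicate holds (so it should only be used with predicates that succeed often). A tiny standalone illustration:

import org.scalacheck.Gen

object RetryUntilSketch {
  // re-draw alphabetic strings until one is non-empty and not the reserved "f"
  val columnName: Gen[String] = Gen.alphaStr.retryUntil(s => s.nonEmpty && s != "f")

  def main(args: Array[String]): Unit =
    println(columnName.sample) // Option[String], e.g. Some("abc")
}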

scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderTest.scala (+5 -2)

@@ -50,9 +50,12 @@ class ConverterProviderTest extends AnyFlatSpec with Matchers {

   it should "handle required json type" in {
     val wkt = """{"name":"Alice","age":30}"""
+    val parsed = new TableRow()
+      .set("name", "Alice")
+      .set("age", 30)

-    RequiredJson.fromTableRow(TableRow("a" -> wkt)) shouldBe RequiredJson(Json(wkt))
-    BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> wkt)
+    RequiredJson.fromTableRow(TableRow("a" -> parsed)) shouldBe RequiredJson(Json(wkt))
+    BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> parsed)
   }

   it should "handle required big numeric type" in {

scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/SchemaProviderTest.scala (+1 -2)

@@ -42,8 +42,7 @@ class SchemaProviderTest extends AnyFlatSpec with Matchers {
     | {"mode": "$mode", "name": "datetimeF", "type": "DATETIME"},
     | {"mode": "$mode", "name": "bigDecimalF", "type": "NUMERIC"},
     | {"mode": "$mode", "name": "geographyF", "type": "GEOGRAPHY"},
-    | {"mode": "$mode", "name": "jsonF", "type": "JSON"},
-    | {"mode": "$mode", "name": "bigNumericF", "type": "BIGNUMERIC"}
+    | {"mode": "$mode", "name": "jsonF", "type": "JSON"}
     |]
     |""".stripMargin
