Description
Environment details
- API: BigQuery Storage Write API
- OS type and version: macOS Sonoma 14.4.1
- Java version: Java 17 and Java 20
- version(s): 3.16.2 and 3.5.1
Steps to reproduce
We are using the BigQuery Storage Write API and have encountered an error with DATETIME formatting. On closer inspection, the problem appears to come from how the Java client integrates protobuf with the Storage Write API: any DATETIME value that carries a time zone fails.
When writing through the Storage Write API, BigQuery returns:
"BQ_ERROR_MESSAGE":"Field root.ModifiedDateTime failed to convert to DATETIME. Error: Text '2025-08-14T12:24:03.484Z' could not be parsed, unparsed text found at index 23"
The client surfaces this as the following exception:
com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Append serialization failed for writer: projects/hb-datalake-test/datasets/test/tables/deneme_table/_default
at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.appendWithUniqueId(SchemaAwareStreamWriter.java:251) ~[google-cloud-bigquerystorage-3.16.2.jar:3.16.2]
at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.append(SchemaAwareStreamWriter.java:140) ~[google-cloud-bigquerystorage-3.16.2.jar:3.16.2]
at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:68) ~[google-cloud-bigquerystorage-3.16.2.jar:3.16.2]
at com.hepsiburada.lightrabbitconnector.service.SWADefaultStream$DataWriter.append(SWADefaultStream.java:190) ~[classes/:na]
at com.hepsiburada.lightrabbitconnector.service.SWADefaultStream.writeToDefaultStream(SWADefaultStream.java:77) ~[classes/:na]
at com.hepsiburada.lightrabbitconnector.service.RabbitMessageProcessor.listen(RabbitMessageProcessor.java:100) ~[classes/:na]
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:578) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.0.13.jar:6.0.13]
at org.springframework.amqp.rabbit.listener.adapter.KotlinAwareInvocableHandlerMethod.doInvoke(KotlinAwareInvocableHandlerMethod.java:45) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.0.13.jar:6.0.13]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:75) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:274) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:224) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.adapter.BatchMessagingMessageListenerAdapter.onMessageBatch(BatchMessagingMessageListenerAdapter.java:80) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1659) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1582) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1570) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1565) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1506) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1487) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.executeWithList(SimpleMessageListenerContainer.java:1057) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1046) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:941) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1323) ~[spring-rabbit-3.0.10.jar:3.0.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1225) ~[spring-rabbit-3.0.10.jar:3.0.10]
at java.base/java.lang.Thread.run(Thread.java:1623) ~[na:na]
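The error text matches what `java.time` throws when a zone-less parse meets a trailing offset: in `2025-08-14T12:24:03.484Z` the `Z` sits exactly at index 23. Assuming the client parses DATETIME values as a local (zone-less) datetime, which is what the message shape suggests, the failure can be reproduced with the JDK alone:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;

public class DateTimeParseRepro {
    public static void main(String[] args) {
        // DATETIME is zone-less; the trailing 'Z' is left unparsed at index 23
        String value = "2025-08-14T12:24:03.484Z";
        try {
            LocalDateTime.parse(value);
        } catch (DateTimeParseException e) {
            // e.g. "Text '2025-08-14T12:24:03.484Z' could not be parsed,
            //       unparsed text found at index 23"
            System.out.println(e.getMessage());
        }
    }
}
```

This mirrors the BQ_ERROR_MESSAGE above character for character, which is why I suspect the DATETIME conversion path rather than our payload.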
After investigating, I believe I have found the source of the problem: the BQTableSchemaToProtoDescriptor class in the BigQuery Storage Write API client.
Code example
BQTableSchemaToProtoDescriptor maps BigQuery table types to proto descriptor types as follows:
public class BQTableSchemaToProtoDescriptor {
  private static ImmutableMap<TableFieldSchema.Mode, FieldDescriptorProto.Label>
      BQTableSchemaModeMap =
          ImmutableMap.of(
              TableFieldSchema.Mode.NULLABLE, FieldDescriptorProto.Label.LABEL_OPTIONAL,
              TableFieldSchema.Mode.REPEATED, FieldDescriptorProto.Label.LABEL_REPEATED,
              TableFieldSchema.Mode.REQUIRED, FieldDescriptorProto.Label.LABEL_REQUIRED);

  private static ImmutableMap<TableFieldSchema.Type, FieldDescriptorProto.Type>
      BQTableSchemaTypeMap =
          new ImmutableMap.Builder<TableFieldSchema.Type, FieldDescriptorProto.Type>()
              .put(TableFieldSchema.Type.BOOL, FieldDescriptorProto.Type.TYPE_BOOL)
              .put(TableFieldSchema.Type.BYTES, FieldDescriptorProto.Type.TYPE_BYTES)
              .put(TableFieldSchema.Type.DATE, FieldDescriptorProto.Type.TYPE_INT32)
              .put(TableFieldSchema.Type.DATETIME, FieldDescriptorProto.Type.TYPE_INT64)
              .put(TableFieldSchema.Type.DOUBLE, FieldDescriptorProto.Type.TYPE_DOUBLE)
              .put(TableFieldSchema.Type.GEOGRAPHY, FieldDescriptorProto.Type.TYPE_STRING)
              .put(TableFieldSchema.Type.INT64, FieldDescriptorProto.Type.TYPE_INT64)
              .put(TableFieldSchema.Type.NUMERIC, FieldDescriptorProto.Type.TYPE_BYTES)
              .put(TableFieldSchema.Type.BIGNUMERIC, FieldDescriptorProto.Type.TYPE_BYTES)
              .put(TableFieldSchema.Type.STRING, FieldDescriptorProto.Type.TYPE_STRING)
              .put(TableFieldSchema.Type.STRUCT, FieldDescriptorProto.Type.TYPE_MESSAGE)
              .put(TableFieldSchema.Type.TIME, FieldDescriptorProto.Type.TYPE_INT64)
              .put(TableFieldSchema.Type.TIMESTAMP, FieldDescriptorProto.Type.TYPE_INT64)
              .put(TableFieldSchema.Type.JSON, FieldDescriptorProto.Type.TYPE_STRING)
              .put(TableFieldSchema.Type.INTERVAL, FieldDescriptorProto.Type.TYPE_STRING)
              .put(TableFieldSchema.Type.RANGE, FieldDescriptorProto.Type.TYPE_MESSAGE)
              .build();
However, the schema converter maps the BigQuery client types like this:
  private static ImmutableMap<Field.Mode, TableFieldSchema.Mode> BQTableSchemaModeMap =
      ImmutableMap.of(
          Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
          Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED,
          Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED);

  private static ImmutableMap<StandardSQLTypeName, TableFieldSchema.Type> BQTableSchemaTypeMap =
      new ImmutableMap.Builder<StandardSQLTypeName, TableFieldSchema.Type>()
          .put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL)
          .put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES)
          .put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE)
          .put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME)
          .put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE)
          .put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY)
          .put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64)
          .put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC)
          .put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING)
          .put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT)
          .put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME)
          .put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP)
          .build();
As you can see, because DATETIME is mapped to INT64, a datetime value that carries a time zone (such as a +03 offset or a trailing Z) is not converted, and the append fails with the error above. We could write our own converter, but that would introduce a serious latency problem: we push about 2 PB of data per day through this Write API, so converting every single datetime in memory is not feasible for us.
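For completeness, the per-row normalization we want to avoid would look roughly like the sketch below: parse the offset-bearing string once and re-emit it as the zone-less form that DATETIME accepts, normalized to UTC (the method name is illustrative; switching the column to TIMESTAMP, which accepts the `Z` suffix, would be the alternative). At our volume this extra parse/format per value is exactly the overhead we cannot afford.

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class DateTimeNormalizer {
    /**
     * Converts an ISO-8601 string with an offset (e.g. "...Z" or "...+03:00")
     * into the zone-less form BigQuery DATETIME accepts, normalized to UTC.
     */
    static String toBigQueryDatetime(String zoned) {
        return OffsetDateTime.parse(zoned)
                .withOffsetSameInstant(ZoneOffset.UTC)  // shift to UTC wall-clock time
                .toLocalDateTime()                      // drop the offset
                .toString();
    }

    public static void main(String[] args) {
        // "2025-08-14T12:24:03.484Z"      -> "2025-08-14T12:24:03.484"
        // "2025-08-14T15:24:03.484+03:00" -> "2025-08-14T12:24:03.484"
        System.out.println(toBigQueryDatetime("2025-08-14T12:24:03.484Z"));
        System.out.println(toBigQueryDatetime("2025-08-14T15:24:03.484+03:00"));
    }
}
```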