Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,28 @@ TableResult query(QueryJobConfiguration configuration, JobOption... options)
TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
throws InterruptedException, JobException;

/**
* Starts the query associated with the request, using the given JobId. It returns either
* TableResult for quick queries or Job object for long-running queries.
*
* <p>If the location of the job is not "US" or "EU", the {@code jobId} must specify the job
* location.
*
* <p>This method cannot be used in conjunction with {@link QueryJobConfiguration#dryRun()}
* queries. Since dry-run queries are not actually executed, there's no way to retrieve results.
*
* <p>See {@link #query(QueryJobConfiguration, JobOption...)} for examples on populating a {@link
* QueryJobConfiguration}.
*
* @throws BigQueryException upon failure
* @throws InterruptedException if the current thread gets interrupted while waiting for the query
* to complete
* @throws JobException if the job completes unsuccessfully
*/
Object queryWithTimeout(
QueryJobConfiguration configuration, JobId jobId, Long timeoutMs, JobOption... options)
throws InterruptedException, JobException;

/**
* Returns results of the query associated with the provided job.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1925,48 +1925,10 @@ public Boolean call() throws IOException {
@Override
public TableResult query(QueryJobConfiguration configuration, JobOption... options)
throws InterruptedException, JobException {
Job.checkNotDryRun(configuration, "query");

configuration =
configuration.toBuilder()
.setJobCreationMode(getOptions().getDefaultJobCreationMode())
.build();

Span querySpan = null;
if (getOptions().isOpenTelemetryTracingEnabled()
&& getOptions().getOpenTelemetryTracer() != null) {
querySpan =
getOptions()
.getOpenTelemetryTracer()
.spanBuilder("com.google.cloud.bigquery.BigQuery.query")
.setAllAttributes(otelAttributesFromOptions(options))
.startSpan();
}
try (Scope queryScope = querySpan != null ? querySpan.makeCurrent() : null) {
// If all parameters passed in configuration are supported by the query() method on the
// backend,
// put on fast path
QueryRequestInfo requestInfo =
new QueryRequestInfo(configuration, getOptions().getUseInt64Timestamps());
if (requestInfo.isFastQuerySupported(null)) {
String projectId = getOptions().getProjectId();
QueryRequest content = requestInfo.toPb();
if (getOptions().getLocation() != null) {
content.setLocation(getOptions().getLocation());
}
return queryRpc(projectId, content, options);
}
// Otherwise, fall back to the existing create query job logic
return create(JobInfo.of(configuration), options).getQueryResults();
} finally {
if (querySpan != null) {
querySpan.end();
}
}
return query(configuration, null, options);
}

private TableResult queryRpc(
final String projectId, final QueryRequest content, JobOption... options)
private Object queryRpc(final String projectId, final QueryRequest content, JobOption... options)
throws InterruptedException {
com.google.api.services.bigquery.model.QueryResponse results;
Span queryRpc = null;
Expand Down Expand Up @@ -2030,7 +1992,7 @@ public com.google.api.services.bigquery.model.QueryResponse call()
// here, but this is left as future work.
JobId jobId = JobId.fromPb(results.getJobReference());
Job job = getJob(jobId, options);
return job.getQueryResults();
return job;
}

if (results.getPageToken() != null) {
Expand Down Expand Up @@ -2070,16 +2032,35 @@ public com.google.api.services.bigquery.model.QueryResponse call()
@Override
public TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
throws InterruptedException, JobException {
Object result = queryWithTimeout(configuration, jobId, null, options);
if (result instanceof Job) {
return ((Job) result).getQueryResults();
}
return (TableResult) result;
}

@Override
public Object queryWithTimeout(
QueryJobConfiguration configuration, JobId jobId, Long timeoutMs, JobOption... options)
throws InterruptedException, JobException {
Job.checkNotDryRun(configuration, "query");

// If JobCreationMode is not explicitly set, update it with default value;
if (configuration.getJobCreationMode() == null) {
configuration =
configuration.toBuilder()
.setJobCreationMode(getOptions().getDefaultJobCreationMode())
.build();
}

Span querySpan = null;
if (getOptions().isOpenTelemetryTracingEnabled()
&& getOptions().getOpenTelemetryTracer() != null) {
querySpan =
getOptions()
.getOpenTelemetryTracer()
.spanBuilder("com.google.cloud.bigquery.BigQuery.query")
.setAllAttributes(jobId.getOtelAttributes())
.spanBuilder("com.google.cloud.bigquery.BigQuery.queryWithTimeout")
.setAllAttributes(jobId != null ? jobId.getOtelAttributes() : null)
.setAllAttributes(otelAttributesFromOptions(options))
.startSpan();
}
Expand All @@ -2095,18 +2076,23 @@ && getOptions().getOpenTelemetryTracer() != null) {
// fail with "Access denied" if the project do not have enough permissions to run the job.

String projectId =
jobId.getProject() != null ? jobId.getProject() : getOptions().getProjectId();
jobId != null && jobId.getProject() != null
? jobId.getProject()
: getOptions().getProjectId();
QueryRequest content = requestInfo.toPb();
// Be careful when setting the location, if a location is specified in the BigQueryOption or
// JobId the job created by the query method will be in that location, even if the table to
// be
// queried is in a different location. This may cause the query to fail with
// "BigQueryException: Not found"
if (jobId.getLocation() != null) {
if (jobId != null && jobId.getLocation() != null) {
content.setLocation(jobId.getLocation());
} else if (getOptions().getLocation() != null) {
content.setLocation(getOptions().getLocation());
}
if (timeoutMs != null) {
content.setTimeoutMs(timeoutMs);
}

return queryRpc(projectId, content, options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2610,6 +2610,29 @@ PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap());
}

@Test
public void testQueryWithTimeoutSetsTimeout() throws InterruptedException, IOException {
com.google.api.services.bigquery.model.QueryResponse queryResponsePb =
new com.google.api.services.bigquery.model.QueryResponse()
.setCacheHit(false)
.setJobComplete(true)
.setKind("bigquery#queryResponse")
.setPageToken(null)
.setRows(ImmutableList.of(TABLE_ROW))
.setSchema(TABLE_SCHEMA.toPb())
.setTotalBytesProcessed(42L)
.setTotalRows(BigInteger.valueOf(1L));

when(bigqueryRpcMock.queryRpcSkipExceptionTranslation(eq(PROJECT), requestPbCapture.capture()))
.thenReturn(queryResponsePb);

bigquery = options.getService();
Object result = bigquery.queryWithTimeout(QUERY_JOB_CONFIGURATION_FOR_QUERY, null, 1000L);
assertTrue(result instanceof TableResult);
QueryRequest requestPb = requestPbCapture.getValue();
assertEquals((Long) 1000L, requestPb.getTimeoutMs());
}

@Test
public void testGetQueryResults() throws IOException {
JobId queryJob = JobId.of(JOB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7215,6 +7215,7 @@ public void testTableResultJobIdAndQueryId() throws InterruptedException {
// 2. For queries that fails the requirements to be stateless, then jobId is populated and
// queryId is not.
// 3. For explicitly created jobs, then jobId is populated and queryId is not populated.
// 4. If QueryJobConfiguration explicitly sets Job Creation Mode to Required.

// Test scenario 1.
// Create local BigQuery for test scenario 1 to not contaminate global test parameters.
Expand All @@ -7241,6 +7242,16 @@ public void testTableResultJobIdAndQueryId() throws InterruptedException {
result = job.getQueryResults();
assertNotNull(result.getJobId());
assertNull(result.getQueryId());

// Test scenario 4.
configWithJob =
QueryJobConfiguration.newBuilder(query)
.setJobCreationMode(JobCreationMode.JOB_CREATION_REQUIRED)
.build();
result = bigQuery.query(configWithJob);
result = job.getQueryResults();
assertNotNull(result.getJobId());
assertNull(result.getQueryId());
}

@Test
Expand Down Expand Up @@ -7294,6 +7305,50 @@ public void testStatelessQueriesWithLocation() throws Exception {
}
}

@Test
public void testQueryWithTimeout() throws InterruptedException {
// Validate that queryWithTimeout returns either TableResult or Job object

RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
BigQuery bigQuery = bigqueryHelper.getOptions().getService();
bigQuery.getOptions().setDefaultJobCreationMode(JobCreationMode.JOB_CREATION_OPTIONAL);
String largeQuery =
"SELECT * FROM UNNEST(GENERATE_ARRAY(1, 20000)) CROSS JOIN UNNEST(GENERATE_ARRAY(1, 20000))";
String query = "SELECT 1 as one";
// Test scenario 1.
// Stateless query returns TableResult
QueryJobConfiguration config = QueryJobConfiguration.newBuilder(query).build();
Object result = bigQuery.queryWithTimeout(config, null, null);
assertTrue(result instanceof TableResult);
assertNull(((TableResult) result).getJobId());
assertNotNull(((TableResult) result).getQueryId());

// Stateful query returns Job
// Test scenario 2 to ensure job is created if JobCreationMode is set, but for a small query
// it still returns results.
config =
QueryJobConfiguration.newBuilder(query)
.setJobCreationMode(JobCreationMode.JOB_CREATION_REQUIRED)
.build();
result = bigQuery.queryWithTimeout(config, null, null);
assertTrue(result instanceof TableResult);
assertNotNull(((TableResult) result).getJobId());
assertNull(((TableResult) result).getQueryId());

// Stateful query returns Job
// Test scenario 3 to ensure job is created if Query is long running.
// Explicitly disable cache to ensure it is long-running query;
config = QueryJobConfiguration.newBuilder(largeQuery).setUseQueryCache(false).build();
long millis = System.currentTimeMillis();
result = bigQuery.queryWithTimeout(config, null, 1000L);
millis = System.currentTimeMillis() - millis;
assertTrue(result instanceof Job);
// Cancel the job as we don't need results.
((Job) result).cancel();
// Allow 2 seconds of timeout value to account for random delays
assertTrue(millis < 1_000_000 * 2);
}

@Test
public void testUniverseDomainWithInvalidUniverseDomain() {
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
Expand Down Expand Up @@ -7743,7 +7798,7 @@ public void testOpenTelemetryTracingQuery() throws InterruptedException {
assertNotNull(
OTEL_ATTRIBUTES.get("com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries"));
assertNotNull(OTEL_ATTRIBUTES.get("com.google.cloud.bigquery.BigQueryRpc.queryRpc"));
assertTrue(OTEL_ATTRIBUTES.containsKey("com.google.cloud.bigquery.BigQuery.query"));
assertTrue(OTEL_ATTRIBUTES.containsKey("com.google.cloud.bigquery.BigQuery.queryWithTimeout"));

// Query job
String query = "SELECT TimestampField, StringField, BooleanField FROM " + TABLE_ID.getTable();
Expand Down
Loading