diff --git a/google-cloud-bigquery/clirr-ignored-differences.xml b/google-cloud-bigquery/clirr-ignored-differences.xml index bd455d2d4..cef0f30f7 100644 --- a/google-cloud-bigquery/clirr-ignored-differences.xml +++ b/google-cloud-bigquery/clirr-ignored-differences.xml @@ -117,6 +117,11 @@ com/google/cloud/bigquery/TableInfo* *ResourceTags(*) + + 7012 + com/google/cloud/bigquery/BigQuery + java.lang.Object queryWithTimeout(com.google.cloud.bigquery.QueryJobConfiguration, com.google.cloud.bigquery.JobId, java.lang.Long, com.google.cloud.bigquery.BigQuery$JobOption[]) + 7012 com/google/cloud/bigquery/Connection diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java index 2e0747f6b..ab16ed40f 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java @@ -1609,6 +1609,28 @@ TableResult query(QueryJobConfiguration configuration, JobOption... options) TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... options) throws InterruptedException, JobException; + /** + * Starts the query associated with the request, using the given JobId. It returns either + * TableResult for quick queries or Job object for long-running queries. + * + *

If the location of the job is not "US" or "EU", the {@code jobId} must specify the job + * location. + * + *

This method cannot be used in conjunction with {@link QueryJobConfiguration#dryRun()} + * queries. Since dry-run queries are not actually executed, there's no way to retrieve results. + * + *

See {@link #query(QueryJobConfiguration, JobOption...)} for examples on populating a {@link + * QueryJobConfiguration}. + * + * @throws BigQueryException upon failure + * @throws InterruptedException if the current thread gets interrupted while waiting for the query + * to complete + * @throws JobException if the job completes unsuccessfully + */ + Object queryWithTimeout( + QueryJobConfiguration configuration, JobId jobId, Long timeoutMs, JobOption... options) + throws InterruptedException, JobException; + /** * Returns results of the query associated with the provided job. * diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java index 088d15c09..ac8fce708 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java @@ -1925,48 +1925,10 @@ public Boolean call() throws IOException { @Override public TableResult query(QueryJobConfiguration configuration, JobOption... options) throws InterruptedException, JobException { - Job.checkNotDryRun(configuration, "query"); - - configuration = - configuration.toBuilder() - .setJobCreationMode(getOptions().getDefaultJobCreationMode()) - .build(); - - Span querySpan = null; - if (getOptions().isOpenTelemetryTracingEnabled() - && getOptions().getOpenTelemetryTracer() != null) { - querySpan = - getOptions() - .getOpenTelemetryTracer() - .spanBuilder("com.google.cloud.bigquery.BigQuery.query") - .setAllAttributes(otelAttributesFromOptions(options)) - .startSpan(); - } - try (Scope queryScope = querySpan != null ? querySpan.makeCurrent() : null) { - // If all parameters passed in configuration are supported by the query() method on the - // backend, - // put on fast path - QueryRequestInfo requestInfo = - new QueryRequestInfo(configuration, getOptions().getUseInt64Timestamps()); - if (requestInfo.isFastQuerySupported(null)) { - String projectId = getOptions().getProjectId(); - QueryRequest content = requestInfo.toPb(); - if (getOptions().getLocation() != null) { - content.setLocation(getOptions().getLocation()); - } - return queryRpc(projectId, content, options); - } - // Otherwise, fall back to the existing create query job logic - return create(JobInfo.of(configuration), options).getQueryResults(); - } finally { - if (querySpan != null) { - querySpan.end(); - } - } + return query(configuration, null, options); } - private TableResult queryRpc( - final String projectId, final QueryRequest content, JobOption... options) + private Object queryRpc(final String projectId, final QueryRequest content, JobOption... options) throws InterruptedException { com.google.api.services.bigquery.model.QueryResponse results; Span queryRpc = null; @@ -2030,7 +1992,7 @@ public com.google.api.services.bigquery.model.QueryResponse call() // here, but this is left as future work. JobId jobId = JobId.fromPb(results.getJobReference()); Job job = getJob(jobId, options); - return job.getQueryResults(); + return job; } if (results.getPageToken() != null) { @@ -2070,16 +2032,35 @@ public com.google.api.services.bigquery.model.QueryResponse call() @Override public TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... options) throws InterruptedException, JobException { + Object result = queryWithTimeout(configuration, jobId, null, options); + if (result instanceof Job) { + return ((Job) result).getQueryResults(); + } + return (TableResult) result; + } + + @Override + public Object queryWithTimeout( + QueryJobConfiguration configuration, JobId jobId, Long timeoutMs, JobOption... options) + throws InterruptedException, JobException { Job.checkNotDryRun(configuration, "query"); + // If JobCreationMode is not explicitly set, update it with default value; + if (configuration.getJobCreationMode() == null) { + configuration = + configuration.toBuilder() + .setJobCreationMode(getOptions().getDefaultJobCreationMode()) + .build(); + } + Span querySpan = null; if (getOptions().isOpenTelemetryTracingEnabled() && getOptions().getOpenTelemetryTracer() != null) { querySpan = getOptions() .getOpenTelemetryTracer() - .spanBuilder("com.google.cloud.bigquery.BigQuery.query") - .setAllAttributes(jobId.getOtelAttributes()) + .spanBuilder("com.google.cloud.bigquery.BigQuery.queryWithTimeout") + .setAllAttributes(jobId != null ? jobId.getOtelAttributes() : null) .setAllAttributes(otelAttributesFromOptions(options)) .startSpan(); } @@ -2095,18 +2076,23 @@ && getOptions().getOpenTelemetryTracer() != null) { // fail with "Access denied" if the project do not have enough permissions to run the job. String projectId = - jobId.getProject() != null ? jobId.getProject() : getOptions().getProjectId(); + jobId != null && jobId.getProject() != null + ? jobId.getProject() + : getOptions().getProjectId(); QueryRequest content = requestInfo.toPb(); // Be careful when setting the location, if a location is specified in the BigQueryOption or // JobId the job created by the query method will be in that location, even if the table to // be // queried is in a different location. This may cause the query to fail with // "BigQueryException: Not found" - if (jobId.getLocation() != null) { + if (jobId != null && jobId.getLocation() != null) { content.setLocation(jobId.getLocation()); } else if (getOptions().getLocation() != null) { content.setLocation(getOptions().getLocation()); } + if (timeoutMs != null) { + content.setTimeoutMs(timeoutMs); + } return queryRpc(projectId, content, options); } diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java index c0367beae..393455e36 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java @@ -2610,6 +2610,29 @@ PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS))) PROJECT, DATASET, TABLE, Collections.emptyMap()); } + @Test + public void testQueryWithTimeoutSetsTimeout() throws InterruptedException, IOException { + com.google.api.services.bigquery.model.QueryResponse queryResponsePb = + new com.google.api.services.bigquery.model.QueryResponse() + .setCacheHit(false) + .setJobComplete(true) + .setKind("bigquery#queryResponse") + .setPageToken(null) + .setRows(ImmutableList.of(TABLE_ROW)) + .setSchema(TABLE_SCHEMA.toPb()) + .setTotalBytesProcessed(42L) + .setTotalRows(BigInteger.valueOf(1L)); + + when(bigqueryRpcMock.queryRpcSkipExceptionTranslation(eq(PROJECT), requestPbCapture.capture())) + .thenReturn(queryResponsePb); + + bigquery = options.getService(); + Object result = bigquery.queryWithTimeout(QUERY_JOB_CONFIGURATION_FOR_QUERY, null, 1000L); + assertTrue(result instanceof TableResult); + QueryRequest requestPb = requestPbCapture.getValue(); + assertEquals((Long) 1000L, requestPb.getTimeoutMs()); + } + @Test public void testGetQueryResults() throws IOException { JobId queryJob = JobId.of(JOB); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index c9f6296cc..ec1f7b5a0 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -7215,6 +7215,7 @@ public void testTableResultJobIdAndQueryId() throws InterruptedException { // 2. For queries that fails the requirements to be stateless, then jobId is populated and // queryId is not. // 3. For explicitly created jobs, then jobId is populated and queryId is not populated. + // 4. If QueryJobConfiguration explicitly sets Job Creation Mode to Required. // Test scenario 1. // Create local BigQuery for test scenario 1 to not contaminate global test parameters. @@ -7241,6 +7242,16 @@ public void testTableResultJobIdAndQueryId() throws InterruptedException { result = job.getQueryResults(); assertNotNull(result.getJobId()); assertNull(result.getQueryId()); + + // Test scenario 4. + configWithJob = + QueryJobConfiguration.newBuilder(query) + .setJobCreationMode(JobCreationMode.JOB_CREATION_REQUIRED) + .build(); + result = bigQuery.query(configWithJob); + result = job.getQueryResults(); + assertNotNull(result.getJobId()); + assertNull(result.getQueryId()); } @Test @@ -7294,6 +7305,50 @@ public void testStatelessQueriesWithLocation() throws Exception { } } + @Test + public void testQueryWithTimeout() throws InterruptedException { + // Validate that queryWithTimeout returns either TableResult or Job object + + RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); + BigQuery bigQuery = bigqueryHelper.getOptions().getService(); + bigQuery.getOptions().setDefaultJobCreationMode(JobCreationMode.JOB_CREATION_OPTIONAL); + String largeQuery = + "SELECT * FROM UNNEST(GENERATE_ARRAY(1, 20000)) CROSS JOIN UNNEST(GENERATE_ARRAY(1, 20000))"; + String query = "SELECT 1 as one"; + // Test scenario 1. + // Stateless query returns TableResult + QueryJobConfiguration config = QueryJobConfiguration.newBuilder(query).build(); + Object result = bigQuery.queryWithTimeout(config, null, null); + assertTrue(result instanceof TableResult); + assertNull(((TableResult) result).getJobId()); + assertNotNull(((TableResult) result).getQueryId()); + + // Stateful query returns Job + // Test scenario 2 to ensure job is created if JobCreationMode is set, but for a small query + // it still returns results. + config = + QueryJobConfiguration.newBuilder(query) + .setJobCreationMode(JobCreationMode.JOB_CREATION_REQUIRED) + .build(); + result = bigQuery.queryWithTimeout(config, null, null); + assertTrue(result instanceof TableResult); + assertNotNull(((TableResult) result).getJobId()); + assertNull(((TableResult) result).getQueryId()); + + // Stateful query returns Job + // Test scenario 3 to ensure job is created if Query is long running. + // Explicitly disable cache to ensure it is long-running query; + config = QueryJobConfiguration.newBuilder(largeQuery).setUseQueryCache(false).build(); + long millis = System.currentTimeMillis(); + result = bigQuery.queryWithTimeout(config, null, 1000L); + millis = System.currentTimeMillis() - millis; + assertTrue(result instanceof Job); + // Cancel the job as we don't need results. + ((Job) result).cancel(); + // Allow 2 seconds of timeout value to account for random delays + assertTrue(millis < 1_000_000 * 2); + } + @Test public void testUniverseDomainWithInvalidUniverseDomain() { RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); @@ -7743,7 +7798,7 @@ public void testOpenTelemetryTracingQuery() throws InterruptedException { assertNotNull( OTEL_ATTRIBUTES.get("com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries")); assertNotNull(OTEL_ATTRIBUTES.get("com.google.cloud.bigquery.BigQueryRpc.queryRpc")); - assertTrue(OTEL_ATTRIBUTES.containsKey("com.google.cloud.bigquery.BigQuery.query")); + assertTrue(OTEL_ATTRIBUTES.containsKey("com.google.cloud.bigquery.BigQuery.queryWithTimeout")); // Query job String query = "SELECT TimestampField, StringField, BooleanField FROM " + TABLE_ID.getTable();