Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class TowerClient implements TraceObserverV2 {

static private final String TOKEN_PREFIX = '@token:'

static private final Duration DEFAULT_REQUEST_TIMEOUT = Duration.of('1 min')

@TupleConstructor
static class Response {
final int code
Expand Down Expand Up @@ -141,6 +143,8 @@ class TowerClient implements TraceObserverV2 {

private Map<String,Boolean> allContainers = new ConcurrentHashMap<>()

private Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT

TowerClient(Session session, TowerConfig config) {
this.session = session
this.endpoint = checkUrl(config.endpoint)
Expand All @@ -157,6 +161,11 @@ class TowerClient implements TraceObserverV2 {
return this
}

TowerClient withRequestTimeout(Duration duration) {
this.requestTimeout = duration
return this
}

@TestOnly
protected TowerClient() {
this.generator = TowerJsonGenerator.create(Collections.EMPTY_MAP)
Expand Down Expand Up @@ -543,6 +552,7 @@ class TowerClient implements TraceObserverV2 {
.header('Content-Type', 'application/json; charset=utf-8')
.header('User-Agent', "Nextflow/$BuildInfo.version")
.header('Traceparent', TraceUtils.rndTrace())
.timeout(java.time.Duration.ofMillis(requestTimeout.millis))

if( verb == 'PUT' )
return builder.PUT(HttpRequest.BodyPublishers.ofString(payload)).build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneId

import com.github.tomakehurst.wiremock.WireMockServer
import com.github.tomakehurst.wiremock.client.WireMock
import io.seqera.http.HxClient
import nextflow.Session
import nextflow.cloud.types.CloudMachineInfo
Expand All @@ -35,6 +37,7 @@ import nextflow.script.WorkflowMetadata
import nextflow.trace.TraceRecord
import nextflow.trace.WorkflowStats
import nextflow.trace.WorkflowStatsObserver
import nextflow.util.Duration
import nextflow.util.ProcessHelper
import spock.lang.Specification
/**
Expand Down Expand Up @@ -657,4 +660,38 @@ class TowerClientTest extends Specification {
req.tasks[0].acceleratorType == 'v100'
}

def 'should return error response on http request timeout' () {
given: 'a WireMock server that hangs for 5 seconds'
def wireMock = new WireMockServer(0)
wireMock.start()
wireMock.stubFor(
WireMock.post(WireMock.anyUrl())
.willReturn(WireMock.aResponse()
.withFixedDelay(5_000)
.withStatus(200)
.withBody('{}'))
)

and: 'a TowerClient whose requests carry a 200ms timeout'
TowerClient client = Spy(new TowerClient().withRequestTimeout(Duration.of('200 ms'))) {
// inject a short per-request timeout so the test doesn't wait 5 seconds

newHttpClient() >> HxClient.newBuilder()
.connectTimeout(java.time.Duration.ofSeconds(5))
.build()
}
client.@httpClient = client.newHttpClient()
client.@endpoint = wireMock.baseUrl()

when:
def response = client.sendHttpMessage("${wireMock.baseUrl()}/trace/create", [runName: 'test'], 'POST')

then: 'a timeout produces an error response with code 0'
response.code == 0
response.message.contains('Unable to connect')

cleanup:
wireMock.stop()
}

}
Loading