Skip to content
9 changes: 9 additions & 0 deletions agent/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,12 @@ atum.dispatcher.type="http"
# Maximum number of dispatch captures to keep in memory
#atum.dispatcher.capture.capture-limit=1000 # 0 means no limit

# Retry configuration for the HTTP dispatcher (all keys optional — defaults apply if absent)
#atum.dispatcher.http.retry.max-retries=3 # number of retries after first failure (0 = no retries)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My personal option would be, to have default on 1 - generally in a healthy, stable well load balanced environment, there should not be need for more - but not a hard complain.

@lsulak lsulak May 25, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would tend to go more defensive on this one.

In healthy stable environment it might be that there are almost no retries or 1 is enough, but there were some production issues and I would like the agent to be generally / by default a bit more robust.

So, basically if there are 3 potential retries and it's not used in healthy environment, there is zero cost to have configured 3. But under the load during peaks / different environments this can be beneficial.

#atum.dispatcher.http.retry.initial-delay-ms=1000 # initial backoff delay in milliseconds
#atum.dispatcher.http.retry.max-delay-ms=10000 # maximum backoff delay cap in milliseconds

# HTTP timeout configuration (all keys optional — defaults apply if absent)
#atum.dispatcher.http.timeout.connect-ms=10000 # TCP connection timeout in milliseconds (default: 10 000)
#atum.dispatcher.http.timeout.read-ms=30000 # response read timeout in milliseconds (default: 30 000)
#atum.dispatcher.http.timeout.write-ms=10000 # request write timeout in milliseconds (default: 10 000)
Comment on lines +30 to +31

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I am not sure, what exactly mean read and write timeout?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, thanks for the review & doing it so quickly even! This is used here:

    val okHttpClient = new OkHttpClient.Builder()
      .connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
      .readTimeout(readTimeoutMs, TimeUnit.MILLISECONDS)
      .writeTimeout(writeTimeoutMs, TimeUnit.MILLISECONDS)
      .build()

so these values are used in the OkHTTP meaning that it's all on the HTTP layer: https://www.baeldung.com/okhttp-timeouts

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the source above:

ConnectionTimeout: It defines a time period in which our client should establish a connection with a target host.

ReadTimeout: It defines a maximum time of inactivity between two data packets when waiting for the server’s response.

WriteTimeout: it defines a maximum time of inactivity between two data packets when sending the request to the server.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config
import okhttp3.OkHttpClient
import org.apache.spark.internal.Logging
import sttp.capabilities
import sttp.client3._
Expand All @@ -29,6 +30,8 @@ import za.co.absa.atum.model.dto._
import za.co.absa.atum.model.envelopes.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
import za.co.absa.atum.model.utils.JsonSyntaxExtensions._

import java.util.concurrent.TimeUnit

class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
import HttpDispatcher._

Expand All @@ -42,7 +45,21 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
.header("Content-Type", "application/json")
.response(asString)

private[dispatcher] val backend: SttpBackend[Identity, capabilities.WebSockets] = OkHttpSyncBackend()
private[dispatcher] val backend: SttpBackend[Identity, capabilities.WebSockets] = {
val connectTimeoutMs = optionalLong(ConnectTimeoutKey).getOrElse(DefaultConnectTimeoutMs)
val readTimeoutMs = optionalLong(ReadTimeoutKey).getOrElse(DefaultReadTimeoutMs)
val writeTimeoutMs = optionalLong(WriteTimeoutKey).getOrElse(DefaultWriteTimeoutMs)

val okHttpClient = new OkHttpClient.Builder()
.connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
.readTimeout(readTimeoutMs, TimeUnit.MILLISECONDS)
.writeTimeout(writeTimeoutMs, TimeUnit.MILLISECONDS)
.build()

OkHttpSyncBackend.usingClient(okHttpClient)
}

private val httpRetry: HttpRetry = HttpRetry.fromConfig(config)

/**
* This method is used to get the partitioning ID from the server.
Expand All @@ -53,7 +70,7 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
val encodedPartitioning = partitioning.asBase64EncodedJsonString
val request = commonAtumRequest.get(getPartitioningIdEndpoint.addParam("partitioning", encodedPartitioning))

val response = backend.send(request)
val response = withRetry(request)

handleResponseBody(response).as[SingleSuccessResponse[PartitioningWithIdDTO]].data.id
}
Expand All @@ -62,7 +79,7 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
val encodedPartitioning = partitioning.asBase64EncodedJsonString
val request = commonAtumRequest.get(getPartitioningIdEndpoint.addParam("partitioning", encodedPartitioning))

val response = backend.send(request)
val response = withRetry(request)

response.code match {
case StatusCode.NotFound => None
Expand All @@ -85,7 +102,7 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
partitioning.authorIfNew
).asJsonString
)
val response = backend.send(request)
val response = withRetry(request)
handleResponseBody(response).as[SingleSuccessResponse[PartitioningWithIdDTO]].data
}

Expand All @@ -94,7 +111,7 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}/${newPartitioningWithIdDTO.id}/${ApiPaths.V2Paths.Measures}"
)
val req = commonAtumRequest.get(endpoint)
val resp = backend.send(req)
val resp = withRetry(req)
handleResponseBody(resp).as[MultiSuccessResponse[MeasureDTO]].data.toSet
}

Expand All @@ -103,7 +120,7 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}/${newPartitioningWithIdDTO.id}/${ApiPaths.V2Paths.AdditionalData}"
)
val req = commonAtumRequest.get(endpoint)
val resp = backend.send(req)
val resp = withRetry(req)
handleResponseBody(resp).as[MultiSuccessResponse[AdditionalDataItemV2DTO]]
}

Expand Down Expand Up @@ -135,8 +152,12 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
.post(endpoint)
.body(checkpointV2DTO.asJsonString)

val response = backend.send(request)
handleResponseBody(response)
val response = withRetry(request)
// 409 Conflict means the checkpoint was already saved (e.g. prior attempt succeeded but response was lost).
// This is expected on retry and should not be treated as an error.
if (response.code != StatusCode.Conflict) {
handleResponseBody(response)
}
}

override protected[agent] def updateAdditionalData(
Expand All @@ -154,27 +175,56 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
.patch(endpoint)
.body(additionalDataPatchDTO.asJsonString)

val response = backend.send(request)
val response = withRetry(request)

val data: AdditionalDataDTO.Data = handleResponseBody(response).as[MultiSuccessResponse[AdditionalDataItemV2DTO]]
val data: AdditionalDataDTO.Data = handleResponseBody(response)
.as[MultiSuccessResponse[AdditionalDataItemV2DTO]]
.data
.map( item => item.value match {
case Some(_) => item.key -> Some(AdditionalDataItemDTO(item.value.get, item.author))
case None => item.key -> None
}).toMap
.map(item =>
item.value match {
case Some(_) => item.key -> Some(AdditionalDataItemDTO(item.value.get, item.author))
case None => item.key -> None
}
)
.toMap

AdditionalDataDTO(data)
}

/**
* Sends `request` via the sttp backend, retrying on transient failures with exponential backoff.
* Retries on HTTP 5xx (server errors) and IOException (covers network failures,
* connection resets, and SocketTimeoutException which OkHttp throws on read/connect timeout).
*/
private def withRetry(
request: Request[Either[String, String], capabilities.WebSockets]
): Response[Either[String, String]] = {
httpRetry.retry(backend.send(request))(_.code.code >= StatusCode.InternalServerError.code) {
(attempt, total, delay, reason) =>
log.warn(s"Atum agent: $reason on attempt $attempt/$total, retrying in ${delay}ms")
}
}

private def handleResponseBody(response: Response[Either[String, String]]): String = {
response.body match {
case Left(body) => throw HttpException(response.code.code, body)
case Right(body) => body
}
}

private def optionalLong(key: String): Option[Long] =
if (config.hasPath(key)) Some(config.getLong(key)) else None

}

object HttpDispatcher {
private val UrlKey = "atum.dispatcher.http.url"
private val ConnectTimeoutKey = "atum.dispatcher.http.timeout.connect-ms"
private val ReadTimeoutKey = "atum.dispatcher.http.timeout.read-ms"
private val WriteTimeoutKey = "atum.dispatcher.http.timeout.write-ms"

// OkHttp defaults are 10s each; 30s read gives more headroom for slow DB queries under burst load
private val DefaultConnectTimeoutMs = 10000L
private val DefaultReadTimeoutMs = 30000L
private val DefaultWriteTimeoutMs = 10000L
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config

/**
* Retry configuration and execution for [[HttpDispatcher]].
*
* All three parameters are optional in the config: if the `atum.dispatcher.http.retry` block is absent
* the [[HttpRetry.Default]] values are used and no `ConfigException` is thrown.
*
* @param maxRetries Maximum number of retry attempts after the first failure (0 = no retries).
* @param initialDelay Initial backoff delay in milliseconds; doubles on every subsequent attempt.
* @param maxDelay Upper cap for the computed backoff delay in milliseconds.
*/
case class HttpRetry(
maxRetries: Int,
initialDelay: Long,
maxDelay: Long
) {

private val random = new scala.util.Random()

// Maximum exponent to prevent Long overflow when computing initialDelay * 2^attempt
private val MaxExponent = 30
// Fraction of the delay added as random jitter (0.1 = up to 10% extra).
// When many agents fail simultaneously, jitter spreads their retries over a short window instead of all hitting the
// server at the same instant.
private val JitterFraction = 0.1

/**
* Computes the backoff delay for a given attempt using exponential backoff with jitter.
*
* @param attempt Zero-based attempt index (0 = first retry delay, 1 = second, ...).
* @return Delay in milliseconds to sleep before the next attempt.
*/
private[dispatcher] def computeDelay(attempt: Int): Long = {
val exponential = initialDelay * math.pow(2, math.min(attempt, MaxExponent)).toLong
val cappedDelay = math.min(exponential, maxDelay)
cappedDelay + (random.nextDouble() * cappedDelay * JitterFraction).toLong
}

/**
* Executes `action`, retrying on transient failures with exponential backoff.
*
* Retry conditions:
* - `isRetryable(result)` returns true (e.g. HTTP 5xx)
* - `java.io.IOException` is thrown (network-level failure)
*
* No-retry conditions (fail immediately):
* - `isRetryable(result)` returns false (e.g. 2xx, 4xx)
* - Any non-IOException exception
*
* @param action The by-name action to execute (called on each attempt).
* @param isRetryable Predicate on the result — true means the attempt should be retried.
* @param onRetry Callback invoked before each retry sleep, receiving
* (attemptNumber 1-based, totalAttempts, delayMs, reason).
* @return The result of the first non-retryable attempt, or the last result after retries exhausted.
* @throws java.io.IOException if all attempts result in an IOException.
*/
private[dispatcher] def retry[T](
action: => T
)(isRetryable: T => Boolean)(onRetry: (Int, Int, Long, String) => Unit): T = {
val totalAttempts = maxRetries + 1
var attempt = 0

while (true) {
try {
val result = action
if (isRetryable(result) && attempt < maxRetries) {
val delay = computeDelay(attempt)
onRetry(attempt + 1, totalAttempts, delay, "retryable response")
Thread.sleep(delay)
attempt += 1
} else {
return result
}
} catch {
case e: java.io.IOException =>
if (attempt < maxRetries) {
val delay = computeDelay(attempt)
onRetry(attempt + 1, totalAttempts, delay, s"IOException: ${e.getMessage}")
Thread.sleep(delay)
attempt += 1
} else {
throw e
}
}
}

// Unreachable: while(true) always returns or throws, but the compiler cannot infer that.
throw new IllegalStateException("retry loop exited without a result")
}
}

object HttpRetry {

private val MaxRetriesKey = "atum.dispatcher.http.retry.max-retries"
private val InitialDelayKey = "atum.dispatcher.http.retry.initial-delay-ms"
private val MaxDelayKey = "atum.dispatcher.http.retry.max-delay-ms"

/** Sensible out-of-the-box defaults: 3 retries, 1 s initial delay, 10 s cap. */
val Default: HttpRetry = HttpRetry(
maxRetries = 3,
initialDelay = 1000L,
maxDelay = 10000L
)

/**
* Reads retry configuration from a Typesafe Config instance.
*
* Each key is optional: if absent, the corresponding [[Default]] value is used.
* This design ensures backward-compatibility — existing configs that omit the
* `atum.dispatcher.http.retry` block will never throw a `ConfigException`.
*
* @param config Typesafe Config, typically the application config passed to [[HttpDispatcher]].
* @return Populated [[HttpRetry]], falling back to [[Default]] for any absent key.
*/
def fromConfig(config: Config): HttpRetry = {
val maxRetries = if (config.hasPath(MaxRetriesKey)) config.getInt(MaxRetriesKey) else Default.maxRetries
val initialDelay = if (config.hasPath(InitialDelayKey)) config.getLong(InitialDelayKey) else Default.initialDelay
val maxDelay = if (config.hasPath(MaxDelayKey)) config.getLong(MaxDelayKey) else Default.maxDelay

require(maxRetries >= 0, s"atum.dispatcher.http.retry.max-retries must be >= 0, got $maxRetries")
require(initialDelay > 0, s"atum.dispatcher.http.retry.initial-delay-ms must be > 0, got $initialDelay")
require(maxDelay > 0, s"atum.dispatcher.http.retry.max-delay-ms must be > 0, got $maxDelay")

HttpRetry(maxRetries, initialDelay, maxDelay)
}
}
9 changes: 9 additions & 0 deletions agent/src/test/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,12 @@ atum.dispatcher.type="console"
# Maximum number of dispatch captures to keep in memory
# atum.dispatcher.capture.capture-limit=1000 # 0 means no limit

# Retry configuration for the HTTP dispatcher (all keys optional — defaults apply if absent)
#atum.dispatcher.http.retry.max-retries=3 # number of retries after first failure (0 = no retries)
#atum.dispatcher.http.retry.initial-delay-ms=1000 # initial backoff delay in milliseconds
#atum.dispatcher.http.retry.max-delay-ms=10000 # maximum backoff delay cap in milliseconds

# HTTP timeout configuration (all keys optional — defaults apply if absent)
#atum.dispatcher.http.timeout.connect-ms=10000 # TCP connection timeout in milliseconds (default: 10 000)
#atum.dispatcher.http.timeout.read-ms=30000 # response read timeout in milliseconds (default: 30 000)
#atum.dispatcher.http.timeout.write-ms=10000 # request write timeout in milliseconds (default: 10 000)
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,35 @@ class HttpDispatcherUnitTests extends AnyFlatSpec with Matchers with BeforeAndAf
an[HttpException] should be thrownBy dispatcher.saveCheckpoint(checkpoint)
}

it should "treat HTTP 409 Conflict as success (checkpoint already saved on prior attempt)" in {
val checkpoint = CheckpointDTO(
id = java.util.UUID.randomUUID(),
name = "cp",
author = "author",
measuredByAtumAgent = true,
processStartTime = ZonedDateTime.now(),
processEndTime = Some(ZonedDateTime.now()),
measurements = Set.empty,
partitioning = testPartitioningDTO
)
val partitioningWithId = PartitioningWithIdDTO(123L, testPartitioningDTO, "author")
val getPartitioningResponse = Response(
Right(SingleSuccessResponse(partitioningWithId).asJsonString): Either[String, String],
StatusCode.Ok
)
val conflictResponse = Response(Left("Checkpoint already present"): Either[String, String], StatusCode.Conflict)

stubGetPartitioning(testPartitioningDTO, getPartitioningResponse)
when(
mockBackend.send(
argThat[Request[Either[String, String], capabilities.WebSockets]](
req => req.method.method == "POST" && req.uri.path.mkString.contains("checkpoints")
)
)
).thenReturn(conflictResponse)

// Should NOT throw — 409 means the checkpoint was already saved
noException should be thrownBy dispatcher.saveCheckpoint(checkpoint)
}

}
Loading
Loading