Skip to content

Opamp client api #1835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions opamp-client/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import de.undercouch.gradle.tasks.download.Download
import de.undercouch.gradle.tasks.download.DownloadExtension
import groovy.json.JsonSlurper

plugins {
id("otel.java-conventions")
id("de.undercouch.download") version "5.6.0"
id("com.squareup.wire") version "5.3.1"
}

description = "Client implementation of the OpAMP spec."
Expand All @@ -9,3 +15,66 @@ java {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}

dependencies {
annotationProcessor("com.google.auto.value:auto-value")
compileOnly("com.google.auto.value:auto-value-annotations")
}

val opampReleaseInfo = tasks.register<Download>("opampLastReleaseInfo") {
group = "opamp"
src("https://api.github.com/repos/open-telemetry/opamp-spec/releases/latest")
dest(project.layout.buildDirectory.file("opamp/release.json"))
}

val opampProtos = tasks.register<DownloadOpampProtos>("opampProtoDownload", download)
opampProtos.configure {
group = "opamp"
dependsOn(opampReleaseInfo)
lastReleaseInfoJson.set {
opampReleaseInfo.get().dest
}
outputProtosDir.set(project.layout.buildDirectory.dir("opamp/protos"))
downloadedZipFile.set(project.layout.buildDirectory.file("intermediate/$name/release.zip"))
}

wire {
java {}
sourcePath {
srcDir(opampProtos)
}
}

abstract class DownloadOpampProtos @Inject constructor(
private val download: DownloadExtension,
private val archiveOps: ArchiveOperations,
private val fileOps: FileSystemOperations,
) : DefaultTask() {

@get:InputFile
abstract val lastReleaseInfoJson: RegularFileProperty

@get:OutputDirectory
abstract val outputProtosDir: DirectoryProperty

@get:Internal
abstract val downloadedZipFile: RegularFileProperty

@Suppress("UNCHECKED_CAST")
@TaskAction
fun execute() {
val releaseInfo = JsonSlurper().parse(lastReleaseInfoJson.get().asFile) as Map<String, String>
val zipUrl = releaseInfo["zipball_url"]
download.run {
src(zipUrl)
dest(downloadedZipFile)
}
val protos = archiveOps.zipTree(downloadedZipFile).matching {
setIncludes(listOf("**/*.proto"))
}
fileOps.sync {
from(protos.files)
into(outputProtosDir)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal;

import io.opentelemetry.opamp.client.internal.response.MessageData;
import opamp.proto.AgentDescription;
import opamp.proto.RemoteConfigStatus;
import opamp.proto.ServerErrorResponse;

public interface OpampClient {

/**
* Starts the client and begin attempts to connect to the Server. Once connection is established
* the client will attempt to maintain it by reconnecting if the connection is lost. All failed
* connection attempts will be reported via {@link Callbacks#onConnectFailed(OpampClient,
* Throwable)} callback.
*
* <p>This method does not wait until the connection to the Server is established and will likely
* return before the connection attempts are even made.
*
* <p>This method may be called only once.
*
* @param callbacks The Callback to which the Client will notify about any Server requests and
* responses.
*/
void start(Callbacks callbacks);

/**
* Stops the client. May be called only after {@link #start(Callbacks)}. May be called only once.
* After this call returns successfully it is guaranteed that no callbacks will be called. Once
* stopped, the client cannot be started again.
*/
void stop();

/**
* Sets attributes of the Agent. The attributes will be included in the next status report sent to
* the Server. When called after {@link #start(Callbacks)}, the attributes will be included in the
* next outgoing status report. This is typically used by Agents which allow their
* AgentDescription to change dynamically while the OpAMPClient is started. May be also called
* from {@link Callbacks#onMessage(OpampClient, MessageData)}.
*
* @param agentDescription The new agent description.
*/
void setAgentDescription(AgentDescription agentDescription);

/**
* Sets the current remote config status which will be sent in the next agent to server request.
*
* @param remoteConfigStatus The new remote config status.
*/
void setRemoteConfigStatus(RemoteConfigStatus remoteConfigStatus);

interface Callbacks {
/**
* Called when the connection is successfully established to the Server. May be called after
* {@link #start(Callbacks)} is called and every time a connection is established to the Server.
* For WebSocket clients this is called after the handshake is completed without any error. For
* HTTP clients this is called for any request if the response status is OK.
*
* @param client The relevant {@link OpampClient} instance.
*/
void onConnect(OpampClient client);

/**
* Called when the connection to the Server cannot be established. May be called after {@link
* #start(Callbacks)} is called and tries to connect to the Server. May also be called if the
* connection is lost and reconnection attempt fails.
*
* @param client The relevant {@link OpampClient} instance.
* @param throwable The exception.
*/
void onConnectFailed(OpampClient client, Throwable throwable);

/**
* Called when the Server reports an error in response to some previously sent request. Useful
* for logging purposes. The Agent should not attempt to process the error by reconnecting or
* retrying previous operations. The client handles the ErrorResponse_UNAVAILABLE case
* internally by performing retries as necessary.
*
* @param client The relevant {@link OpampClient} instance.
* @param errorResponse The error returned by the Server.
*/
void onErrorResponse(OpampClient client, ServerErrorResponse errorResponse);

/**
* Called when the Agent receives a message that needs processing. See {@link MessageData}
* definition for the data that may be available for processing. During onMessage execution the
* {@link OpampClient} functions that change the status of the client may be called, e.g. if
* RemoteConfig is processed then {@link #setRemoteConfigStatus(opamp.proto.RemoteConfigStatus)}
* should be called to reflect the processing result. These functions may also be called after
* onMessage returns. This is advisable if processing can take a long time. In that case
* returning quickly is preferable to avoid blocking the {@link OpampClient}.
*
* @param client The relevant {@link OpampClient} instance.
* @param messageData The server response data that needs processing.
*/
void onMessage(OpampClient client, MessageData messageData);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.opamp.client.internal.response;

import com.google.auto.value.AutoValue;
import io.opentelemetry.opamp.client.internal.OpampClient;
import javax.annotation.Nullable;
import opamp.proto.AgentRemoteConfig;

/**
* Data class provided in {@link OpampClient.Callbacks#onMessage(OpampClient, MessageData)} with
* Server's provided status changes.
*/
@AutoValue
public abstract class MessageData {
@Nullable
public abstract AgentRemoteConfig getRemoteConfig();

public static Builder builder() {
return new AutoValue_MessageData.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setRemoteConfig(AgentRemoteConfig remoteConfig);

public abstract MessageData build();
}
}
Loading