Skip to content

feat: Add RecentLogs support #1237

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,8 @@ Mono<MetaResponse> meta(MetaRequest request) {
Mono<ReadResponse> read(ReadRequest request) {
return get(request, ReadResponse.class, "read", request.getSourceId()).checkpoint();
}

Mono<ReadResponse> recentLogs(ReadRequest request) {
return get(request, ReadResponse.class, "read", request.getSourceId()).checkpoint();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public Mono<ReadResponse> read(ReadRequest request) {
return getReactorLogCacheEndpoints().read(request);
}

@Override
public Mono<ReadResponse> recentLogs(ReadRequest request) {
return getReactorLogCacheEndpoints().recentLogs(request);
}

/**
* The connection context
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.cloudfoundry.reactor.TestRequest;
import org.cloudfoundry.reactor.TestResponse;
import org.cloudfoundry.reactor.client.AbstractClientApiTest;
import org.cloudfoundry.reactor.client.v3.serviceinstances.ReactorServiceInstancesV3;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ public interface DopplerClient {
*/
Flux<Envelope> firehose(FirehoseRequest request);

//TODO Adapt the message
/**
* Makes the <a href="https://github.com/cloudfoundry/loggregator/tree/develop/src/trafficcontroller#endpoints">Recent Logs</a> request
*
* @deprecated Do not use this type directly, it exists only for the <em>Jackson</em>-binding infrastructure
* @param request the Recent Logs request
* @return the events from the recent logs
*/
@Deprecated
Flux<Envelope> recentLogs(RecentLogsRequest request);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,12 @@ public interface LogCacheClient {
* @return the read response
*/
Mono<ReadResponse> read(ReadRequest request);

/**
* Makes the Log Cache RecentLogs /api/v1/read request
*
* @param request the Recent Logs request
* @return the events from the recent logs
*/
Mono<ReadResponse> recentLogs(ReadRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.cloudfoundry.client.v3.spaces.ListSpacesRequest;
import org.cloudfoundry.client.v3.spaces.SpaceResource;
import org.cloudfoundry.doppler.DopplerClient;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.networking.NetworkingClient;
import org.cloudfoundry.operations.advanced.Advanced;
import org.cloudfoundry.operations.advanced.DefaultAdvanced;
Expand Down Expand Up @@ -79,7 +80,7 @@ public Advanced advanced() {
@Override
@Value.Derived
public Applications applications() {
return new DefaultApplications(getCloudFoundryClientPublisher(), getDopplerClientPublisher(), getSpaceId());
return new DefaultApplications(getCloudFoundryClientPublisher(), getDopplerClientPublisher(), getLogCacheClientPublisher(), getSpaceId());
}

@Override
Expand Down Expand Up @@ -185,6 +186,19 @@ Mono<DopplerClient> getDopplerClientPublisher() {
.orElse(Mono.error(new IllegalStateException("DopplerClient must be set")));
}

/**
* The {@link LogCacheClient} to use for operations functionality
*/
@Nullable
abstract LogCacheClient getLogCacheClient();

@Value.Derived
Mono<LogCacheClient> getLogCacheClientPublisher() {
return Optional.ofNullable(getLogCacheClient())
.map(Mono::just)
.orElse(Mono.error(new IllegalStateException("LogCacheClient must be set")));
}

/**
* The {@link NetworkingClient} to use for operations functionality
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.cloudfoundry.operations.applications;

import org.cloudfoundry.doppler.LogMessage;
import org.cloudfoundry.logcache.v1.Log;
import org.cloudfoundry.logcache.v1.ReadRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -115,13 +117,22 @@ public interface Applications {
Flux<Task> listTasks(ListApplicationTasksRequest request);

/**
* List the applications logs
*
* List the applications logs from dopplerClient
* @deprecated Only for compatibility. Switch to logCacheClient method below.
* @param request the application logs request
* @return the applications logs
*/
Flux<LogMessage> logs(LogsRequest request);

/**
* List the applications logs from logCacheClient.
* If no messages are available, an empty Flux is returned.
*
* @param request the application logs request
* @return the applications logs
*/
Flux<Log> logsRecent(ReadRequest request);

/**
* Push a specific application
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@
import org.cloudfoundry.client.v3.builds.CreateBuildResponse;
import org.cloudfoundry.client.v3.builds.GetBuildRequest;
import org.cloudfoundry.client.v3.builds.GetBuildResponse;
import org.cloudfoundry.client.v3.domains.DomainResource;
import org.cloudfoundry.client.v3.domains.ListDomainsRequest;
import org.cloudfoundry.client.v3.packages.BitsData;
import org.cloudfoundry.client.v3.packages.CreatePackageRequest;
import org.cloudfoundry.client.v3.packages.CreatePackageResponse;
Expand Down Expand Up @@ -154,6 +152,10 @@
import org.cloudfoundry.doppler.LogMessage;
import org.cloudfoundry.doppler.RecentLogsRequest;
import org.cloudfoundry.doppler.StreamRequest;
import org.cloudfoundry.logcache.v1.EnvelopeBatch;
import org.cloudfoundry.logcache.v1.Log;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.operations.util.OperationsLogging;
import org.cloudfoundry.util.DateUtils;
import org.cloudfoundry.util.DelayTimeoutException;
Expand Down Expand Up @@ -198,6 +200,10 @@ public final class DefaultApplications implements Applications {
private static final Comparator<LogMessage> LOG_MESSAGE_COMPARATOR =
Comparator.comparing(LogMessage::getTimestamp);

private static final Comparator<org.cloudfoundry.logcache.v1.Envelope>
LOG_MESSAGE_COMPARATOR_LOG_CACHE =
Comparator.comparing(org.cloudfoundry.logcache.v1.Envelope::getTimestamp);

private static final Duration LOG_MESSAGE_TIMESPAN = Duration.ofMillis(500);

private static final int MAX_NUMBER_OF_RECENT_EVENTS = 50;
Expand All @@ -212,24 +218,37 @@ public final class DefaultApplications implements Applications {

private final Mono<DopplerClient> dopplerClient;

private final Mono<LogCacheClient> logCacheClient;

private final RandomWords randomWords;

private final Mono<String> spaceId;

@Deprecated
public DefaultApplications(
Mono<CloudFoundryClient> cloudFoundryClient,
Mono<DopplerClient> dopplerClient,
Mono<String> spaceId) {
this(cloudFoundryClient, dopplerClient, null, new WordListRandomWords(), spaceId);
}

public DefaultApplications(
Mono<CloudFoundryClient> cloudFoundryClient,
Mono<DopplerClient> dopplerClient,
Mono<LogCacheClient> logCacheClient,
Mono<String> spaceId) {
this(cloudFoundryClient, dopplerClient, new WordListRandomWords(), spaceId);
this(cloudFoundryClient, dopplerClient, logCacheClient, new WordListRandomWords(), spaceId);
}

DefaultApplications(
Mono<CloudFoundryClient> cloudFoundryClient,
Mono<DopplerClient> dopplerClient,
Mono<LogCacheClient> logCacheClient,
RandomWords randomWords,
Mono<String> spaceId) {
this.cloudFoundryClient = cloudFoundryClient;
this.dopplerClient = dopplerClient;
this.logCacheClient = logCacheClient;
this.randomWords = randomWords;
this.spaceId = spaceId;
}
Expand Down Expand Up @@ -527,6 +546,7 @@ public Flux<Task> listTasks(ListApplicationTasksRequest request) {
.checkpoint();
}

@Deprecated
@Override
public Flux<LogMessage> logs(LogsRequest request) {
return Mono.zip(this.cloudFoundryClient, this.spaceId)
Expand All @@ -542,6 +562,13 @@ public Flux<LogMessage> logs(LogsRequest request) {
.checkpoint();
}

@Override
public Flux<Log> logsRecent(ReadRequest request) {
return getRecentLogsLogCache(this.logCacheClient, request)
.transform(OperationsLogging.log("Get Application Logs"))
.checkpoint();
}

@Override
@SuppressWarnings("deprecation")
public Mono<Void> push(PushApplicationRequest request) {
Expand Down Expand Up @@ -651,7 +678,6 @@ public Mono<Void> pushManifestV3(PushManifestV3Request request) {
} catch (IOException e) {
throw new RuntimeException("Could not serialize manifest", e);
}

return Mono.zip(this.cloudFoundryClient, this.spaceId)
.flatMap(
function(
Expand Down Expand Up @@ -1586,6 +1612,17 @@ private static Flux<LogMessage> getLogs(
}
}

private static Flux<Log> getRecentLogsLogCache(
Mono<LogCacheClient> logCacheClient, ReadRequest readRequest) {
return requestLogsRecentLogCache(logCacheClient, readRequest)
.map(EnvelopeBatch::getBatch)
.map(List::stream)
.flatMapIterable(envelopeStream -> envelopeStream.collect(Collectors.toList()))
.filter(e -> e.getLog() != null)
.sort(LOG_MESSAGE_COMPARATOR_LOG_CACHE)
.map(org.cloudfoundry.logcache.v1.Envelope::getLog);
}

@SuppressWarnings("unchecked")
private static Map<String, Object> getMetadataRequest(EventEntity entity) {
Map<String, Optional<Object>> metadata =
Expand Down Expand Up @@ -2382,15 +2419,6 @@ private static Mono<AbstractApplicationResource> requestGetApplication(
.cast(AbstractApplicationResource.class);
}

private static Flux<DomainResource> requestListDomains(
CloudFoundryClient cloudFoundryClient, String organizationId) {
return PaginationUtils.requestClientV3Resources(
page ->
cloudFoundryClient
.domainsV3()
.list(ListDomainsRequest.builder().page(page).build()));
}

private static Flux<PrivateDomainResource> requestListPrivateDomains(
CloudFoundryClient cloudFoundryClient, String organizationId) {
return PaginationUtils.requestClientV2Resources(
Expand Down Expand Up @@ -2470,6 +2498,7 @@ private static Flux<TaskResource> requestListTasks(
.build()));
}

@Deprecated
private static Flux<Envelope> requestLogsRecent(
Mono<DopplerClient> dopplerClient, String applicationId) {
return dopplerClient.flatMapMany(
Expand All @@ -2478,6 +2507,14 @@ private static Flux<Envelope> requestLogsRecent(
RecentLogsRequest.builder().applicationId(applicationId).build()));
}

private static Mono<EnvelopeBatch> requestLogsRecentLogCache(
Mono<LogCacheClient> logCacheClient, ReadRequest readRequest) {
return logCacheClient.flatMap(
client ->
client.recentLogs(readRequest)
.flatMap(response -> Mono.justOrEmpty(response.getEnvelopes())));
}

private static Flux<Envelope> requestLogsStream(
Mono<DopplerClient> dopplerClient, String applicationId) {
return dopplerClient.flatMapMany(
Expand Down Expand Up @@ -2736,12 +2773,6 @@ private static Mono<AbstractApplicationResource> requestUpdateApplicationScale(
builder -> builder.diskQuota(disk).instances(instances).memory(memory));
}

private static Mono<AbstractApplicationResource> requestUpdateApplicationSsh(
CloudFoundryClient cloudFoundryClient, String applicationId, Boolean enabled) {
return requestUpdateApplication(
cloudFoundryClient, applicationId, builder -> builder.enableSsh(enabled));
}

private static Mono<AbstractApplicationResource> requestUpdateApplicationState(
CloudFoundryClient cloudFoundryClient, String applicationId, String state) {
return requestUpdateApplication(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.cloudfoundry.client.v3.spaces.SpacesV3;
import org.cloudfoundry.client.v3.tasks.Tasks;
import org.cloudfoundry.doppler.DopplerClient;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.routing.RoutingClient;
import org.cloudfoundry.routing.v1.routergroups.RouterGroups;
import org.cloudfoundry.uaa.UaaClient;
Expand Down Expand Up @@ -101,6 +102,8 @@ public abstract class AbstractOperationsTest {

protected final DopplerClient dopplerClient = mock(DopplerClient.class, RETURNS_SMART_NULLS);

protected final LogCacheClient logCacheClient = mock(LogCacheClient.class, RETURNS_SMART_NULLS);

protected final Events events = mock(Events.class, RETURNS_SMART_NULLS);

protected final FeatureFlags featureFlags = mock(FeatureFlags.class, RETURNS_SMART_NULLS);
Expand Down
Loading