Skip to content

Commit 6f1968f

Browse files
authored
feat(auditSearch): support backend audit events and search api (#13377)
1 parent e6babc3 commit 6f1968f

File tree

95 files changed

+4187
-243
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

95 files changed

+4187
-243
lines changed

datahub-frontend/app/auth/sso/oidc/OidcCallbackLogic.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static com.linkedin.metadata.Constants.GROUP_MEMBERSHIP_ASPECT_NAME;
66
import static org.pac4j.play.store.PlayCookieSessionStore.*;
77
import static play.mvc.Results.internalServerError;
8+
import static utils.FrontendConstants.SSO_LOGIN;
89

910
import auth.CookieConfigs;
1011
import auth.sso.SsoManager;
@@ -290,7 +291,8 @@ private Result handleOidcCallback(
290291
log.info("OIDC callback authentication successful for user: {}", userName);
291292

292293
// Successfully logged in - Generate GMS login token
293-
final String accessToken = authClient.generateSessionTokenForUser(corpUserUrn.getId());
294+
final String accessToken =
295+
authClient.generateSessionTokenForUser(corpUserUrn.getId(), SSO_LOGIN);
294296
return result
295297
.withSession(createSessionMap(corpUserUrn.toString(), accessToken))
296298
.withCookies(

datahub-frontend/app/client/AuthServiceClient.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package client;
22

3+
import static com.linkedin.metadata.Constants.DATAHUB_LOGIN_SOURCE_HEADER_NAME;
4+
35
import com.datahub.authentication.Authentication;
46
import com.fasterxml.jackson.databind.ObjectMapper;
57
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -70,7 +72,7 @@ public AuthServiceClient(
7072
* an Actor of type USER.
7173
*/
7274
@Nonnull
73-
public String generateSessionTokenForUser(@Nonnull final String userId) {
75+
public String generateSessionTokenForUser(@Nonnull final String userId, String loginSource) {
7476
Objects.requireNonNull(userId, "userId must not be null");
7577
CloseableHttpResponse response = null;
7678

@@ -98,6 +100,8 @@ public String generateSessionTokenForUser(@Nonnull final String userId) {
98100
// Add authorization header with DataHub frontend system id and secret.
99101
request.addHeader(Http.HeaderNames.AUTHORIZATION, this.systemAuthentication.getCredentials());
100102

103+
request.addHeader(DATAHUB_LOGIN_SOURCE_HEADER_NAME, loginSource);
104+
101105
response = httpClient.execute(request);
102106
final HttpEntity entity = response.getEntity();
103107
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK && entity != null) {

datahub-frontend/app/client/KafkaTrackingProducer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void send(ProducerRecord<String, String> record) {
7070
producer.send(record);
7171
}
7272

73-
private static KafkaProducer createKafkaProducer(
73+
private static KafkaProducer<String, String> createKafkaProducer(
7474
Config config, KafkaConfiguration kafkaConfiguration) {
7575
final ProducerConfiguration producerConfiguration = kafkaConfiguration.getProducer();
7676
final Properties props = new Properties();
@@ -154,7 +154,7 @@ private static KafkaProducer createKafkaProducer(
154154
}
155155
}
156156

157-
return new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
157+
return new org.apache.kafka.clients.producer.KafkaProducer<>(props);
158158
}
159159

160160
private static void setConfig(Config config, Properties props, String key, String configKey) {

datahub-frontend/app/controllers/AuthenticationController.java

+15-5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
import static auth.AuthUtils.*;
44
import static org.pac4j.core.client.IndirectClient.ATTEMPTED_AUTHENTICATION_SUFFIX;
55
import static org.pac4j.play.store.PlayCookieSessionStore.*;
6+
import static utils.FrontendConstants.FALLBACK_LOGIN;
7+
import static utils.FrontendConstants.GUEST_LOGIN;
8+
import static utils.FrontendConstants.PASSWORD_LOGIN;
9+
import static utils.FrontendConstants.PASSWORD_RESET;
10+
import static utils.FrontendConstants.SIGN_UP_LINK_LOGIN;
611

712
import auth.AuthUtils;
813
import auth.CookieConfigs;
@@ -117,7 +122,8 @@ public Result authenticate(Http.Request request) {
117122
if (guestAuthenticationConfigs.isGuestEnabled()
118123
&& guestAuthenticationConfigs.getGuestPath().equals(redirectPath)) {
119124
final String accessToken =
120-
authClient.generateSessionTokenForUser(guestAuthenticationConfigs.getGuestUser());
125+
authClient.generateSessionTokenForUser(
126+
guestAuthenticationConfigs.getGuestUser(), GUEST_LOGIN);
121127
redirectPath =
122128
"/"; // We requested guest login by accessing {guestPath} URL. It is not really a target.
123129
CorpuserUrn guestUserUrn = new CorpuserUrn(guestAuthenticationConfigs.getGuestUser());
@@ -150,7 +156,8 @@ public Result authenticate(Http.Request request) {
150156

151157
// 3. If no auth enabled, fallback to using default user account & redirect.
152158
// Generate GMS session token, TODO:
153-
final String accessToken = authClient.generateSessionTokenForUser(DEFAULT_ACTOR_URN.getId());
159+
final String accessToken =
160+
authClient.generateSessionTokenForUser(DEFAULT_ACTOR_URN.getId(), FALLBACK_LOGIN);
154161
return Results.redirect(redirectPath)
155162
.withSession(createSessionMap(DEFAULT_ACTOR_URN.toString(), accessToken))
156163
.withCookies(
@@ -215,7 +222,8 @@ public Result logIn(Http.Request request) {
215222

216223
final Urn actorUrn = new CorpuserUrn(username);
217224
logger.info("Login successful for user: {}, urn: {}", username, actorUrn);
218-
final String accessToken = authClient.generateSessionTokenForUser(actorUrn.getId());
225+
final String accessToken =
226+
authClient.generateSessionTokenForUser(actorUrn.getId(), PASSWORD_LOGIN);
219227
return createSession(actorUrn.toString(), accessToken);
220228
}
221229

@@ -279,7 +287,8 @@ public Result signUp(Http.Request request) {
279287
final String userUrnString = userUrn.toString();
280288
authClient.signUp(userUrnString, fullName, email, title, password, inviteToken);
281289
logger.info("Signed up user {} using invite tokens", userUrnString);
282-
final String accessToken = authClient.generateSessionTokenForUser(userUrn.getId());
290+
final String accessToken =
291+
authClient.generateSessionTokenForUser(userUrn.getId(), SIGN_UP_LINK_LOGIN);
283292
return createSession(userUrnString, accessToken);
284293
}
285294

@@ -319,7 +328,8 @@ public Result resetNativeUserCredentials(Http.Request request) {
319328
final Urn userUrn = new CorpuserUrn(email);
320329
final String userUrnString = userUrn.toString();
321330
authClient.resetNativeUserCredentials(userUrnString, password, resetToken);
322-
final String accessToken = authClient.generateSessionTokenForUser(userUrn.getId());
331+
final String accessToken =
332+
authClient.generateSessionTokenForUser(userUrn.getId(), PASSWORD_RESET);
323333
return createSession(userUrnString, accessToken);
324334
}
325335

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package utils;
2+
3+
public class FrontendConstants {
4+
private FrontendConstants() {}
5+
6+
public static final String PASSWORD_RESET = "passwordReset";
7+
public static final String PASSWORD_LOGIN = "passwordLogin";
8+
public static final String FALLBACK_LOGIN = "fallbackLogin";
9+
public static final String SIGN_UP_LINK_LOGIN = "signUpLinkLogin";
10+
public static final String GUEST_LOGIN = "guestLogin";
11+
public static final String SSO_LOGIN = "ssoLogin";
12+
}

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/analytics/service/AnalyticsService.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.linkedin.datahub.graphql.generated.NumericDataPoint;
1212
import com.linkedin.datahub.graphql.generated.Row;
1313
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
14+
import com.linkedin.metadata.datahubusage.DataHubUsageEventConstants;
1415
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
1516
import java.util.List;
1617
import java.util.Map;
@@ -178,7 +179,9 @@ public List<NamedBar> getBarChart(
178179
indexName, dateRange, dimensions)
179180
+ String.format("filters: %s, uniqueOn: %s", filters, uniqueOn));
180181

181-
assert (dimensions.size() == 1 || dimensions.size() == 2);
182+
if (!(dimensions.size() == 1 || dimensions.size() == 2)) {
183+
throw new IllegalArgumentException("Dimensions must have 1 or 2 specified: " + dimensions);
184+
}
182185
AggregationBuilder filteredAgg = getFilteredAggregation(filters, mustNotFilters, dateRange);
183186

184187
TermsAggregationBuilder termAgg = AggregationBuilders.terms(DIMENSION).field(dimensions.get(0));
@@ -350,6 +353,7 @@ private AggregationBuilder getFilteredAggregation(
350353
Optional<DateRange> dateRange,
351354
String dateRangeField) {
352355
BoolQueryBuilder filteredQuery = QueryBuilders.boolQuery();
356+
filteredQuery.filter(getDefaultFilters());
353357
mustFilters.forEach((key, values) -> filteredQuery.must(QueryBuilders.termsQuery(key, values)));
354358
mustNotFilters.forEach(
355359
(key, values) -> filteredQuery.mustNot(QueryBuilders.termsQuery(key, values)));
@@ -365,6 +369,14 @@ private AggregationBuilder getFilteredAggregation(
365369
return getFilteredAggregation(mustFilters, mustNotFilters, dateRange, "timestamp");
366370
}
367371

372+
private QueryBuilder getDefaultFilters() {
373+
return QueryBuilders.boolQuery()
374+
.mustNot(
375+
QueryBuilders.termQuery(
376+
DataHubUsageEventConstants.USAGE_SOURCE,
377+
DataHubUsageEventConstants.BACKEND_SOURCE));
378+
}
379+
368380
private QueryBuilder dateRangeQuery(DateRange dateRange) {
369381
// Use timestamp as dateRangeField
370382
return dateRangeQuery(dateRange, "timestamp");

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/concurrency/GraphQLConcurrencyUtils.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.codahale.metrics.MetricRegistry;
44
import com.linkedin.metadata.utils.metrics.MetricUtils;
5+
import io.opentelemetry.context.Context;
56
import java.util.concurrent.CompletableFuture;
67
import java.util.concurrent.ExecutorService;
78
import java.util.function.Supplier;
@@ -16,7 +17,7 @@ public static ExecutorService getExecutorService() {
1617
}
1718

1819
public static void setExecutorService(ExecutorService executorService) {
19-
GraphQLConcurrencyUtils.graphQLExecutorService = executorService;
20+
GraphQLConcurrencyUtils.graphQLExecutorService = Context.taskWrapping(executorService);
2021
}
2122

2223
public static <T> CompletableFuture<T> supplyAsync(
@@ -26,7 +27,9 @@ public static <T> CompletableFuture<T> supplyAsync(
2627
GraphQLConcurrencyUtils.class.getSimpleName(), "supplyAsync", caller, task))
2728
.inc();
2829
if (GraphQLConcurrencyUtils.graphQLExecutorService == null) {
29-
return CompletableFuture.supplyAsync(supplier);
30+
// Hack around to force context wrapping for base executor
31+
return CompletableFuture.supplyAsync(
32+
supplier, Context.taskWrapping(new CompletableFuture().defaultExecutor()));
3033
} else {
3134
return CompletableFuture.supplyAsync(
3235
supplier, GraphQLConcurrencyUtils.graphQLExecutorService);

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/ReportAssertionResultResolver.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.linkedin.data.template.SetMode;
1212
import com.linkedin.data.template.StringMap;
1313
import com.linkedin.datahub.graphql.QueryContext;
14+
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
1415
import com.linkedin.datahub.graphql.exception.AuthorizationException;
1516
import com.linkedin.datahub.graphql.generated.AssertionResultInput;
1617
import com.linkedin.datahub.graphql.generated.StringMapEntryInput;
@@ -53,7 +54,7 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw
5354
final AssertionResultInput input =
5455
bindArgument(environment.getArgument("result"), AssertionResultInput.class);
5556

56-
return CompletableFuture.supplyAsync(
57+
return GraphQLConcurrencyUtils.supplyAsync(
5758
() -> {
5859
final Urn asserteeUrn =
5960
_assertionService.getEntityUrnForAssertion(
@@ -80,7 +81,9 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw
8081
}
8182
throw new AuthorizationException(
8283
"Unauthorized to perform this action. Please contact your DataHub administrator.");
83-
});
84+
},
85+
this.getClass().getSimpleName(),
86+
"get");
8487
}
8588

8689
private static StringMap mapContextParameters(List<StringMapEntryInput> input) {

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/assertion/UpsertCustomAssertionResolver.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.linkedin.common.urn.UrnUtils;
1010
import com.linkedin.data.template.SetMode;
1111
import com.linkedin.datahub.graphql.QueryContext;
12+
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
1213
import com.linkedin.datahub.graphql.exception.AuthorizationException;
1314
import com.linkedin.datahub.graphql.generated.Assertion;
1415
import com.linkedin.datahub.graphql.generated.PlatformInput;
@@ -51,7 +52,7 @@ public CompletableFuture<Assertion> get(DataFetchingEnvironment environment) thr
5152
assertionUrn = UrnUtils.getUrn(maybeAssertionUrn);
5253
}
5354

54-
return CompletableFuture.supplyAsync(
55+
return GraphQLConcurrencyUtils.supplyAsync(
5556
() -> {
5657
// Check whether the current user is allowed to update the assertion.
5758
if (AssertionUtils.isAuthorizedToEditAssertionFromAssertee(context, entityUrn)) {
@@ -71,7 +72,9 @@ public CompletableFuture<Assertion> get(DataFetchingEnvironment environment) thr
7172
}
7273
throw new AuthorizationException(
7374
"Unauthorized to perform this action. Please contact your DataHub administrator.");
74-
});
75+
},
76+
this.getClass().getSimpleName(),
77+
"get");
7578
}
7679

7780
@SneakyThrows

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/auth/CreateAccessTokenResolver.java

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public CompletableFuture<AccessToken> get(final DataFetchingEnvironment environm
6666

6767
final String accessToken =
6868
_statefulTokenService.generateAccessToken(
69+
context.getOperationContext(),
6970
type,
7071
createActor(input.getType(), actorUrn),
7172
expiresInMs.orElse(null),

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/auth/RevokeAccessTokenResolver.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw
4545

4646
if (isAuthorizedToRevokeToken(context, tokenId)) {
4747
try {
48-
_statefulTokenService.revokeAccessToken(tokenId);
48+
_statefulTokenService.revokeAccessToken(context.getOperationContext(), tokenId);
4949
} catch (Exception e) {
5050
throw new RuntimeException("Failed to revoke access token", e);
5151
}

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/chart/ChartStatsSummaryResolver.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.datahub.graphql.resolvers.chart;
22

3+
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
34
import com.linkedin.datahub.graphql.generated.ChartStatsSummary;
45
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
56
import graphql.schema.DataFetcher;
@@ -21,6 +22,6 @@ public ChartStatsSummaryResolver(final TimeseriesAspectService timeseriesAspectS
2122
public CompletableFuture<ChartStatsSummary> get(DataFetchingEnvironment environment)
2223
throws Exception {
2324
// Not yet implemented
24-
return CompletableFuture.completedFuture(null);
25+
return GraphQLConcurrencyUtils.supplyAsync(() -> null, this.getClass().getSimpleName(), "get");
2526
}
2627
}

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/connection/UpsertConnectionResolver.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.linkedin.connection.DataHubConnectionDetailsType;
99
import com.linkedin.connection.DataHubJsonConnection;
1010
import com.linkedin.datahub.graphql.QueryContext;
11+
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
1112
import com.linkedin.datahub.graphql.exception.AuthorizationException;
1213
import com.linkedin.datahub.graphql.generated.DataHubConnection;
1314
import com.linkedin.datahub.graphql.generated.UpsertDataHubConnectionInput;
@@ -44,7 +45,7 @@ public CompletableFuture<DataHubConnection> get(final DataFetchingEnvironment en
4445
bindArgument(environment.getArgument("input"), UpsertDataHubConnectionInput.class);
4546
final Authentication authentication = context.getAuthentication();
4647

47-
return CompletableFuture.supplyAsync(
48+
return GraphQLConcurrencyUtils.supplyAsync(
4849
() -> {
4950
if (!ConnectionUtils.canManageConnections(context)) {
5051
throw new AuthorizationException(
@@ -73,6 +74,8 @@ public CompletableFuture<DataHubConnection> get(final DataFetchingEnvironment en
7374
throw new RuntimeException(
7475
String.format("Failed to upsert a Connection from input %s", input), e);
7576
}
76-
});
77+
},
78+
this.getClass().getSimpleName(),
79+
"get");
7780
}
7881
}

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/datacontract/EntityDataContractResolver.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.linkedin.common.EntityRelationships;
66
import com.linkedin.common.urn.Urn;
77
import com.linkedin.datahub.graphql.QueryContext;
8+
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
89
import com.linkedin.datahub.graphql.generated.DataContract;
910
import com.linkedin.datahub.graphql.generated.Entity;
1011
import com.linkedin.datahub.graphql.types.datacontract.DataContractMapper;
@@ -38,7 +39,7 @@ public EntityDataContractResolver(
3839

3940
@Override
4041
public CompletableFuture<DataContract> get(DataFetchingEnvironment environment) {
41-
return CompletableFuture.supplyAsync(
42+
return GraphQLConcurrencyUtils.supplyAsync(
4243
() -> {
4344
final QueryContext context = environment.getContext();
4445
final String entityUrn = ((Entity) environment.getSource()).getUrn();
@@ -91,6 +92,8 @@ public CompletableFuture<DataContract> get(DataFetchingEnvironment environment)
9192
} catch (URISyntaxException | RemoteInvocationException e) {
9293
throw new RuntimeException("Failed to retrieve Data Contract from GMS", e);
9394
}
94-
});
95+
},
96+
this.getClass().getSimpleName(),
97+
"get");
9598
}
9699
}

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/datacontract/UpsertDataContractResolver.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.linkedin.datacontract.SchemaContract;
2020
import com.linkedin.datacontract.SchemaContractArray;
2121
import com.linkedin.datahub.graphql.QueryContext;
22+
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
2223
import com.linkedin.datahub.graphql.exception.AuthorizationException;
2324
import com.linkedin.datahub.graphql.exception.DataHubGraphQLErrorCode;
2425
import com.linkedin.datahub.graphql.exception.DataHubGraphQLException;
@@ -68,7 +69,7 @@ public CompletableFuture<DataContract> get(final DataFetchingEnvironment environ
6869
final UpsertDataContractInput input =
6970
bindArgument(environment.getArgument("input"), UpsertDataContractInput.class);
7071
final Urn entityUrn = UrnUtils.getUrn(input.getEntityUrn());
71-
return CompletableFuture.supplyAsync(
72+
return GraphQLConcurrencyUtils.supplyAsync(
7273
() -> {
7374
if (DataContractUtils.canEditDataContract(context, entityUrn)) {
7475

@@ -124,7 +125,9 @@ public CompletableFuture<DataContract> get(final DataFetchingEnvironment environ
124125
}
125126
throw new AuthorizationException(
126127
"Unauthorized to perform this action. Please contact your DataHub administrator.");
127-
});
128+
},
129+
this.getClass().getSimpleName(),
130+
"get");
128131
}
129132

130133
private void validateInput(

0 commit comments

Comments
 (0)