Skip to content

Commit de88e88

Browse files
authored
Merge pull request #143 from Netflix/2167-source-job-has-incorrect-subscriptions
RTDI-2167 source job has incorrect subscriptions
2 parents d048a3d + 1f24186 commit de88e88

File tree

2 files changed

+40
-22
lines changed

2 files changed

+40
-22
lines changed

mantis-common/src/main/java/com/mantisrx/common/utils/MantisSSEConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,6 @@ public class MantisSSEConstants {
4343
public static final String ID = "id";
4444

4545
public static final String MQL = "mql";
46+
47+
public static final String TARGET_JOB = "sourceJobName";
4648
}

mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventRequestHandler.java

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.mantisrx.runtime.Context;
3131
import io.mantisrx.runtime.sink.predicate.Predicate;
3232
import io.netty.buffer.ByteBuf;
33+
import io.netty.handler.codec.http.HttpResponseStatus;
3334
import io.reactivx.mantis.operators.DisableBackPressureOperator;
3435
import io.reactivx.mantis.operators.DropOperator;
3536
import java.net.InetSocketAddress;
@@ -142,6 +143,43 @@ public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
142143

143144
String uniqueClientId = socketAddrStr;
144145

146+
Tag[] tags = new Tag[2];
147+
final String clientId = Optional.ofNullable(uniqueClientId).orElse("none");
148+
final String sockAddr = Optional.ofNullable(socketAddrStr).orElse("none");
149+
tags[0] = new BasicTag("clientId", clientId);
150+
tags[1] = new BasicTag("sockAddr", sockAddr);
151+
152+
Metrics sseSinkMetrics = new Metrics.Builder()
153+
.id("ServerSentEventRequestHandler", tags)
154+
.addCounter("processedCounter")
155+
.addCounter("pingCounter")
156+
.addCounter("errorCounter")
157+
.addCounter("droppedCounter")
158+
.addCounter("flushCounter")
159+
.addCounter("sourceJobNameMismatchRejection")
160+
.build();
161+
162+
163+
final Counter msgProcessedCounter = sseSinkMetrics.getCounter("processedCounter");
164+
final Counter pingCounter = sseSinkMetrics.getCounter("pingCounter");
165+
final Counter errorCounter = sseSinkMetrics.getCounter("errorCounter");
166+
final Counter droppedWrites = sseSinkMetrics.getCounter("droppedCounter");
167+
final Counter flushCounter = sseSinkMetrics.getCounter("flushCounter");
168+
final Counter sourceJobNameMismatchRejectionCounter = sseSinkMetrics.getCounter("sourceJobNameMismatchRejection");
169+
170+
171+
if (queryParameters != null && queryParameters.containsKey(MantisSSEConstants.TARGET_JOB)) {
172+
String targetJob = queryParameters.get(MantisSSEConstants.TARGET_JOB).get(0);
173+
String currentJob = this.context.getWorkerInfo().getJobClusterName();
174+
if (!currentJob.equalsIgnoreCase(targetJob)) {
175+
LOG.info("Rejecting connection from {}. Client is targeting job {} but this is job {}.", uniqueClientId, targetJob, currentJob);
176+
sourceJobNameMismatchRejectionCounter.increment();
177+
response.setStatus(HttpResponseStatus.BAD_REQUEST);
178+
response.writeStringAndFlush("data: " + MantisSSEConstants.TARGET_JOB + " is " + targetJob + " but this is " + currentJob + "." + TWO_NEWLINES);
179+
return response.close();
180+
}
181+
}
182+
145183
if (queryParameters != null && queryParameters.containsKey(CLIENT_ID_PARAM)) {
146184
// enablePings
147185
uniqueClientId = queryParameters.get(CLIENT_ID_PARAM).get(0);
@@ -202,28 +240,6 @@ public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
202240
? queryParameters.get(MantisSSEConstants.MANTIS_COMPRESSION_DELIMITER).get(0).getBytes()
203241
: null;
204242

205-
Tag[] tags = new Tag[2];
206-
final String clientId = Optional.ofNullable(uniqueClientId).orElse("none");
207-
final String sockAddr = Optional.ofNullable(socketAddrStr).orElse("none");
208-
tags[0] = new BasicTag("clientId", clientId);
209-
tags[1] = new BasicTag("sockAddr", sockAddr);
210-
211-
Metrics sseSinkMetrics = new Metrics.Builder()
212-
.id("ServerSentEventRequestHandler", tags)
213-
.addCounter("processedCounter")
214-
.addCounter("pingCounter")
215-
.addCounter("errorCounter")
216-
.addCounter("droppedCounter")
217-
.addCounter("flushCounter")
218-
.build();
219-
220-
221-
final Counter msgProcessedCounter = sseSinkMetrics.getCounter("processedCounter");
222-
final Counter pingCounter = sseSinkMetrics.getCounter("pingCounter");
223-
final Counter errorCounter = sseSinkMetrics.getCounter("errorCounter");
224-
final Counter droppedWrites = sseSinkMetrics.getCounter("droppedCounter");
225-
final Counter flushCounter = sseSinkMetrics.getCounter("flushCounter");
226-
227243
// get predicate, defaults to return true for all T
228244
Func1<T, Boolean> filterFunction = new Func1<T, Boolean>() {
229245
@Override

0 commit comments

Comments
 (0)