java.lang.NullPointerException: Cannot invoke "org.reactivestreams.Subscription.cancel()" because "this.upstream" is null #11643

Open
kevin-wise opened this issue Mar 3, 2025 · 8 comments
Labels
status: awaiting validation Waiting to be validated as a real issue

Comments

@kevin-wise
Contributor

kevin-wise commented Mar 3, 2025

Seeing this new error after upgrading from Micronaut 4.6.3 to 4.7.6. It doesn't seem to cause any problems in the API; it just fills up the logs. Two services have been upgraded so far and both are seeing the issue, though one saw it only once and the other 512 times.

Expected Behavior

No NullPointerException should be thrown

Actual Behaviour

Seeing this error in the logs: "Operator called default onErrorDropped" with the following stacktrace:

java.lang.NullPointerException: Cannot invoke "org.reactivestreams.Subscription.cancel()" because "this.upstream" is null
	at reactor.core.publisher.MonoCacheInvalidateIf$CoordinatorSubscriber.remove(MonoCacheInvalidateIf.java:273)
	at reactor.core.publisher.MonoCacheInvalidateIf$CacheMonoSubscriber.cancel(MonoCacheInvalidateIf.java:377)
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
	at reactor.core.publisher.Operators.terminate(Operators.java:1328)
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1026)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:342)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:221)
	at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1087)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:815)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:612)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:573)
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:988)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1865)
	at reactor.core.publisher.MonoCacheInvalidateIf$CoordinatorSubscriber.onNext(MonoCacheInvalidateIf.java:319)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.onNext(FluxTimeout.java:181)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(MonoContextWriteRestoringThreadLocals.java:110)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(MonoContextWriteRestoringThreadLocals.java:110)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118)
	at io.micronaut.core.async.propagation.ReactivePropagation$2.onNext(ReactivePropagation.java:108)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(MonoContextWriteRestoringThreadLocals.java:110)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118)
	at io.micronaut.core.async.propagation.ReactivePropagation$2.onNext(ReactivePropagation.java:108)
	at io.micronaut.http.client.filters.ClientServerRequestTracingPublisher$1.lambda$onNext$1(ClientServerRequestTracingPublisher.java:60)
	at io.micronaut.http.context.ServerRequestContext.with(ServerRequestContext.java:49)
	at io.micronaut.http.client.filters.ClientServerRequestTracingPublisher$1.onNext(ClientServerRequestTracingPublisher.java:60)
	at io.micronaut.http.client.filters.ClientServerRequestTracingPublisher$1.onNext(ClientServerRequestTracingPublisher.java:52)
	at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)
	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onNext(MonoSubscribeOn.java:146)
	at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118)
	at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176)
	at io.micronaut.http.client.netty.DefaultHttpClient$6.complete(DefaultHttpClient.java:1709)
	at io.micronaut.http.client.netty.Http1ResponseHandler$BufferedContent.complete(Http1ResponseHandler.java:222)
	at io.micronaut.http.client.netty.Http1ResponseHandler$BufferedContent.read(Http1ResponseHandler.java:185)
	at io.micronaut.http.client.netty.Http1ResponseHandler$BufferedContent.read(Http1ResponseHandler.java:157)
	at io.micronaut.http.client.netty.Http1ResponseHandler.channelReadInstrumented(Http1ResponseHandler.java:73)
	at io.micronaut.http.client.netty.Http1ResponseHandler.channelReadInstrumented(Http1ResponseHandler.java:48)
	at io.micronaut.http.client.netty.SimpleChannelInboundHandlerInstrumented.channelRead0(SimpleChannelInboundHandlerInstrumented.java:46)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:107)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Thread is a Netty event loop thread, for example default-nioEventLoopGroup-5-3.
Logger is reactor.core.publisher.Operators.

Steps To Reproduce

Not sure yet. Might be load related? Will give more info when we have it. Wanted to post this in case someone else is working the same issue already.

Environment Information

No response

Example Application

No response

Version

4.7.6

@kevin-wise
Contributor Author

Are there any tips or tricks on reproducing issues with reactive streams? From the stack trace it seems like something is getting cancelled, and while being cancelled it's hitting the NPE. It seems like we'd need to know what's cancelling it, but I have no idea how to find that.
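
One generic way to find out what is cancelling a subscription is to wrap it and capture a stack trace at the moment `cancel()` is called. The sketch below uses only the JDK's `java.util.concurrent.Flow` interfaces rather than Reactor, so it shows the technique and is not a reproduction of this NPE. `CancelTracer` and `CancelTracerDemo` are hypothetical names introduced here for illustration.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical helper (not part of Reactor or Micronaut): sits between a
 * publisher and a subscriber and remembers where cancel() was first called.
 */
final class CancelTracer<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> downstream;

    /** Holds a Throwable whose stack trace points at the first cancel() caller. */
    final AtomicReference<Throwable> cancelSite = new AtomicReference<>();

    CancelTracer(Flow.Subscriber<? super T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void onSubscribe(Flow.Subscription upstream) {
        // Hand the downstream a wrapper so every cancel() passes through here.
        downstream.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                upstream.request(n);
            }

            @Override
            public void cancel() {
                // Creating the Throwable captures the current call stack.
                cancelSite.compareAndSet(null,
                        new Throwable("cancelled on " + Thread.currentThread().getName()));
                upstream.cancel();
            }
        });
    }

    @Override public void onNext(T item) { downstream.onNext(item); }
    @Override public void onError(Throwable error) { downstream.onError(error); }
    @Override public void onComplete() { downstream.onComplete(); }
}

final class CancelTracerDemo {
    public static void main(String[] args) {
        // A subscriber that cancels as soon as it receives its first item.
        Flow.Subscriber<String> cancelsOnFirstItem = new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(String item) { subscription.cancel(); }
            @Override public void onError(Throwable error) { }
            @Override public void onComplete() { }
        };

        CancelTracer<String> tracer = new CancelTracer<>(cancelsOnFirstItem);
        // A do-nothing upstream subscription stands in for a real publisher.
        tracer.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        tracer.onNext("first");

        // The printed trace includes the frame that invoked cancel().
        tracer.cancelSite.get().printStackTrace();
    }
}
```

In a Reactor pipeline, `doOnCancel` gives a comparable hook and `Hooks.onOperatorDebug()` adds assembly traces to errors. Neither has been checked against this particular NPE, so treat both as starting points only.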

@kevin-wise
Contributor Author

Based on the stack trace it looks like this is happening when an HttpClient call is being completed.

On one service (so far) we've narrowed it down to a JWKS refresh. By changing the JWKS cache expiration the errors went way down, possibly to zero. That particular service doesn't use HttpClient for anything else, so we'll verify with another service and let you know.
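
For anyone wanting to try the same mitigation: the thread does not say which property was changed, but to my knowledge Micronaut Security exposes a per-JWKS `cache-expiration` setting, in seconds, under `micronaut.security.token.jwt.signatures.jwks.*`. A minimal sketch, assuming a JWKS configuration named `example` and a placeholder URL (both hypothetical), with an arbitrary value:

```yaml
# Sketch only: "example" and the URL are placeholders, and 3600 is an
# arbitrary illustrative value in seconds, not a recommendation.
micronaut:
  security:
    token:
      jwt:
        signatures:
          jwks:
            example:
              url: https://idp.example.com/.well-known/jwks.json
              cache-expiration: 3600
```

A longer expiration means fewer JWKS refreshes, which would be consistent with fewer errors if the refresh call is what triggers them.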

@kevin-wise
Contributor Author

Also noticed this seems to correlate with

Unexpected message on HTTP2 connection channel: DefaultHttp2PingFrame

But that could be a coincidence. Just noting it for now.

@kevin-wise
Contributor Author

Lengthening the JWKS cache expiration mitigated the problem but did not completely eliminate it. So far we have not found a way to replicate the issue. Since the only impact appears to be some extra log entries, and their number is reduced by increasing the cache expiration, we're considering this more of a nuisance than a critical issue.

@kevin-wise
Contributor Author

Switching to CacheableJwkSetFetcher completely eliminated the error, so the error is somehow tied to ReactorCacheJwkSetFetcher. Seeing how ReactorCacheJwkSetFetcher is deprecated, is there still a chance this bug could get fixed?

On a side note, JwksCacheConfigurationExistsCondition only checks for a Caffeine cache. It would be nice if it would find any kind of cache (such as Redis). We had to pull in Caffeine just for this.

@kevin-wise
Contributor Author

FYI we got Redis working. We had to implement a JwkSetFetcher like CacheableJwkSetFetcher, but one that checks for the Redis cache property. It had to go in the same package, since @Replaces(ReactorCacheJwkSetFetcher.class) needs access to the package-private ReactorCacheJwkSetFetcher.

@yawkat
Member

yawkat commented Mar 17, 2025

Looks like a reactor bug tbh. I don't see a way in which that NPE could happen through misbehavior on our side

@yawkat yawkat added the status: awaiting validation Waiting to be validated as a real issue label Mar 24, 2025
@yawkat
Member

yawkat commented Mar 24, 2025

I'm putting this on hiatus unless someone can provide a reproducer.
