|
28 | 28 | import io.grpc.Attributes;
|
29 | 29 | import io.grpc.ChannelLogger;
|
30 | 30 | import io.grpc.InternalChannelz;
|
| 31 | +import io.grpc.InternalStatus; |
31 | 32 | import io.grpc.Metadata;
|
32 | 33 | import io.grpc.Status;
|
33 | 34 | import io.grpc.StatusException;
|
|
83 | 84 | import io.perfmark.Tag;
|
84 | 85 | import io.perfmark.TaskCloseable;
|
85 | 86 | import java.nio.channels.ClosedChannelException;
|
| 87 | +import java.util.LinkedHashMap; |
| 88 | +import java.util.Map; |
86 | 89 | import java.util.concurrent.Executor;
|
87 | 90 | import java.util.logging.Level;
|
88 | 91 | import java.util.logging.Logger;
|
|
94 | 97 | */
|
95 | 98 | class NettyClientHandler extends AbstractNettyHandler {
|
96 | 99 | private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
|
| 100 | + static boolean enablePerRpcAuthorityCheck = |
| 101 | + GrpcUtil.getFlag("GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK", false); |
97 | 102 |
|
98 | 103 | /**
|
99 | 104 | * A message that simply passes through the channel without any real processing. It is useful to
|
@@ -128,6 +133,13 @@ protected void handleNotInUse() {
|
128 | 133 | lifecycleManager.notifyInUse(false);
|
129 | 134 | }
|
130 | 135 | };
|
| 136 | + private final Map<String, Status> peerVerificationResults = |
| 137 | + new LinkedHashMap<String, Status>() { |
| 138 | + @Override |
| 139 | + protected boolean removeEldestEntry(Map.Entry<String, Status> eldest) { |
| 140 | + return size() > 100; |
| 141 | + } |
| 142 | + }; |
131 | 143 |
|
132 | 144 | private WriteQueue clientWriteQueue;
|
133 | 145 | private Http2Ping ping;
|
@@ -591,6 +603,56 @@ private void createStream(CreateStreamCommand command, ChannelPromise promise)
|
591 | 603 | return;
|
592 | 604 | }
|
593 | 605 |
|
| 606 | + CharSequence authorityHeader = command.headers().authority(); |
| 607 | + if (authorityHeader == null) { |
| 608 | + Status authorityVerificationStatus = Status.UNAVAILABLE.withDescription( |
| 609 | + "Missing authority header"); |
| 610 | + command.stream().setNonExistent(); |
| 611 | + command.stream().transportReportStatus( |
| 612 | + Status.UNAVAILABLE, RpcProgress.PROCESSED, true, new Metadata()); |
| 613 | + promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace( |
| 614 | + authorityVerificationStatus, null)); |
| 615 | + return; |
| 616 | + } |
| 617 | + // No need to verify authority for the rpc outgoing header if it is same as the authority |
| 618 | + // for the transport |
| 619 | + if (!authority.contentEquals(authorityHeader)) { |
| 620 | + Status authorityVerificationStatus = peerVerificationResults.get( |
| 621 | + authorityHeader.toString()); |
| 622 | + if (authorityVerificationStatus == null) { |
| 623 | + if (attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) == null) { |
| 624 | + authorityVerificationStatus = Status.UNAVAILABLE.withDescription( |
| 625 | + "Authority verifier not found to verify authority"); |
| 626 | + command.stream().setNonExistent(); |
| 627 | + command.stream().transportReportStatus( |
| 628 | + authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata()); |
| 629 | + promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace( |
| 630 | + authorityVerificationStatus, null)); |
| 631 | + return; |
| 632 | + } |
| 633 | + authorityVerificationStatus = attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) |
| 634 | + .verifyAuthority(authorityHeader.toString()); |
| 635 | + peerVerificationResults.put(authorityHeader.toString(), authorityVerificationStatus); |
| 636 | + if (!authorityVerificationStatus.isOk() && !enablePerRpcAuthorityCheck) { |
| 637 | + logger.log(Level.WARNING, String.format("%s.%s", |
| 638 | + authorityVerificationStatus.getDescription(), |
| 639 | + enablePerRpcAuthorityCheck |
| 640 | + ? "" : " This will be an error in the future."), |
| 641 | + InternalStatus.asRuntimeExceptionWithoutStacktrace( |
| 642 | + authorityVerificationStatus, null)); |
| 643 | + } |
| 644 | + } |
| 645 | + if (!authorityVerificationStatus.isOk()) { |
| 646 | + if (enablePerRpcAuthorityCheck) { |
| 647 | + command.stream().setNonExistent(); |
| 648 | + command.stream().transportReportStatus( |
| 649 | + authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata()); |
| 650 | + promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace( |
| 651 | + authorityVerificationStatus, null)); |
| 652 | + return; |
| 653 | + } |
| 654 | + } |
| 655 | + } |
594 | 656 | // Get the stream ID for the new stream.
|
595 | 657 | int streamId;
|
596 | 658 | try {
|
|
0 commit comments