113113 */
114114public abstract class DemandForwardingBridgeSupport implements NetworkBridge , BrokerServiceAware {
115115 private static final Logger LOG = LoggerFactory .getLogger (DemandForwardingBridgeSupport .class );
116- protected static final String DURABLE_SUB_PREFIX = "NC-DS_" ;
116+ protected static final String DURABLE_SUB_PREFIX = NetworkBridgeConfiguration . DURABLE_SUB_PREFIX ;
117117 protected final Transport localBroker ;
118118 protected final Transport remoteBroker ;
119119 protected IdGenerator idGenerator = new IdGenerator ();
@@ -664,23 +664,8 @@ public void run() {
664664 }
665665 }
666666
667- /**
668- * Checks whether or not this consumer is a direct bridge network subscription
669- * @param info
670- * @return
671- */
672- protected boolean isDirectBridgeConsumer (ConsumerInfo info ) {
673- return (info .getSubscriptionName () != null && info .getSubscriptionName ().startsWith (DURABLE_SUB_PREFIX )) &&
674- (info .getClientId () == null || info .getClientId ().startsWith (configuration .getName ()));
675- }
676-
677667 protected boolean isProxyBridgeSubscription (String clientId , String subName ) {
678- if (subName != null && clientId != null ) {
679- if (subName .startsWith (DURABLE_SUB_PREFIX ) && !clientId .startsWith (configuration .getName ())) {
680- return true ;
681- }
682- }
683- return false ;
668+ return NetworkBridgeUtils .isProxyBridgeSubscription (configuration , clientId , subName );
684669 }
685670
686671 /**
@@ -750,49 +735,61 @@ protected void serviceRemoteCommand(Command command) {
750735 } else if (command instanceof BrokerSubscriptionInfo ) {
751736 final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo ) command ;
752737
753- //Start in a new thread so we don't block the transport waiting for staticDestinations
754- syncExecutor .execute (new Runnable () {
755-
756- @ Override
757- public void run () {
758- try {
759- staticDestinationsLatch .await ();
760- //Make sure after the countDown of staticDestinationsLatch we aren't stopping
761- if (!disposed .get ()) {
762- BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo ;
763- LOG .debug ("Received Remote BrokerSubscriptionInfo on {} from {}" ,
764- brokerService .getBrokerName (), subInfo .getBrokerName ());
765-
766- if (configuration .isSyncDurableSubs () && configuration .isConduitSubscriptions ()
767- && !configuration .isDynamicOnly ()) {
768- if (started .get ()) {
769- if (subInfo .getSubscriptionInfos () != null ) {
770- for (ConsumerInfo info : subInfo .getSubscriptionInfos ()) {
771- //re-add any process any non-NC consumers that match the
772- //dynamicallyIncludedDestinations list
773- //Also re-add network consumers that are not part of this direct
774- //bridge (proxy of proxy bridges)
775- if ((info .getSubscriptionName () == null || !isDirectBridgeConsumer (info )) &&
776- NetworkBridgeUtils .matchesDestinations (dynamicallyIncludedDestinations , info .getDestination ())) {
777- serviceRemoteConsumerAdvisory (info );
778- }
779- }
780- }
738+ // Skip the durable sync if any of the following are true:
739+ // 1) if the flag is set to false.
740+ // 2) If dynamicOnly is true, this means means to only activate when the real
741+ // consumers come back so we need to skip. This mode is useful espeically when
742+ // setting TTL > 1 as the TTL info is tied to consumers
743+ // 3) If conduit subscriptions is disable we also skip, for the same reason we
744+ // skip when dynamicOnly is true, that we need to let consumers entirely drive
745+ // the creation/removal of subscriptions as each consumer gets their own
746+ if (!configuration .isSyncDurableSubs () || !configuration .isConduitSubscriptions ()
747+ || configuration .isDynamicOnly ()) {
748+ return ;
749+ }
781750
782- //After re-added, clean up any empty durables
783- for (Iterator <DemandSubscription > i = subscriptionMapByLocalId .values ().iterator (); i .hasNext (); ) {
784- DemandSubscription ds = i .next ();
785- if (NetworkBridgeUtils .matchesDestinations (dynamicallyIncludedDestinations , ds .getLocalInfo ().getDestination ())) {
786- cleanupDurableSub (ds , i );
787- }
788- }
751+ //Start in a new thread so we don't block the transport waiting for staticDestinations
752+ syncExecutor .execute (() -> {
753+ try {
754+ staticDestinationsLatch .await ();
755+
756+ //Make sure after the countDown of staticDestinationsLatch we aren't stopping
757+ if (!disposed .get () && started .get ()) {
758+ final BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo ;
759+ LOG .debug ("Received Remote BrokerSubscriptionInfo on {} from {}" ,
760+ brokerService .getBrokerName (), subInfo .getBrokerName ());
761+
762+ // Go through and subs sent and see if we can add demand
763+ if (subInfo .getSubscriptionInfos () != null ) {
764+ // Re-add and process subscriptions on the remote broker to add demand
765+ for (ConsumerInfo info : subInfo .getSubscriptionInfos ()) {
766+ // Brokers filter what is sent, but the filtering logic has changed between
767+ // versions, plus some durables sent are only processed for removes so we
768+ // need to filter what to process for adding demand
769+ if (NetworkBridgeUtils .matchesConfigForDurableSync (configuration ,
770+ info .getClientId (), info .getSubscriptionName (), info .getDestination ())) {
771+ serviceRemoteConsumerAdvisory (info );
789772 }
790773 }
791774 }
792- } catch (Exception e ) {
793- LOG .warn ("Error processing BrokerSubscriptionInfo: {}" , e .getMessage (), e );
794- LOG .debug (e .getMessage (), e );
775+
776+ //After processing demand to add, clean up any empty durables
777+ for (Iterator <DemandSubscription > i = subscriptionMapByLocalId .values ().iterator (); i .hasNext (); ) {
778+ DemandSubscription ds = i .next ();
779+ // This filters on destinations to see if we should process possible removal
780+ // based on the bridge configuration (included dests, TTL, etc).
781+ if (NetworkBridgeUtils .matchesDestinations (configuration .getDynamicallyIncludedDestinations (),
782+ ds .getLocalInfo ().getDestination ())) {
783+ // Note that this method will further check that there are no remote
784+ // demand that was previously added or associated. If there are remote
785+ // subscriptions tied to the DS, then it will not be removed.
786+ cleanupDurableSub (ds , i );
787+ }
788+ }
795789 }
790+ } catch (Exception e ) {
791+ LOG .warn ("Error processing BrokerSubscriptionInfo: {}" , e .getMessage (), e );
792+ LOG .debug (e .getMessage (), e );
796793 }
797794 });
798795
@@ -1427,7 +1424,7 @@ protected void setupStaticDestinations() {
14271424 if (dests != null ) {
14281425 for (ActiveMQDestination dest : dests ) {
14291426 if (isPermissableDestination (dest )) {
1430- DemandSubscription sub = createDemandSubscription (dest , null );
1427+ DemandSubscription sub = createDemandSubscription (dest , null , null );
14311428 if (sub != null ) {
14321429 sub .setStaticallyIncluded (true );
14331430 try {
@@ -1684,7 +1681,8 @@ protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throw
16841681 return result ;
16851682 }
16861683
1687- final protected DemandSubscription createDemandSubscription (ActiveMQDestination destination , final String subscriptionName ) {
1684+ final protected DemandSubscription createDemandSubscription (ActiveMQDestination destination , final String subscriptionName ,
1685+ BrokerId [] brokerPath ) {
16881686 ConsumerInfo info = new ConsumerInfo ();
16891687 info .setNetworkSubscription (true );
16901688 info .setDestination (destination );
@@ -1694,7 +1692,16 @@ final protected DemandSubscription createDemandSubscription(ActiveMQDestination
16941692 }
16951693
16961694 // Indicate that this subscription is being made on behalf of the remote broker.
1697- info .setBrokerPath (new BrokerId []{remoteBrokerId });
1695+ // If we have existing brokerPath info then use it, this is important to
1696+ // preserve TTL information
1697+ if (brokerPath == null || brokerPath .length == 0 ) {
1698+ info .setBrokerPath (new BrokerId []{remoteBrokerId });
1699+ } else {
1700+ info .setBrokerPath (brokerPath );
1701+ if (!contains (brokerPath , remoteBrokerId )) {
1702+ addRemoteBrokerToBrokerPath (info );
1703+ }
1704+ }
16981705
16991706 // the remote info held by the DemandSubscription holds the original
17001707 // consumerId, the local info get's overwritten
@@ -1778,7 +1785,7 @@ protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throw
17781785 return filterFactory .create (info , getRemoteBrokerPath (), configuration .getMessageTTL (), configuration .getConsumerTTL ());
17791786 }
17801787
1781- protected void addRemoteBrokerToBrokerPath (ConsumerInfo info ) throws IOException {
1788+ protected void addRemoteBrokerToBrokerPath (ConsumerInfo info ) {
17821789 info .setBrokerPath (appendToBrokerPath (info .getBrokerPath (), getRemoteBrokerPath ()));
17831790 }
17841791
0 commit comments