|
17 | 17 | package org.eclipse.hono.communication.api.service.communication;
|
18 | 18 |
|
19 | 19 | import java.util.ArrayList;
|
| 20 | +import java.util.HashMap; |
20 | 21 | import java.util.List;
|
| 22 | +import java.util.Map; |
| 23 | +import java.util.Set; |
| 24 | +import java.util.stream.Stream; |
21 | 25 |
|
22 |
| -import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; |
| 26 | +import org.apache.commons.lang3.StringUtils; |
23 | 27 | import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
|
24 | 28 | import org.eclipse.hono.communication.api.config.PubSubConstants;
|
25 | 29 | import org.eclipse.hono.communication.api.handler.CommandTopicEventHandler;
|
|
37 | 41 | import com.google.cloud.pubsub.v1.AckReplyConsumer;
|
38 | 42 | import com.google.common.base.Strings;
|
39 | 43 | import com.google.pubsub.v1.PubsubMessage;
|
| 44 | +import com.google.pubsub.v1.Subscription; |
40 | 45 | import com.google.pubsub.v1.SubscriptionName;
|
41 | 46 | import com.google.pubsub.v1.TopicName;
|
42 | 47 |
|
@@ -96,16 +101,115 @@ public InternalTopicManagerImpl(final DeviceRepository deviceRepository,
|
96 | 101 | @Override
|
97 | 102 | public void initPubSub() {
|
98 | 103 | log.info("Initialize tenant topics and subscriptions.");
|
99 |
| - internalMessaging.subscribe(PubSubConstants.TENANT_NOTIFICATIONS, this::onTenantChanges); |
| 104 | + vertx.executeBlocking(promise -> { |
| 105 | + internalMessaging.subscribe(PubSubConstants.TENANT_NOTIFICATIONS, this::onTenantChanges); |
| 106 | + promise.complete(); |
| 107 | + }); |
100 | 108 | deviceRepository.listDistinctTenants()
|
101 | 109 | .onFailure(err -> log.error("Error getting tenants for topic creation: {}", err.getMessage()))
|
102 | 110 | .onSuccess(tenants -> {
|
| 111 | + if (tenants.size() < internalMessagingConfig.getBatchInitTenantThreshold()) { |
| 112 | + for (String tenant : tenants) { |
| 113 | + initPubSubForTenant(tenant); |
| 114 | + } |
| 115 | + } else { |
| 116 | + batchInitPubSubResources(tenants); |
| 117 | + } |
| 118 | + }); |
| 119 | + } |
| 120 | + |
| 121 | + private void batchInitPubSubResources(final List<String> tenants) { |
| 122 | + log.info("Start batchInitPubSubResources"); |
| 123 | + findMissingPubSubResources(tenants).compose(map -> { |
| 124 | + final PubSubBasedAdminClientManager adminClientManager = adminClientManagerFactory |
| 125 | + .createAdminClientManager(); |
| 126 | + final List<Future> pubsubRequests = new ArrayList<>(); |
| 127 | + for (Map.Entry<String, String> missingTopic : map.get("missingTopics").entrySet()) { |
| 128 | + final TopicName topic = TopicName.parse(missingTopic.getKey()); |
| 129 | + pubsubRequests.add(adminClientManager.createTopic(topic)); |
| 130 | + } |
| 131 | + for (Map.Entry<String, String> missingSubscription : map.get("missingSubscriptions").entrySet()) { |
| 132 | + final SubscriptionName subscription = SubscriptionName.parse(missingSubscription.getKey()); |
| 133 | + final TopicName topic = TopicName.parse(missingSubscription.getValue()); |
| 134 | + pubsubRequests.add(adminClientManager.createSubscription(subscription, topic)); |
| 135 | + } |
| 136 | + for (Map.Entry<String, String> faultySubscription : map.get("faultySubscriptions").entrySet()) { |
| 137 | + final SubscriptionName subscription = SubscriptionName.parse(faultySubscription.getKey()); |
| 138 | + final TopicName topic = TopicName.parse(faultySubscription.getValue()); |
| 139 | + pubsubRequests.add(adminClientManager.updateSubscriptionTopic(subscription, topic)); |
| 140 | + } |
| 141 | + return CompositeFuture.join(pubsubRequests).onComplete(i -> { |
| 142 | + adminClientManager.closeAdminClients(); |
| 143 | + log.info("Finished batchInitPubSubResources"); |
| 144 | + }); |
| 145 | + }).onFailure(e -> log.error("batchInitPubSubResources failed", e)) |
| 146 | + .onSuccess(i -> { |
103 | 147 | for (String tenant : tenants) {
|
104 |
| - initPubSubForTenant(tenant); |
| 148 | + vertx.executeBlocking(promise -> subscribeToTenantTopics(tenant)); |
105 | 149 | }
|
106 | 150 | });
|
107 | 151 | }
|
108 | 152 |
|
| 153 | + private Future<Map<String, Map<String, String>>> findMissingPubSubResources(final List<String> tenants) { |
| 154 | + final PubSubBasedAdminClientManager pubSubBasedAdminClientManager = adminClientManagerFactory |
| 155 | + .createAdminClientManager(); |
| 156 | + final Future<Set<String>> topicsFuture = pubSubBasedAdminClientManager.listTopics(); |
| 157 | + final Future<Set<Subscription>> subscriptionsFuture = pubSubBasedAdminClientManager.listSubscriptions(); |
| 158 | + final Map<String, String> missingTopics = new HashMap<>(); |
| 159 | + final Map<String, String> missingSubscriptions = new HashMap<>(); |
| 160 | + final Map<String, String> faultySubscriptions = new HashMap<>(); |
| 161 | + return CompositeFuture.join(topicsFuture, subscriptionsFuture).map(i -> { |
| 162 | + final Set<String> topicSet = topicsFuture.result(); |
| 163 | + final Set<Subscription> subscriptionSet = subscriptionsFuture.result(); |
| 164 | + final Map<String, Subscription> subscriptionMap = new HashMap<>(); |
| 165 | + subscriptionSet.forEach(s -> subscriptionMap.put(s.getName(), s)); |
| 166 | + for (String tenantEndpoint : PubSubConstants.getTenantEndpoints()) { |
| 167 | + for (String tenant : tenants) { |
| 168 | + final String topic = String |
| 169 | + .valueOf(TopicName.of(projectId, PubSubMessageHelper.getTopicName(tenantEndpoint, tenant))); |
| 170 | + addMissingTopicToMap(topicSet, topic, missingTopics); |
| 171 | + final String subscription = String.valueOf( |
| 172 | + SubscriptionName.of(projectId, PubSubMessageHelper.getTopicName(tenantEndpoint, tenant))); |
| 173 | + addMissingOrFaultySubscription(subscriptionMap, missingSubscriptions, faultySubscriptions, |
| 174 | + subscription, topic); |
| 175 | + if (PubSubConstants.getEndpointsWithAdditionalSubscription().contains(tenantEndpoint)) { |
| 176 | + final String apiSubscription = String.valueOf(SubscriptionName.of(projectId, |
| 177 | + PubSubMessageHelper.getTopicName( |
| 178 | + String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME, |
| 179 | + tenantEndpoint), |
| 180 | + tenant))); |
| 181 | + addMissingOrFaultySubscription(subscriptionMap, missingSubscriptions, faultySubscriptions, |
| 182 | + apiSubscription, topic); |
| 183 | + } |
| 184 | + } |
| 185 | + } |
| 186 | + return Map.of("missingTopics", missingTopics, "missingSubscriptions", missingSubscriptions, |
| 187 | + "faultySubscriptions", faultySubscriptions); |
| 188 | + }).onFailure(thr -> log.error("Cannot find missing Pub/Sub resources", thr)) |
| 189 | + .onComplete(i -> pubSubBasedAdminClientManager.closeAdminClients()); |
| 190 | + } |
| 191 | + |
| 192 | + private void addMissingTopicToMap(final Set<String> topicSet, final String topic, |
| 193 | + final Map<String, String> missingTopics) { |
| 194 | + if (!topicSet.contains(topic)) { |
| 195 | + missingTopics.put(topic, ""); |
| 196 | + } |
| 197 | + } |
| 198 | + |
| 199 | + private void addMissingOrFaultySubscription(final Map<String, Subscription> subscriptionMap, |
| 200 | + final Map<String, String> missingSubscriptions, final Map<String, String> faultySubscriptions, |
| 201 | + final String subscription, final String topic) { |
| 202 | + if (!subscriptionMap.containsKey(subscription)) { |
| 203 | + missingSubscriptions.put(subscription, topic); |
| 204 | + return; |
| 205 | + } |
| 206 | + final String deletedTopic = "_deleted-topic_"; |
| 207 | + if (subscriptionMap.get(subscription) != null |
| 208 | + && StringUtils.equals(subscriptionMap.get(subscription).getTopic(), deletedTopic)) { |
| 209 | + faultySubscriptions.put(subscription, topic); |
| 210 | + } |
| 211 | + } |
| 212 | + |
109 | 213 | /**
|
110 | 214 | * Handle incoming tenant CREATE notifications.
|
111 | 215 | *
|
@@ -165,12 +269,17 @@ private Future<Void> createPubSubResourceForTenant(final String tenantId,
|
165 | 269 | final PubSubResourceType pubSubResourceType,
|
166 | 270 | final PubSubBasedAdminClientManager pubSubBasedAdminClientManager) {
|
167 | 271 | final List<Future> futureList = new ArrayList<>();
|
168 |
| - final List<String> topics = PubSubConstants.getTenantTopics(); |
169 |
| - for (String topic : topics) { |
| 272 | + for (String tenantEndpoint : PubSubConstants.getTenantEndpoints()) { |
170 | 273 | if (pubSubResourceType == PubSubResourceType.TOPIC) {
|
171 |
| - futureList.add(pubSubBasedAdminClientManager.getOrCreateTopic(topic, tenantId)); |
| 274 | + futureList.add(pubSubBasedAdminClientManager.getOrCreateTopic(tenantEndpoint, tenantId)); |
172 | 275 | } else {
|
173 |
| - futureList.add(pubSubBasedAdminClientManager.getOrCreateSubscription(topic, tenantId)); |
| 276 | + futureList.add(pubSubBasedAdminClientManager.getOrCreateSubscription(tenantEndpoint, tenantEndpoint, |
| 277 | + tenantId)); |
| 278 | + if (PubSubConstants.getEndpointsWithAdditionalSubscription().contains(tenantEndpoint)) { |
| 279 | + futureList.add(pubSubBasedAdminClientManager.getOrCreateSubscription(tenantEndpoint, |
| 280 | + String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME, tenantEndpoint), |
| 281 | + tenantId)); |
| 282 | + } |
174 | 283 | }
|
175 | 284 | }
|
176 | 285 | return CompositeFuture.join(futureList)
|
@@ -198,18 +307,31 @@ private void subscribeToTenantTopics(final String tenant) {
|
198 | 307 | }
|
199 | 308 |
|
200 | 309 | private void cleanupPubSubResources(final String tenant) {
|
201 |
| - final List<String> pubSubTopicsToDelete = PubSubConstants.getTenantTopics().stream() |
202 |
| - .map(id -> TopicName.of(projectId, "%s.%s".formatted(tenant, id)).toString()).toList(); |
203 |
| - final List<String> pubSubSubscriptionsToDelete = PubSubConstants.getTenantTopics().stream() |
204 |
| - .map(id -> SubscriptionName.of(projectId, "%s.%s".formatted(tenant, id)).toString()).toList(); |
| 310 | + final List<String> pubSubTopicsToDelete = PubSubConstants.getTenantEndpoints().stream().map( |
| 311 | + endpoint -> String.valueOf(TopicName.of(projectId, PubSubMessageHelper.getTopicName(endpoint, tenant)))) |
| 312 | + .toList(); |
| 313 | + final List<String> pubSubSubscriptionsToDelete = Stream.concat(PubSubConstants.getTenantEndpoints().stream() |
| 314 | + .map(endpoint -> String |
| 315 | + .valueOf(SubscriptionName.of(projectId, PubSubMessageHelper.getTopicName(endpoint, tenant)))), |
| 316 | + PubSubConstants.getEndpointsWithAdditionalSubscription().stream() |
| 317 | + .map(endpoint -> String.valueOf(SubscriptionName.of(projectId, |
| 318 | + PubSubMessageHelper.getTopicName( |
| 319 | + String.format(PubSubConstants.COMMUNICATION_API_SUBSCRIPTION_NAME, endpoint), |
| 320 | + tenant))))) |
| 321 | + .toList(); |
| 322 | + internalMessaging.closeSubscribersForTenant(tenant); |
205 | 323 | PubSubMessageHelper.getCredentialsProvider()
|
206 | 324 | .ifPresentOrElse(provider -> {
|
207 | 325 | final PubSubBasedAdminClientManager pubSubBasedAdminClientManager = adminClientManagerFactory
|
208 | 326 | .createAdminClientManager();
|
209 |
| - pubSubBasedAdminClientManager.deleteTopics(pubSubTopicsToDelete); |
210 |
| - pubSubBasedAdminClientManager.deleteSubscriptions(pubSubSubscriptionsToDelete); |
211 |
| - log.info("All topics and subscriptions for tenant {} were deleted successfully.", tenant); |
212 |
| - pubSubBasedAdminClientManager.closeAdminClients(); |
| 327 | + CompositeFuture.join(pubSubBasedAdminClientManager.deleteTopics(pubSubTopicsToDelete), |
| 328 | + pubSubBasedAdminClientManager.deleteSubscriptions(pubSubSubscriptionsToDelete)) |
| 329 | + .onSuccess(compFuture -> log.info( |
| 330 | + "All topics and subscriptions of tenant {} were deleted successfully.", tenant)) |
| 331 | + .onFailure(throwable -> log.warn( |
| 332 | + "Some topics or subscriptions of tenant {} could not be deleted.", tenant, |
| 333 | + throwable)) |
| 334 | + .onComplete(compFuture -> pubSubBasedAdminClientManager.closeAdminClients()); |
213 | 335 | }, () -> log.error("credentials provider is empty"));
|
214 | 336 | }
|
215 | 337 |
|
|
0 commit comments