@@ -316,23 +316,30 @@ public class PartitionedCache
316316 /**
317317 * Property IndexPendingPartitions
318318 *
319- * This PartitionSet that contains partitions that have a pending index
319+ * Map that contains partitions/counter mappings that have a pending index
320320 * rebuild due to partition redistribution or initial index creation.
321+ * The counter is used to track when multiple extractors are added to the
322+ * same cache. This avoids potentially removing a partition from status
323+ * pending on deferred requests.
321324 *
322325 * @volatile
323326 */
324- private volatile com.tangosol.net.partition.PartitionSet __m_IndexPendingPartitions;
325-
327+ private volatile Map<Integer, AtomicInteger> __m_IndexPendingPartitions;
328+
326329 /**
327330 * Property IndexProcessingPartitions
328331 *
329- * This PartitionSet that contains partitions that index rebuild are being
330- * processed, used in condition check for index rebuild optimization, see
331- * scheduleInitialIndexUpdate.
332- *
332+ * Map that contains partitions/counter mappings that have an index rebuild
333+ * being processed, used in condition check for index rebuild optimization.
334+ * The counter is used to track when multiple extractors are added to the
335+ * same cache. This avoids potentially removing a partition from status
336+ * pending on deferred requests.
337+ *
338+ * @see scheduleInitialIndexUpdate
339+ *
333340 * @volatile
334341 */
335- private volatile com.tangosol.net.partition.PartitionSet __m_IndexProcessingPartitions;
342+ private volatile Map<Integer, AtomicInteger> __m_IndexProcessingPartitions;
336343
337344 /**
338345 * Property IndexUpdateCount
@@ -1940,7 +1947,7 @@ public void ensureIndexReady(int nPartition)
19401947 long cSoftTimeout = (long) (getDefaultGuardTimeout() * getDefaultGuardRecovery());
19411948 long ldtStartTime = Base.getSafeTimeMillis();
19421949 long ldtSoftTimeout = ldtStartTime + cSoftTimeout;
1943- while (getIndexPendingPartitions().contains (nPartition))
1950+ while (getIndexPendingPartitions().containsKey (nPartition))
19441951 {
19451952 try
19461953 {
@@ -2777,7 +2784,7 @@ public long getDistributionContendMillis()
27772784
27782785 // since the asynchronous index update communicates using the service thread,
27792786 // we should not block it (see COH-15478).
2780- PartitionSet partsPending = getIndexPendingPartitions();
2787+ Map partsPending = getIndexPendingPartitions();
27812788 return partsPending == null || partsPending.isEmpty() ? super.getDistributionContendMillis() : 0L;
27822789 }
27832790
@@ -2825,26 +2832,33 @@ public long getIndexingStartTime()
28252832 // Accessor for the property "IndexPendingPartitions"
28262833 /**
28272834 * Getter for property IndexPendingPartitions.<p>
2828- * This PartitionSet that contains partitions that have a pending index
2829- * rebuild due to partition redistribution or initial index creation.
2830- *
2831- * @volatile
2835+ * Map that contains partitions/counter mappings that have a pending index
2836+ * rebuild due to partition redistribution or initial index creation.
2837+ * The counter is used to track when multiple extractors are added to the
2838+ * same cache. This avoids potentially removing a partition from status
2839+ * pending on deferred requests.
2840+ *
2841+ * @volatile
28322842 */
2833- public com.tangosol.net.partition.PartitionSet getIndexPendingPartitions()
2843+ public Map<Integer, AtomicInteger> getIndexPendingPartitions()
28342844 {
28352845 return __m_IndexPendingPartitions;
28362846 }
28372847
28382848 // Accessor for the property "IndexProcessingPartitions"
28392849 /**
28402850 * Getter for property IndexProcessingPartitions.<p>
2841- * This PartitionSet that contains partitions that index rebuild are being
2842- * processed, used in condition check for index rebuild optimization, see
2843- * scheduleInitialIndexUpdate.
2844- *
2845- * @volatile
2851+ * Map that contains partitions/counter mappings that have an index rebuild
2852+ * being processed, used in condition check for index rebuild optimization.
2853+ * The counter is used to track when multiple extractors are added to the
2854+ * same cache. This avoids potentially removing a partition from status
2855+ * pending on deferred requests.
2856+ *
2857+ * @see scheduleInitialIndexUpdate
2858+ *
2859+ * @volatile
28462860 */
2847- public com.tangosol.net.partition.PartitionSet getIndexProcessingPartitions()
2861+ public Map<Integer, AtomicInteger> getIndexProcessingPartitions()
28482862 {
28492863 return __m_IndexProcessingPartitions;
28502864 }
@@ -3506,12 +3520,12 @@ public boolean isLocalStorageEnabled()
35063520
35073521 /**
35083522 * Return true iff partition is still pending or being processed for index
3509- * rebuild.
3523+ * rebuild.
35103524 */
3511- protected boolean isPartitionIndexPending(int nPartition)
3525+ protected synchronized boolean isPartitionIndexPending(int nPartition)
35123526 {
3513- return getIndexPendingPartitions().contains (nPartition) &&
3514- !getIndexProcessingPartitions().contains (nPartition);
3527+ return getIndexPendingPartitions().containsKey (nPartition) &&
3528+ !getIndexProcessingPartitions().containsKey (nPartition);
35153529 }
35163530
35173531 /**
@@ -4681,8 +4695,8 @@ protected void onFinalizeStartup()
46814695 // and prior to accepting clients
46824696 if (isOwnershipEnabled())
46834697 {
4684- setIndexPendingPartitions(new PartitionSet(getPartitionCount() ));
4685- setIndexProcessingPartitions(new PartitionSet(getPartitionCount() ));
4698+ setIndexPendingPartitions(new ConcurrentHashMap( ));
4699+ setIndexProcessingPartitions(new ConcurrentHashMap( ));
46864700 }
46874701
46884702 super.onFinalizeStartup();
@@ -6528,9 +6542,9 @@ public void onMapEvent(PartitionedCache.MapEvent msgEvent)
65286542 _trace("Ignoring duplicate event", 2);
65296543 return;
65306544 }
6531-
6545+
65326546 removeSUIDRange(laProcessed, getBaseSUID(nSender), lOldestPending, false);
6533-
6547+
65346548 laProcessed.set(lEventSUID, null);
65356549 }
65366550 else
@@ -6560,7 +6574,7 @@ public void onMapEvent(PartitionedCache.MapEvent msgEvent)
65606574 if (lOldestPending > lOldestKnown && lOldestPending < lEventSUID && !laProcessed.exists(lOldestPending))
65616575 {
65626576 // Non-null value acts as a marker that the oldest pending event still needs to be processed.
6563- laProcessed.set(lOldestPending, Boolean.TRUE);
6577+ laProcessed.set(lOldestPending, Boolean.TRUE);
65646578 }
65656579
65666580 BinaryMap mapBinary = (BinaryMap) getBinaryMapArray().get(msgEvent.getCacheId());
@@ -6569,7 +6583,7 @@ public void onMapEvent(PartitionedCache.MapEvent msgEvent)
65696583 mapBinary.dispatch(msgEvent);
65706584 }
65716585 }
6572-
6586+
65736587 // Declared at the super level
65746588 /**
65756589 * Event notification to perform a regular daemon activity. To get it
@@ -7053,7 +7067,7 @@ public void onPutRequest(PartitionedCache.PutRequest msgRequest)
70537067 // import com.tangosol.net.security.StorageAccessAuthorizer as com.tangosol.net.security.StorageAccessAuthorizer;
70547068 // import com.tangosol.util.Binary;
70557069 // import java.util.Collection;
7056-
7070+
70577071 PartitionedCache.Response msgResponse = (PartitionedCache.Response) instantiateMessage("Response");
70587072 msgResponse.respondTo(msgRequest);
70597073
@@ -11148,28 +11162,31 @@ public void setIndexingStartTime(long lTime)
1114811162 // Accessor for the property "IndexPendingPartitions"
1114911163 /**
1115011164 * Setter for property IndexPendingPartitions.<p>
11151- * This PartitionSet that contains partitions that have a pending index
11152- * rebuild due to partition redistribution or initial index creation.
11153- *
11154- * @volatile
11165+ * This Map that contains a partitions/AtomicInteger counter mapping for
11166+ * partitions that have a pending index rebuild due to partition
11167+ * redistribution or initial index creation.
11168+ *
11169+ * @volatile
1115511170 */
11156- protected void setIndexPendingPartitions(com.tangosol.net.partition.PartitionSet parts )
11171+ protected void setIndexPendingPartitions(Map partsPending )
1115711172 {
11158- __m_IndexPendingPartitions = parts ;
11173+ __m_IndexPendingPartitions = partsPending ;
1115911174 }
1116011175
1116111176 // Accessor for the property "IndexProcessingPartitions"
1116211177 /**
1116311178 * Setter for property IndexProcessingPartitions.<p>
11164- * This PartitionSet that contains partitions that index rebuild are being
11165- * processed, used in condition check for index rebuild optimization, see
11166- * scheduleInitialIndexUpdate.
11167- *
11168- * @volatile
11179+ * This map contains partitions/counter mappings that have an index rebuild
11180+ * being processed, used in condition check for index rebuild optimization.
11181+ * The counter is used to track when multiple extractors are added to the
11182+ * same cache. This avoids potentially removing a partition from status
11183+ * pending on deferred requests.
11184+ *
11185+ * @volatile
1116911186 */
11170- protected void setIndexProcessingPartitions(com.tangosol.net.partition.PartitionSet parts )
11187+ protected void setIndexProcessingPartitions(Map partsProcessing )
1117111188 {
11172- __m_IndexProcessingPartitions = parts ;
11189+ __m_IndexProcessingPartitions = partsProcessing ;
1117311190 }
1117411191
1117511192 // Accessor for the property "IndexUpdateCount"
@@ -11913,31 +11930,12 @@ public void updateIndexBuildTime()
1191311930 }
1191411931
1191511932 /**
11916- * Update IndexPendingPartitions that have a pending index rebuild. Called
11917- * on both service and worker threads.
11933+ * Update IndexPendingPartitions that have a pending index rebuild. Called
11934+ * on both service and worker threads.
1191811935 */
1191911936 public synchronized void updatePendingIndexPartition(int nPartition, boolean fAdd)
1192011937 {
11921- // import com.tangosol.net.partition.PartitionSet;
11922-
11923- PartitionSet parts = getIndexPendingPartitions();
11924-
11925- if (fAdd ^ parts.contains(nPartition))
11926- {
11927- // use copy-on-write to protect concurrent readers
11928- parts = new PartitionSet(parts);
11929-
11930- if (fAdd)
11931- {
11932- parts.add(nPartition);
11933- }
11934- else
11935- {
11936- parts.remove(nPartition);
11937- }
11938-
11939- setIndexPendingPartitions(parts);
11940- }
11938+ updateIndexPartition(getIndexPendingPartitions(), nPartition, fAdd);
1194111939 }
1194211940
1194311941 /**
@@ -11946,28 +11944,42 @@ public synchronized void updatePendingIndexPartition(int nPartition, boolean fAd
1194611944 */
1194711945 public synchronized void updateProcessingIndexPartition(int nPartition, boolean fAdd)
1194811946 {
11949- // import com.tangosol.net.partition.PartitionSet;
11950-
11951- PartitionSet parts = getIndexProcessingPartitions();
11952-
11953- if (fAdd ^ parts.contains(nPartition))
11947+ updateIndexPartition(getIndexProcessingPartitions(), nPartition, fAdd);
11948+ }
11949+
11950+ /**
11951+ * Update map of partitions/counters.
11952+ */
11953+ protected synchronized void updateIndexPartition(Map<Integer, AtomicInteger> partsCounter, int nPartition, boolean fAdd)
11954+ {
11955+ if (fAdd)
1195411956 {
11955- // use copy-on-write to protect concurrent readers
11956- parts = new PartitionSet(parts);
11957-
11958- if (fAdd)
11957+ partsCounter.compute(nPartition, (k, v) ->
1195911958 {
11960- parts.add(nPartition);
11961- }
11962- else
11959+ if (v == null)
11960+ {
11961+ return new AtomicInteger(1);
11962+ }
11963+ else
11964+ {
11965+ v.incrementAndGet();
11966+ return v;
11967+ }
11968+ });
11969+ }
11970+ else
11971+ {
11972+ partsCounter.computeIfPresent(nPartition, (k, v) ->
1196311973 {
11964- parts.remove(nPartition);
11965- }
11966-
11967- setIndexProcessingPartitions(parts);
11974+ if (v.decrementAndGet() <= 1)
11975+ {
11976+ return null;
11977+ }
11978+ return v;
11979+ });
1196811980 }
1196911981 }
11970-
11982+
1197111983 // Declared at the super level
1197211984 /**
1197311985 * Validate the assignment array against the specified member-set.
@@ -43650,7 +43662,7 @@ public void onResponse(com.tangosol.coherence.component.net.Message msg)
4365043662 {
4365143663 int nPartition = msgRetry.getPartition();
4365243664
43653- if (service.getIndexPendingPartitions().contains (nPartition))
43665+ if (service.getIndexPendingPartitions().containsKey (nPartition))
4365443666 {
4365543667 service.scheduleIndexUpdate(nPartition, msgRetry.getEventId(), msgRetry.getUpdateMap());
4365643668 }
0 commit comments