Skip to content

Commit 794cfda

Browse files
authored
Merge pull request #1691 from mbenson/concurrent-reindex
permit concurrent reindexing for indexes sharing a scan path
2 parents d38af4b + 184fd96 commit 794cfda

File tree

3 files changed

+198
-10
lines changed

3 files changed

+198
-10
lines changed

modeshape-jcr/src/main/java/org/modeshape/jcr/RepositoryIndexManager.java

+14-10
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.CopyOnWriteArraySet;
3333
import java.util.concurrent.atomic.AtomicBoolean;
3434
import java.util.concurrent.atomic.AtomicReference;
35+
import java.util.function.Consumer;
3536
import java.util.stream.Collectors;
3637
import javax.jcr.RepositoryException;
3738
import org.modeshape.common.annotation.Immutable;
@@ -943,16 +944,19 @@ public void onEachPathInWorkspace( ScanOperation operation ) {
943944
for (Map.Entry<String, PathToScan> entry : pathsToScanByWorkspace.entries()) {
944945
String workspaceName = entry.getKey();
945946
PathToScan pathToScan = entry.getValue();
946-
for (IndexingCallback callback : pathToScan) {
947-
callback.beforeIndexing();
948-
try {
949-
operation.scan(workspaceName, pathToScan.path(), callback.writer());
950-
} catch (Exception e) {
951-
Logger.getLogger(getClass()).error(e, JcrI18n.errorIndexing, pathToScan.path(), workspaceName,
952-
e.getMessage());
953-
} finally {
954-
callback.afterIndexing();
955-
}
947+
948+
Consumer<Exception> exceptionHandler = e -> Logger.getLogger(getClass()).error(e,
949+
JcrI18n.errorIndexing,
950+
pathToScan.path(),
951+
workspaceName,
952+
e.getMessage());
953+
954+
IndexingCallback callback = IndexingCallback.compose(pathToScan, exceptionHandler);
955+
callback.beforeIndexing();
956+
try {
957+
operation.scan(workspaceName, pathToScan.path(), callback.writer());
958+
} finally {
959+
callback.afterIndexing();
956960
}
957961
}
958962
}

modeshape-jcr/src/main/java/org/modeshape/jcr/spi/index/IndexFeedback.java

+65
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616

1717
package org.modeshape.jcr.spi.index;
1818

19+
import java.util.List;
20+
import java.util.Objects;
21+
import java.util.function.Consumer;
22+
import java.util.stream.Collectors;
23+
import java.util.stream.StreamSupport;
1924
import org.modeshape.jcr.spi.index.provider.IndexProvider;
2025
import org.modeshape.jcr.value.Path;
2126

@@ -29,6 +34,66 @@
2934
public interface IndexFeedback {
3035

3136
public static interface IndexingCallback {
37+
public static IndexingCallback noop() {
38+
return new IndexingCallback() {
39+
40+
@Override
41+
public IndexWriter writer() {
42+
return IndexWriter.noop();
43+
}
44+
45+
@Override
46+
public void beforeIndexing() {
47+
}
48+
49+
@Override
50+
public void afterIndexing() {
51+
}
52+
};
53+
}
54+
55+
public static IndexingCallback compose(Iterable<IndexingCallback> delegates, Consumer<Exception> handler) {
56+
Objects.requireNonNull(delegates, "delegates");
57+
Objects.requireNonNull(handler, "handler");
58+
59+
List<IndexingCallback> useDelegates = delegates instanceof List<?> ? (List<IndexingCallback>)delegates : StreamSupport.stream(delegates.spliterator(),
60+
false).collect(Collectors.toList());
61+
62+
if (useDelegates.isEmpty()) {
63+
return noop();
64+
}
65+
66+
return new IndexingCallback() {
67+
68+
@Override
69+
public IndexWriter writer() {
70+
return IndexWriter.compose(() -> useDelegates.stream().map(IndexingCallback::writer).iterator(), handler);
71+
}
72+
73+
@Override
74+
public void beforeIndexing() {
75+
for (IndexingCallback indexingCallback : useDelegates) {
76+
try {
77+
indexingCallback.beforeIndexing();
78+
} catch (Exception e) {
79+
handler.accept(e);
80+
}
81+
}
82+
}
83+
84+
@Override
85+
public void afterIndexing() {
86+
for (IndexingCallback indexingCallback : useDelegates) {
87+
try {
88+
indexingCallback.afterIndexing();
89+
} catch (Exception e) {
90+
handler.accept(e);
91+
}
92+
}
93+
}
94+
};
95+
}
96+
3297
void beforeIndexing();
3398

3499
void afterIndexing();

modeshape-jcr/src/main/java/org/modeshape/jcr/spi/index/IndexWriter.java

+119
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
*/
1616
package org.modeshape.jcr.spi.index;
1717

18+
import java.util.List;
19+
import java.util.Objects;
1820
import java.util.Set;
21+
import java.util.function.Consumer;
22+
import java.util.stream.Collectors;
23+
import java.util.stream.StreamSupport;
1924
import org.modeshape.jcr.cache.CachedNode.Properties;
2025
import org.modeshape.jcr.cache.NodeKey;
2126
import org.modeshape.jcr.spi.index.provider.IndexProvider;
@@ -29,6 +34,120 @@
2934
* @author Randall Hauch ([email protected])
3035
*/
3136
public interface IndexWriter {
37+
public static IndexWriter noop() {
38+
return new IndexWriter() {
39+
40+
@Override
41+
public boolean canBeSkipped() {
42+
return true;
43+
}
44+
45+
@Override
46+
public void clearAllIndexes() {
47+
}
48+
49+
@Override
50+
public boolean add( String workspace,
51+
NodeKey key,
52+
Path path,
53+
Name primaryType,
54+
Set<Name> mixinTypes,
55+
Properties properties ) {
56+
return false;
57+
}
58+
59+
@Override
60+
public boolean remove( String workspace,
61+
NodeKey key ) {
62+
return false;
63+
}
64+
65+
@Override
66+
public void commit( String workspace ) {
67+
}
68+
};
69+
}
70+
71+
public static IndexWriter compose(Iterable<IndexWriter> delegates, Consumer<Exception> handler) {
72+
Objects.requireNonNull(delegates, "delegates");
73+
Objects.requireNonNull(handler, "handler");
74+
75+
List<IndexWriter> useDelegates = StreamSupport.stream(delegates.spliterator(),
76+
false).filter(t -> !t.canBeSkipped()).collect(Collectors.toList());
77+
78+
if (useDelegates.isEmpty()) {
79+
return noop();
80+
}
81+
82+
return new IndexWriter() {
83+
84+
@Override
85+
public boolean remove( String workspace,
86+
NodeKey key ) {
87+
boolean result = false;
88+
for (IndexWriter indexWriter : delegates) {
89+
try {
90+
result = indexWriter.remove(workspace, key) || result;
91+
} catch (Exception e) {
92+
handler.accept(e);
93+
}
94+
}
95+
return result;
96+
}
97+
98+
@Override
99+
public void commit( String workspace ) {
100+
delegates.forEach(d -> {
101+
try {
102+
d.commit(workspace);
103+
} catch (Exception e) {
104+
handler.accept(e);
105+
}
106+
});
107+
}
108+
109+
@Override
110+
public void clearAllIndexes() {
111+
delegates.forEach(t -> {
112+
try {
113+
t.clearAllIndexes();
114+
} catch (Exception e) {
115+
handler.accept(e);
116+
}
117+
});
118+
}
119+
120+
@Override
121+
public boolean canBeSkipped() {
122+
for (IndexWriter indexWriter : delegates) {
123+
try {
124+
if (!indexWriter.canBeSkipped()) return false;
125+
} catch (Exception e) {
126+
handler.accept(e);
127+
}
128+
}
129+
return true;
130+
}
131+
132+
@Override
133+
public boolean add( String workspace,
134+
NodeKey key,
135+
Path path,
136+
Name primaryType,
137+
Set<Name> mixinTypes,
138+
Properties properties ) {
139+
boolean result = false;
140+
for (IndexWriter indexWriter : delegates) {
141+
try {
142+
result = indexWriter.add(workspace, key, path, primaryType, mixinTypes, properties) || result;
143+
} catch (Exception e) {
144+
handler.accept(e);
145+
}
146+
}
147+
return result;
148+
}
149+
};
150+
}
32151

33152
/**
34153
* Flag that defines whether this index may be skipped. This is usually the case when the writer has no indexes behind it.

0 commit comments

Comments
 (0)