Skip to content

Commit 7688ece

Browse files
committed
Bug 37155645 - [37155643->25.03] SimilaritySearch aggregator does not support async execution
[git-p4: depot-paths = "//dev/coherence-ce/main/": change = 111877]
1 parent c93822a commit 7688ece

File tree

4 files changed

+253
-17
lines changed

4 files changed

+253
-17
lines changed

prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/daemon/queueProcessor/service/grid/partitionedService/PartitionedCache.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,9 @@
4343
import com.oracle.coherence.persistence.PersistenceException;
4444
import com.oracle.coherence.persistence.PersistenceManager;
4545
import com.oracle.coherence.persistence.PersistentStore;
46-
import com.tangosol.application.ContainerHelper;
4746
import com.tangosol.coherence.config.Config;
4847
import com.tangosol.coherence.config.ResolvableParameterList;
49-
import com.tangosol.coherence.config.unit.Millis;
5048
import com.tangosol.config.expression.Parameter;
51-
import com.tangosol.internal.net.NamedCacheDeactivationListener;
5249
import com.tangosol.internal.net.service.grid.DefaultPartitionedCacheDependencies;
5350
import com.tangosol.internal.net.service.grid.PartitionedCacheDependencies;
5451
import com.tangosol.internal.tracing.Span;
@@ -63,13 +60,11 @@
6360
import com.tangosol.io.Serializer;
6461
import com.tangosol.io.SizeEstimatingBufferOutput;
6562
import com.tangosol.io.WriteBuffer;
66-
import com.tangosol.license.LicenseException;
6763
import com.tangosol.net.ActionPolicy;
6864
import com.tangosol.net.BackingMapManager;
6965
import com.tangosol.net.CacheService;
7066
import com.tangosol.net.GuardSupport;
7167
import com.tangosol.net.Member;
72-
import com.tangosol.net.NamedCache;
7368
import com.tangosol.net.PriorityTask;
7469
import com.tangosol.net.RequestIncompleteException;
7570
import com.tangosol.net.RequestPolicyException;
@@ -116,14 +111,12 @@
116111
import com.tangosol.util.FilterEnumerator;
117112
import com.tangosol.util.HashHelper;
118113
import com.tangosol.util.ImmutableArrayList;
119-
import com.tangosol.util.InvocableMapHelper;
120114
import com.tangosol.util.KeyValueArrayMap;
121115
import com.tangosol.util.Listeners;
122116
import com.tangosol.util.LiteMap;
123117
import com.tangosol.util.LiteSet;
124118
import com.tangosol.util.LongArray;
125119
import com.tangosol.util.MapTrigger;
126-
import com.tangosol.util.MapTriggerListener;
127120
import com.tangosol.util.NullImplementation;
128121
import com.tangosol.util.PrimitiveSparseArray;
129122
import com.tangosol.util.ResourceRegistry;
@@ -136,13 +129,9 @@
136129
import com.tangosol.util.ValueExtractor;
137130
import com.tangosol.util.aggregator.AbstractAsynchronousAggregator;
138131
import com.tangosol.util.aggregator.QueryRecorder;
139-
import com.tangosol.util.comparator.EntryComparator;
140132
import com.tangosol.util.comparator.SafeComparator;
141133
import com.tangosol.util.filter.AlwaysFilter;
142-
import com.tangosol.util.filter.InKeySetFilter;
143-
import com.tangosol.util.filter.KeyAssociatedFilter;
144134
import com.tangosol.util.filter.LimitFilter;
145-
import com.tangosol.util.filter.PartitionedFilter;
146135
import com.tangosol.util.processor.AbstractAsynchronousProcessor;
147136
import java.io.IOException;
148137
import java.security.AccessController;
@@ -36469,6 +36458,7 @@ public com.tangosol.coherence.component.net.requestContext.AsyncContext createCo
3646936458
ctx.setAsyncAggregator((AbstractAsynchronousAggregator) asyncAgent);
3647036459
context = ctx;
3647136460
}
36461+
asyncAgent.setValueConverter(convValue);
3647236462
context.setValueConverter(convValue);
3647336463

3647436464
if (mapBinary.isAuthorizationEnabled())

prj/coherence-core/src/main/java/com/tangosol/util/AsynchronousAgent.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
@@ -378,6 +378,20 @@ public synchronized CompletableFuture<T> getCompletableFuture()
378378
return future;
379379
}
380380

381+
/**
382+
* Set the converter from internal format.
383+
*
384+
* @implNote This method is only called by RequestCoordinator.createContext,
385+
* so it is very much internal, but it has to be public in order
386+
* to be accessible.
387+
*
388+
* @param converter the converter from internal format
389+
*/
390+
public void setValueConverter(Converter<Binary, T> converter)
391+
{
392+
m_converter = converter;
393+
}
394+
381395
// ----- data fields ----------------------------------------------------
382396

383397
/**
@@ -415,4 +429,9 @@ public synchronized CompletableFuture<T> getCompletableFuture()
415429
* The {@link Executor} to complete the future on.
416430
*/
417431
private final Executor f_executor;
432+
433+
/**
434+
* Converter from internal format.
435+
*/
436+
protected Converter<Binary, T> m_converter;
418437
}

prj/coherence-core/src/main/java/com/tangosol/util/aggregator/AsynchronousAggregator.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
/*
2-
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
2+
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
33
*
44
* Licensed under the Universal Permissive License v 1.0 as shown at
55
* https://oss.oracle.com/licenses/upl.
66
*/
77
package com.tangosol.util.aggregator;
88

9-
109
import com.tangosol.internal.util.Daemons;
1110
import com.tangosol.net.NamedCache;
1211

@@ -19,7 +18,6 @@
1918
import java.util.concurrent.Executor;
2019
import java.util.concurrent.Future;
2120

22-
2321
/**
2422
* A marker {@link EntryAggregator EntryAggregator} wrapper class that allows for
2523
* an asynchronous invocation of the underlying aggregator. When used as a
@@ -145,15 +143,15 @@ public void onComplete()
145143
Throwable eReason = m_eReason;
146144
if (eReason == null)
147145
{
148-
complete(m_aggregator::finalizeResult);
146+
complete(() -> m_aggregator.finalizeResult(m_converter));
149147
}
150148
else
151149
{
152150
completeExceptionally(eReason);
153151
}
154152
}
155153

156-
// ----- data fields -----------------------------------------------------
154+
// ----- data fields ----------------------------------------------------
157155

158156
/**
159157
* Reason for the failed operation.
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Universal Permissive License v 1.0 as shown at
5+
* https://oss.oracle.com/licenses/upl.
6+
*/
7+
8+
package ai_tests.search;
9+
10+
import com.oracle.coherence.ai.Float32Vector;
11+
import com.oracle.coherence.ai.Vector;
12+
import com.oracle.coherence.ai.search.SimilaritySearch;
13+
import com.oracle.coherence.ai.util.Vectors;
14+
15+
import com.tangosol.io.ExternalizableLite;
16+
import com.tangosol.io.pof.PofReader;
17+
import com.tangosol.io.pof.PofWriter;
18+
import com.tangosol.io.pof.PortableObject;
19+
20+
import com.tangosol.net.Coherence;
21+
import com.tangosol.net.NamedMap;
22+
import com.tangosol.net.Session;
23+
24+
import com.tangosol.util.ExternalizableHelper;
25+
import com.tangosol.util.ValueExtractor;
26+
27+
import java.io.DataInput;
28+
import java.io.DataOutput;
29+
import java.io.IOException;
30+
31+
import java.util.Arrays;
32+
import java.util.Random;
33+
import java.util.concurrent.TimeUnit;
34+
35+
import org.junit.jupiter.api.AfterAll;
36+
import org.junit.jupiter.api.BeforeAll;
37+
import org.junit.jupiter.api.Test;
38+
39+
import static org.hamcrest.CoreMatchers.is;
40+
import static org.hamcrest.MatcherAssert.assertThat;
41+
42+
public class SimilaritySearchIT
43+
{
44+
@BeforeAll
45+
@SuppressWarnings("resource")
46+
static void setup() throws Exception
47+
{
48+
String sAddress = "127.0.0.1";
49+
System.setProperty("coherence.wka", sAddress);
50+
System.setProperty("coherence.localhost", sAddress);
51+
System.setProperty("test.unicast.address", sAddress);
52+
System.setProperty("test.unicast.port", "0");
53+
System.setProperty("coherence.ttl", "0");
54+
55+
System.setProperty("coherence.distributed.partitioncount", "13");
56+
57+
Coherence coherence = Coherence.clusterMember().start().get(5, TimeUnit.MINUTES);
58+
m_session = coherence.getSession();
59+
60+
NamedMap<Integer, ValueWithVector> vectors = m_session.getMap("vectors");
61+
m_valueZero = populateVectors(vectors);
62+
}
63+
64+
@AfterAll
65+
static void cleanup()
66+
{
67+
Coherence.closeAll();
68+
}
69+
70+
@Test
71+
public void shouldSearch()
72+
{
73+
ValueExtractor<ValueWithVector, Vector<float[]>> extractor = ValueExtractor.of(ValueWithVector::getVector);
74+
75+
NamedMap<Integer, ValueWithVector> vectors = m_session.getMap("vectors");
76+
77+
Vector<float[]> vector = m_valueZero.getVector();
78+
79+
var results = vectors.aggregate(new SimilaritySearch<>(extractor, vector, 10));
80+
assertThat(results.size(), is(10));
81+
}
82+
83+
@Test
84+
public void shouldSearchAsync()
85+
{
86+
ValueExtractor<ValueWithVector, Vector<float[]>> extractor = ValueExtractor.of(ValueWithVector::getVector);
87+
88+
NamedMap<Integer, ValueWithVector> vectors = m_session.getMap("vectors");
89+
90+
Vector<float[]> vector = m_valueZero.getVector();
91+
92+
var results = vectors.async().aggregate(new SimilaritySearch<>(extractor, vector, 10));
93+
assertThat(results.join().size(), is(10));
94+
}
95+
96+
public static ValueWithVector populateVectors(NamedMap<Integer, ValueWithVector> vectors)
97+
{
98+
float[][] matches = new float[5][];
99+
matches[0] = randomFloats(DIMENSIONS);
100+
matches[1] = Arrays.copyOf(matches[0], matches[0].length);
101+
matches[2] = Arrays.copyOf(matches[0], matches[0].length);
102+
matches[3] = Arrays.copyOf(matches[0], matches[0].length);
103+
matches[4] = Arrays.copyOf(matches[0], matches[0].length);
104+
matches[1][0] = matches[1][0] + 1.0f;
105+
matches[2][0] = matches[2][0] + 1.0f;
106+
matches[3][0] = matches[3][0] + 1.0f;
107+
matches[4][0] = matches[4][0] + 1.0f;
108+
109+
ValueWithVector[] values = new ValueWithVector[10000];
110+
111+
for (int i = 0; i < matches.length; i++)
112+
{
113+
values[i] = new ValueWithVector(new Float32Vector(Vectors.normalize(matches[i])), String.valueOf(i), i);
114+
vectors.put(i, values[i]);
115+
}
116+
for (int i = matches.length; i < values.length; i++)
117+
{
118+
values[i] = new ValueWithVector(new Float32Vector(Vectors.normalize(randomFloats(DIMENSIONS))), String.valueOf(i), i);
119+
vectors.put(i, values[i]);
120+
}
121+
122+
return values[0];
123+
}
124+
125+
public static float[] randomFloats(int n)
126+
{
127+
float[] floats = new float[n];
128+
for (int i = 0; i < n; i++)
129+
{
130+
floats[i] = m_random.nextFloat(-50, 50);
131+
}
132+
return floats;
133+
}
134+
135+
// ----- inner class: ValueWithVector -----------------------------------
136+
137+
/**
138+
* A simple test holder for a vector and a text value.
139+
*/
140+
public static class ValueWithVector
141+
implements ExternalizableLite, PortableObject
142+
{
143+
public ValueWithVector()
144+
{
145+
}
146+
147+
public ValueWithVector(Vector<float[]> vector, String text, int n)
148+
{
149+
this.vector = vector;
150+
this.text = text;
151+
this.number = n;
152+
}
153+
154+
public Vector<float[]> getVector()
155+
{
156+
return vector;
157+
}
158+
159+
public String getText()
160+
{
161+
return text;
162+
}
163+
164+
public int getNumber()
165+
{
166+
return number;
167+
}
168+
169+
@Override
170+
public void readExternal(PofReader in) throws IOException
171+
{
172+
vector = in.readObject(0);
173+
text = in.readString(1);
174+
number = in.readInt(2);
175+
}
176+
177+
@Override
178+
public void writeExternal(PofWriter out) throws IOException
179+
{
180+
out.writeObject(0, vector);
181+
out.writeString(1, text);
182+
out.writeInt(2, number);
183+
}
184+
185+
@Override
186+
public void readExternal(DataInput in) throws IOException
187+
{
188+
vector = ExternalizableHelper.readObject(in);
189+
text = ExternalizableHelper.readSafeUTF(in);
190+
number = ExternalizableHelper.readInt(in);
191+
}
192+
193+
@Override
194+
public void writeExternal(DataOutput out) throws IOException
195+
{
196+
ExternalizableHelper.writeObject(out, vector);
197+
ExternalizableHelper.writeSafeUTF(out, text);
198+
ExternalizableHelper.writeInt(out, number);
199+
}
200+
201+
@Override
202+
public String toString()
203+
{
204+
return "ValueWithVector{" +
205+
"vector=" + vector +
206+
", text='" + text + '\'' +
207+
", number=" + number +
208+
'}';
209+
}
210+
211+
// ----- data members ---------------------------------------------------
212+
213+
private Vector<float[]> vector;
214+
215+
private String text;
216+
217+
private int number;
218+
}
219+
220+
// ----- data members ---------------------------------------------------
221+
222+
public static final int DIMENSIONS = 384;
223+
224+
private static Session m_session;
225+
226+
private static ValueWithVector m_valueZero;
227+
228+
private static final Random m_random = new Random(System.currentTimeMillis());
229+
}

0 commit comments

Comments
 (0)