Skip to content

Commit 68da758

Browse files
committed
[FLINK-34996][Connectors/Kafka] Test proper class loader is used and also correct serializer.
1 parent 2629f51 commit 68da758

File tree

5 files changed

+120
-1
lines changed

5 files changed

+120
-1
lines changed

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void open(InitializationContext context) throws Exception {
7171
InstantiationUtil.instantiate(
7272
serializerClass.getName(),
7373
Serializer.class,
74-
getClass().getClassLoader());
74+
userCodeClassLoader);
7575

7676
if (serializer instanceof Configurable) {
7777
((Configurable) serializer).configure(config);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package org.apache.flink.connector.kafka.sink;
2+
3+
import org.apache.flink.streaming.connectors.kafka.testutils.SerializationTestBase;
4+
import org.apache.kafka.common.serialization.StringSerializer;
5+
import org.junit.Test;
6+
import org.junit.runner.RunWith;
7+
import org.mockito.junit.MockitoJUnitRunner;
8+
9+
import static org.mockito.Mockito.when;
10+
11+
@RunWith(MockitoJUnitRunner.class)
12+
public class KafkaSerializerWrapperTest extends SerializationTestBase {
13+
@Override
14+
protected void setupContext() {
15+
when(serializationContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
16+
}
17+
18+
@Test
19+
public void testUserCodeClassLoaderIsUsed() throws Exception {
20+
final KafkaSerializerWrapper<String> wrapper =
21+
new KafkaSerializerWrapper<>(StringSerializer.class, true, (value) -> "topic");
22+
23+
testUserClassLoaderIsUsedWhen(() -> {
24+
wrapper.open(serializationContext);
25+
return null;
26+
}, new StringSerializer());
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.apache.flink.connector.kafka.source.reader.deserializer;
2+
3+
import org.apache.flink.streaming.connectors.kafka.testutils.SerializationTestBase;
4+
import org.apache.kafka.common.serialization.StringDeserializer;
5+
import org.junit.Test;
6+
import org.junit.runner.RunWith;
7+
import org.mockito.junit.MockitoJUnitRunner;
8+
9+
import java.util.HashMap;
10+
import java.util.Map;
11+
12+
import static org.mockito.Mockito.when;
13+
14+
@RunWith(MockitoJUnitRunner.class)
15+
public class KafkaValueOnlyDeserializerWrapperTest extends SerializationTestBase {
16+
@Override
17+
protected void setupContext() {
18+
when(deserializationContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
19+
}
20+
21+
@Test
22+
public void testUserCodeClassLoaderIsUsed() throws Exception {
23+
final Map<String, String> config = new HashMap<>();
24+
final KafkaValueOnlyDeserializerWrapper<String> wrapper =
25+
new KafkaValueOnlyDeserializerWrapper<>(StringDeserializer.class, config);
26+
27+
testUserClassLoaderIsUsedWhen(() -> {
28+
wrapper.open(deserializationContext);
29+
return null;
30+
}, new StringDeserializer());
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package org.apache.flink.streaming.connectors.kafka.testutils;
2+
3+
4+
import org.apache.flink.api.common.serialization.DeserializationSchema;
5+
import org.apache.flink.api.common.serialization.SerializationSchema;
6+
import org.apache.flink.util.InstantiationUtil;
7+
import org.apache.flink.util.UserCodeClassLoader;
8+
import org.junit.Before;
9+
import org.mockito.*;
10+
11+
import java.util.concurrent.Callable;
12+
13+
import static org.junit.Assert.assertEquals;
14+
import static org.mockito.ArgumentMatchers.*;
15+
import static org.mockito.Mockito.when;
16+
17+
public class SerializationTestBase {
18+
@Mock
19+
protected DeserializationSchema.InitializationContext deserializationContext;
20+
@Mock
21+
protected SerializationSchema.InitializationContext serializationContext;
22+
@Mock
23+
protected UserCodeClassLoader userCodeClassLoader;
24+
@Mock
25+
protected ClassLoader classLoader;
26+
@Captor
27+
private ArgumentCaptor<ClassLoader> classLoaderCaptor;
28+
29+
@Before
30+
public void setUp() {
31+
when(userCodeClassLoader.asClassLoader()).thenReturn(classLoader);
32+
setupContext();
33+
}
34+
35+
protected void setupContext() {
36+
}
37+
38+
protected void testUserClassLoaderIsUsedWhen(Callable<Object> callable, Object instance) throws Exception {
39+
try (MockedStatic<InstantiationUtil> mocked = Mockito.mockStatic(InstantiationUtil.class)) {
40+
41+
mocked.when(() -> InstantiationUtil.instantiate(
42+
anyString(),
43+
notNull(),
44+
any(ClassLoader.class)
45+
)).thenReturn(instance);
46+
47+
callable.call();
48+
49+
mocked.verify(() -> InstantiationUtil.instantiate(
50+
anyString(),
51+
notNull(),
52+
classLoaderCaptor.capture()
53+
));
54+
55+
assertEquals(classLoader, classLoaderCaptor.getValue());
56+
}
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
mock-maker-inline

0 commit comments

Comments
 (0)