|
9 | 9 | */ |
10 | 10 | package io.pravega.schemaregistry.serializer.avro.impl; |
11 | 11 |
|
12 | | -import com.google.common.base.Charsets; |
| 12 | +import com.google.common.annotations.VisibleForTesting; |
13 | 13 | import com.google.common.base.Preconditions; |
| 14 | +import com.google.common.collect.ImmutableMap; |
14 | 15 | import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema; |
15 | 16 | import io.pravega.schemaregistry.client.SchemaRegistryClient; |
16 | 17 | import io.pravega.schemaregistry.contract.data.SchemaInfo; |
|
19 | 20 | import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig; |
20 | 21 | import org.apache.avro.Schema; |
21 | 22 | import org.apache.avro.io.BinaryDecoder; |
| 23 | +import org.apache.avro.io.DatumReader; |
22 | 24 | import org.apache.avro.io.DecoderFactory; |
23 | 25 | import org.apache.avro.reflect.ReflectDatumReader; |
24 | 26 | import org.apache.avro.specific.SpecificDatumReader; |
25 | 27 | import org.apache.avro.specific.SpecificRecordBase; |
26 | 28 |
|
27 | 29 | import java.io.IOException; |
28 | 30 | import java.io.InputStream; |
| 31 | +import java.nio.ByteBuffer; |
29 | 32 | import java.util.concurrent.ConcurrentHashMap; |
30 | 33 |
|
31 | 34 | class AvroDeserializer<T> extends AbstractDeserializer<T> { |
32 | | - private final AvroSchema<T> avroSchema; |
33 | | - private final ConcurrentHashMap<SchemaInfo, Schema> knownSchemas; |
| 35 | + private final ConcurrentHashMap<ByteBuffer, DatumReader<T>> knownSchemaReaders; |
| 36 | + private final boolean specific; |
| 37 | + private final Schema readerSchema; |
34 | 38 |
|
35 | 39 | AvroDeserializer(String groupId, SchemaRegistryClient client, |
36 | 40 | AvroSchema<T> schema, |
37 | 41 | SerializerConfig.Decoders decoder, EncodingCache encodingCache) { |
38 | 42 | super(groupId, client, schema, false, decoder, encodingCache, true); |
39 | 43 | Preconditions.checkNotNull(schema); |
40 | | - this.avroSchema = schema; |
41 | | - this.knownSchemas = new ConcurrentHashMap<>(); |
| 44 | + this.knownSchemaReaders = new ConcurrentHashMap<>(); |
| 45 | + specific = SpecificRecordBase.class.isAssignableFrom(schema.getTClass()); |
| 46 | + readerSchema = schema.getSchema(); |
| 47 | + ByteBuffer schemaData = schema.getSchemaInfo().getSchemaData(); |
| 48 | + knownSchemaReaders.put(schemaData, createDatumReader(readerSchema, readerSchema, specific)); |
42 | 49 | } |
43 | 50 |
|
44 | 51 | @Override |
45 | 52 | public final T deserialize(InputStream inputStream, SchemaInfo writerSchemaInfo, SchemaInfo readerSchemaInfo) throws IOException { |
46 | 53 | Preconditions.checkNotNull(writerSchemaInfo); |
47 | | - Schema writerSchema; |
48 | | - if (knownSchemas.containsKey(writerSchemaInfo)) { |
49 | | - writerSchema = knownSchemas.get(writerSchemaInfo); |
50 | | - } else { |
51 | | - String schemaString = new String(writerSchemaInfo.getSchemaData().array(), Charsets.UTF_8); |
52 | | - writerSchema = new Schema.Parser().parse(schemaString); |
53 | | - knownSchemas.put(writerSchemaInfo, writerSchema); |
54 | | - } |
55 | | - Schema readerSchema = avroSchema.getSchema(); |
| 54 | + final ByteBuffer writerSchemaData = writerSchemaInfo.getSchemaData(); |
| 55 | + DatumReader<T> datumReader = knownSchemaReaders.computeIfAbsent(writerSchemaData, key -> { |
| 56 | + Schema writerSchema = AvroSchema.from(writerSchemaInfo).getSchema(); |
| 57 | + return createDatumReader(writerSchema, this.readerSchema, specific); |
| 58 | + }); |
56 | 59 | BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null); |
57 | | - |
58 | | - if (SpecificRecordBase.class.isAssignableFrom(avroSchema.getTClass())) { |
59 | | - SpecificDatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema); |
60 | | - return datumReader.read(null, decoder); |
| 60 | + return datumReader.read(null, decoder); |
| 61 | + } |
| 62 | + |
| 63 | + @VisibleForTesting |
| 64 | + DatumReader<T> createDatumReader(Schema writerSchema, Schema readerSchema, boolean specific) { |
| 65 | + DatumReader<T> datumReader; |
| 66 | + if (specific) { |
| 67 | + datumReader = new SpecificDatumReader<>(writerSchema, readerSchema); |
61 | 68 | } else { |
62 | | - ReflectDatumReader<T> datumReader = new ReflectDatumReader<>(writerSchema, readerSchema); |
63 | | - return datumReader.read(null, decoder); |
| 69 | + datumReader = new ReflectDatumReader<>(writerSchema, readerSchema); |
64 | 70 | } |
| 71 | + return datumReader; |
| 72 | + } |
| 73 | + |
| 74 | + @VisibleForTesting |
| 75 | + ImmutableMap<ByteBuffer, DatumReader<T>> getKnownSchemaReaders() { |
| 76 | + return ImmutableMap.copyOf(knownSchemaReaders); |
65 | 77 | } |
66 | 78 | } |
0 commit comments