forked from mostafa/xk6-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathavro_test.go
101 lines (87 loc) · 4.07 KB
/
avro_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package kafka
import (
"testing"
"github.com/stretchr/testify/assert"
)
// Shared fixtures for the Avro (de)serialization tests in this file.
var (
	// avroConfig selects the Avro serializer/deserializer for both keys and
	// values, on the producer and the consumer side.
	avroConfig = Configuration{
		Consumer: ConsumerConfiguration{
			KeyDeserializer:   AvroDeserializer,
			ValueDeserializer: AvroDeserializer,
		},
		Producer: ProducerConfiguration{
			KeySerializer:   AvroSerializer,
			ValueSerializer: AvroSerializer,
		},
	}

	// avroSchema is an Avro record schema with a single string field named "field".
	avroSchema = `{"type":"record","name":"Schema","namespace":"io.confluent.kafka.avro","fields":[{"name":"field","type":"string"}]}`
)
// TestSerializeDeserializeAvro round-trips a one-field record through
// SerializeAvro and DeserializeAvro, once as a key and once as a value, and
// checks that the decoded result matches the input.
func TestSerializeDeserializeAvro(t *testing.T) {
	const (
		payload = `{"field":"value"}`
		// Lower bound on the serialized size: a 5-byte wire-format header
		// (Confluent framing: 1 magic byte followed by a 4-byte schema ID)
		// plus the Avro-encoded record.
		minSerializedLen = 10
	)
	expected := map[string]interface{}{"field": "value"}

	// NOTE(review): avroConfig sets no schema-registry options, so this
	// presumably exercises the path that (de)serializes with the supplied
	// schema directly — confirm against SerializeAvro/DeserializeAvro.
	for _, target := range []Element{Key, Value} {
		// Encode the record as the current element (key or value).
		encoded, encodeErr := SerializeAvro(avroConfig, "topic", payload, target, avroSchema, 0)
		// assert.Nil rather than assert.NoError: the other tests in this file
		// read fields off the returned error, so it is a concrete type, not
		// the error interface.
		assert.Nil(t, encodeErr)
		assert.NotNil(t, encoded)
		assert.GreaterOrEqual(t, len(encoded), minSerializedLen)

		// Decode it again; the wire-format header is stripped on the way.
		decoded, decodeErr := DeserializeAvro(avroConfig, "", encoded, target, avroSchema, 0)
		assert.Nil(t, decodeErr)
		assert.Equal(t, expected, decoded)
	}
}
// TestSerializeDeserializeAvroFailsOnSchemaError tests serialization and deserialization of Avro messages and fails on schema error
func TestSerializeDeserializeAvroFailsOnSchemaError(t *testing.T) {
jsonSchema = `{}`
for _, element := range []Element{Key, Value} {
// Serialize the key or value
serialized, err := SerializeAvro(avroConfig, "topic", `{"field":"value"}`, element, jsonSchema, 0)
assert.Nil(t, serialized)
assert.Error(t, err.Unwrap())
assert.Equal(t, "Failed to create codec for encoding Avro", err.Message)
assert.Equal(t, failedCreateAvroCodec, err.Code)
// Deserialize the key or value
deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, jsonSchema, 0)
assert.Nil(t, deserialized)
assert.Error(t, err.Unwrap())
assert.Equal(t, "Failed to create codec for decoding Avro", err.Message)
assert.Equal(t, failedCreateAvroCodec, err.Code)
}
}
// TestSerializeDeserializeAvroFailsOnWireFormatError verifies that
// DeserializeAvro reports failedDecodeFromWireFormat for payloads that are too
// short to contain the wire-format header.
func TestSerializeDeserializeAvroFailsOnWireFormatError(t *testing.T) {
	schema := `{}`

	// A proper wire-formatted message has 5 bytes (the wire format) plus data,
	// so both payloads below are too short to be valid.
	payloads := [][]byte{
		{},           // empty key or value
		{1, 2, 3, 4}, // broken key or value: one byte short of the header
	}

	for _, element := range []Element{Key, Value} {
		for _, payload := range payloads {
			deserialized, err := DeserializeAvro(avroConfig, "topic", payload, element, schema, 0)
			assert.Nil(t, deserialized)
			// Guard the dereferences: if the call unexpectedly succeeds, report
			// an assertion failure instead of panicking on a nil error.
			if assert.NotNil(t, err) {
				assert.Error(t, err.Unwrap())
				assert.Equal(t, "Failed to remove wire format from the binary data", err.Message)
				assert.Equal(t, failedDecodeFromWireFormat, err.Code)
			}
		}
	}
}
// TestSerializeDeserializeAvroFailsOnEncodeDecodeError verifies that, with a
// valid schema, SerializeAvro reports failedEncodeToAvro for data that does not
// match the schema and DeserializeAvro reports failedDecodeAvroFromBinary for
// bytes that do not decode as the schema's record.
func TestSerializeDeserializeAvroFailsOnEncodeDecodeError(t *testing.T) {
	// The only key in this object is not a field of avroSchema.
	data := `{"nonExistingField":"value"}`

	for _, element := range []Element{Key, Value} {
		serialized, err := SerializeAvro(avroConfig, "topic", data, element, avroSchema, 0)
		assert.Nil(t, serialized)
		// Guard the dereferences: if the call unexpectedly succeeds, report an
		// assertion failure instead of panicking on a nil error.
		if assert.NotNil(t, err) {
			assert.Error(t, err.Unwrap())
			assert.Equal(t, "Failed to encode data into Avro", err.Message)
			assert.Equal(t, failedEncodeToAvro, err.Code)
		}

		// Six arbitrary bytes: long enough to get past the 5-byte wire-format
		// header, but the test expects the remainder to fail Avro decoding.
		deserialized, err := DeserializeAvro(avroConfig, "topic", []byte{1, 2, 3, 4, 5, 6}, element, avroSchema, 0)
		assert.Nil(t, deserialized)
		if assert.NotNil(t, err) {
			assert.Error(t, err.Unwrap())
			assert.Equal(t, "Failed to decode data from Avro", err.Message)
			assert.Equal(t, failedDecodeAvroFromBinary, err.Code)
		}
	}
}