forked from mostafa/xk6-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathavro.go
141 lines (119 loc) · 4.85 KB
/
avro.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package kafka
import (
"github.com/linkedin/goavro/v2"
"github.com/riferrei/srclient"
"github.com/sirupsen/logrus"
)
const (
	// AvroSerializer holds the Java-style class name of the Confluent Kafka Avro
	// serializer. NOTE(review): presumably compared against a configured
	// serializer name to select Avro encoding — confirm against callers.
	AvroSerializer string = "io.confluent.kafka.serializers.KafkaAvroSerializer"
	// AvroDeserializer holds the Java-style class name of the Confluent Kafka Avro
	// deserializer. NOTE(review): presumably compared against a configured
	// deserializer name to select Avro decoding — confirm against callers.
	AvroDeserializer string = "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)
// SerializeAvro serializes the given data to wire-formatted Avro binary and returns it
// as a byte slice.
//
// data must be a string holding the textual form of the Avro datum; any other dynamic
// type yields a failedEncodeToAvro error instead of a panic. The subject is built as
// "<topic>-<element>". When schema is non-empty it is passed to CreateSchema, otherwise
// GetSchema is called with the given version. The SchemaRegistry section of the
// configuration is used to build the Schema Registry client.
//
// If the registry call fails, a warning is logged and the data is encoded with a codec
// built directly from schema; the wire format then carries schema ID 0. If the registry
// call succeeds but yields no schema, the textual data is wrapped in the wire format
// as-is, also with schema ID 0.
func SerializeAvro(configuration Configuration, topic string, data interface{}, element Element, schema string, version int) ([]byte, *Xk6KafkaError) {
	// Checked assertion: a non-string payload is reported as an error, not a panic.
	textData, ok := data.(string)
	if !ok {
		return nil, NewXk6KafkaError(failedEncodeToAvro,
			"Failed to encode data into Avro, data must be a string",
			nil)
	}
	bytesData := []byte(textData)

	client := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry)
	subject := topic + "-" + string(element)

	var schemaInfo *srclient.Schema
	var xk6KafkaError *Xk6KafkaError
	if schema != "" {
		// Schema is provided, so we need to create it and get the schema ID
		schemaInfo, xk6KafkaError = CreateSchema(client, subject, schema, srclient.Avro)
	} else {
		// Schema is not provided, so we need to fetch the schema from the Schema Registry
		schemaInfo, xk6KafkaError = GetSchema(client, subject, schema, srclient.Avro, version)
	}

	// Pick exactly one codec. The fallback and registry paths are mutually exclusive,
	// mirroring DeserializeAvro, so the payload can never be encoded twice.
	schemaID := 0
	var codec *goavro.Codec
	switch {
	case xk6KafkaError != nil:
		logrus.New().WithField("error", xk6KafkaError).Warn("Failed to create or get schema, manually encoding the data")
		manualCodec, err := goavro.NewCodec(schema)
		if err != nil {
			return nil, NewXk6KafkaError(failedCreateAvroCodec,
				"Failed to create codec for encoding Avro",
				err)
		}
		codec = manualCodec
	case schemaInfo != nil:
		schemaID = schemaInfo.ID()
		codec = schemaInfo.Codec()
		if codec == nil {
			// Guard against a schema without a usable codec rather than
			// dereferencing a nil codec below.
			return nil, NewXk6KafkaError(failedCreateAvroCodec,
				"Failed to create codec for encoding Avro",
				nil)
		}
	default:
		// No error and no schema: nothing to encode with, only add the wire format.
		return EncodeWireFormat(bytesData, schemaID), nil
	}

	// Textual Avro -> native Go value -> Avro binary.
	native, _, err := codec.NativeFromTextual(bytesData)
	if err != nil {
		return nil, NewXk6KafkaError(failedEncodeToAvro,
			"Failed to encode data into Avro",
			err)
	}
	avroBinary, err := codec.BinaryFromNative(nil, native)
	if err != nil {
		return nil, NewXk6KafkaError(failedEncodeAvroToBinary,
			"Failed to encode Avro data into binary",
			err)
	}

	return EncodeWireFormat(avroBinary, schemaID), nil
}
// DeserializeAvro removes the wire format from data and decodes the remaining Avro
// binary payload into its native Go value.
//
// The subject is built as "<topic>-<element>". A non-empty schema is passed to
// CreateSchema; an empty one makes the function call GetSchema with the given version.
// The SchemaRegistry section of the configuration is used to build the Schema Registry
// client.
//
// If the registry call fails, a warning is logged and the payload is decoded with a
// codec built directly from schema. If the call succeeds without yielding a schema,
// the payload bytes are returned undecoded.
func DeserializeAvro(configuration Configuration, topic string, data []byte, element Element, schema string, version int) (interface{}, *Xk6KafkaError) {
	payload, wireErr := DecodeWireFormat(data)
	if wireErr != nil {
		return nil, NewXk6KafkaError(failedDecodeFromWireFormat,
			"Failed to remove wire format from the binary data",
			wireErr)
	}

	registry := SchemaRegistryClientWithConfiguration(configuration.SchemaRegistry)
	subject := topic + "-" + string(element)

	var (
		registered *srclient.Schema
		lookupErr  *Xk6KafkaError
	)
	if schema == "" {
		// No schema given: fetch it from the Schema Registry.
		registered, lookupErr = GetSchema(registry, subject, schema, srclient.Avro, version)
	} else {
		// Schema given: create it in the Schema Registry.
		registered, lookupErr = CreateSchema(registry, subject, schema, srclient.Avro)
	}

	switch {
	case lookupErr != nil:
		// Registry unavailable or rejected the request: fall back to a local codec.
		logrus.New().WithField("error", lookupErr).Warn("Failed to create or get schema, manually decoding the data")
		fallback, codecErr := goavro.NewCodec(schema)
		if codecErr != nil {
			return nil, NewXk6KafkaError(failedCreateAvroCodec,
				"Failed to create codec for decoding Avro",
				codecErr)
		}
		native, _, decodeErr := fallback.NativeFromBinary(payload)
		if decodeErr != nil {
			return nil, NewXk6KafkaError(failedDecodeAvroFromBinary,
				"Failed to decode data from Avro",
				decodeErr)
		}
		return native, nil
	case registered != nil:
		// Decode with the codec attached to the registry schema.
		native, _, decodeErr := registered.Codec().NativeFromBinary(payload)
		if decodeErr != nil {
			return nil, NewXk6KafkaError(failedDecodeAvroFromBinary,
				"Failed to decode data from Avro",
				decodeErr)
		}
		return native, nil
	default:
		return payload, nil
	}
}