forked from mostafa/xk6-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathschema_registry_test.go
125 lines (112 loc) · 3.79 KB
/
schema_registry_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package kafka
import (
"testing"
"github.com/riferrei/srclient"
"github.com/stretchr/testify/assert"
)
// TestDecodeWireFormat feeds a six-byte message to DecodeWireFormat and
// expects no error and a payload made up of only the final byte.
func TestDecodeWireFormat(t *testing.T) {
	var (
		wire = []byte{1, 2, 3, 4, 5, 6}
		want = []byte{6}
	)

	got, err := DecodeWireFormat(wire)

	assert.Nil(t, err)
	assert.Equal(t, want, got)
}
// TestDecodeWireFormatFails checks that DecodeWireFormat rejects a four-byte
// input: it expects a nil result and an error that carries the
// messageTooShort code, the matching message, and no wrapped cause.
func TestDecodeWireFormatFails(t *testing.T) {
	encoded := []byte{1, 2, 3, 4} // too short

	result, err := DecodeWireFormat(encoded)
	assert.Nil(t, result)
	// assert.NotNil only records a failure; it does not stop the test. Bail
	// out here so the field accesses below cannot panic on a nil err.
	if !assert.NotNil(t, err) {
		return
	}
	assert.Equal(t, "Invalid message: message too short to contain schema id.", err.Message)
	assert.Equal(t, messageTooShort, err.Code)
	assert.Nil(t, err.Unwrap())
}
// TestEncodeWireFormat encodes a one-byte payload with schema ID 5 and
// expects the payload to come back prefixed with the bytes 0, 0, 0, 0, 5.
func TestEncodeWireFormat(t *testing.T) {
	payload := []byte{6}
	id := 5

	got := EncodeWireFormat(payload, id)

	want := []byte{0, 0, 0, 0, 5, 6}
	assert.Equal(t, want, got)
}
// TestSchemaRegistryClient tests the creation of a SchemaRegistryClient instance
// with the given configuration.
func TestSchemaRegistryClient(t *testing.T) {
srConfig := SchemaRegistryConfiguration{
Url: "http://localhost:8081",
BasicAuth: BasicAuth{
Username: "username",
Password: "password",
},
}
srClient := SchemaRegistryClientWithConfiguration(srConfig)
assert.NotNil(t, srClient)
}
// TestSchemaRegistryClientWithTLSConfig verifies that
// SchemaRegistryClientWithConfiguration returns a non-nil client when the
// configuration also names client certificate, client key and CA files.
func TestSchemaRegistryClientWithTLSConfig(t *testing.T) {
	var cfg SchemaRegistryConfiguration
	cfg.Url = "http://localhost:8081"
	cfg.BasicAuth = BasicAuth{
		Username: "username",
		Password: "password",
	}
	cfg.TLSConfig = TLSConfig{
		ClientCertPem: "fixtures/client.cer",
		ClientKeyPem:  "fixtures/client.pem",
		ServerCaPem:   "fixtures/caroot.cer",
	}

	client := SchemaRegistryClientWithConfiguration(cfg)

	assert.NotNil(t, client)
}
// TestGetLatestSchemaFails calls GetSchema with version 0 against a client
// configured for http://localhost:8081 and expects a nil schema together with
// the "Failed to get schema from schema registry" error.
// NOTE(review): the test name suggests version 0 selects the latest schema,
// and the expected failure presumably relies on no registry answering at that
// address — confirm against GetSchema.
func TestGetLatestSchemaFails(t *testing.T) {
	srConfig := SchemaRegistryConfiguration{
		Url: "http://localhost:8081",
		BasicAuth: BasicAuth{
			Username: "username",
			Password: "password",
		},
	}
	srClient := SchemaRegistryClientWithConfiguration(srConfig)

	schema, err := GetSchema(srClient, "test-subject", "test-schema", srclient.Avro, 0)
	assert.Nil(t, schema)
	// assert.NotNil only records a failure; it does not stop the test. Bail
	// out here so reading err.Message below cannot panic on a nil err.
	if !assert.NotNil(t, err) {
		return
	}
	assert.Equal(t, "Failed to get schema from schema registry", err.Message)
}
// TestGetSchemaFails calls GetSchema with version 1 against a client
// configured for http://localhost:8081 and expects a nil schema together with
// the "Failed to get schema from schema registry" error.
// NOTE(review): the expected failure presumably relies on no registry
// answering at that address — confirm.
func TestGetSchemaFails(t *testing.T) {
	srConfig := SchemaRegistryConfiguration{
		Url: "http://localhost:8081",
		BasicAuth: BasicAuth{
			Username: "username",
			Password: "password",
		},
	}
	srClient := SchemaRegistryClientWithConfiguration(srConfig)

	schema, err := GetSchema(srClient, "test-subject", "test-schema", srclient.Avro, 1)
	assert.Nil(t, schema)
	// assert.NotNil only records a failure; it does not stop the test. Bail
	// out here so reading err.Message below cannot panic on a nil err.
	if !assert.NotNil(t, err) {
		return
	}
	assert.Equal(t, "Failed to get schema from schema registry", err.Message)
}
// TestCreateSchemaFails calls CreateSchema against a client configured for
// http://localhost:8081 and expects a nil schema together with the
// "Failed to create schema." error.
// NOTE(review): the expected failure presumably relies on no registry
// answering at that address — confirm.
func TestCreateSchemaFails(t *testing.T) {
	srConfig := SchemaRegistryConfiguration{
		Url: "http://localhost:8081",
		BasicAuth: BasicAuth{
			Username: "username",
			Password: "password",
		},
	}
	srClient := SchemaRegistryClientWithConfiguration(srConfig)

	schema, err := CreateSchema(srClient, "test-subject", "test-schema", srclient.Avro)
	assert.Nil(t, schema)
	// assert.NotNil only records a failure; it does not stop the test. Bail
	// out here so reading err.Message below cannot panic on a nil err.
	if !assert.NotNil(t, err) {
		return
	}
	assert.Equal(t, "Failed to create schema.", err.Message)
}