Skip to content

Commit 2d2326e

Browse files
sladkoffd-rkiyacontrol
authored andcommitted
feat: Add alter and list partition reassignments
Implementation of KIP-455. Also includes work to make Sarama protocol support the new optional tagged fields functionality from KIP-482 - add headerVersion for all requests (Ref: KIP-482) - implement AlterPartitionReassignmentsRequest/Reponse protocol - add tests for alter_partition_reassignments - pretty print partition reassignment errors - add ListPartitionReassignmentsRequest/Response protocol - decode empty tagged fields in response header v1 - make sure mockbroker can handle different reponse header versions - make sure partition reassignment can be aborted - add Alter/ListPartitionReassignments to admin client api https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields Co-authored-by: Dirk Wilden <[email protected]> Co-authored-by: Leonid Koftun <[email protected]> Co-authored-by: iyacontrol <[email protected]>
1 parent 5812345 commit 2d2326e

File tree

93 files changed

+1692
-60
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+1692
-60
lines changed

acl_create_request.go

+4
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ func (c *CreateAclsRequest) version() int16 {
4747
return c.Version
4848
}
4949

50+
func (c *CreateAclsRequest) headerVersion() int16 {
51+
return 1
52+
}
53+
5054
func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
5155
switch c.Version {
5256
case 1:

acl_create_response.go

+4
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ func (c *CreateAclsResponse) version() int16 {
5555
return 0
5656
}
5757

58+
func (c *CreateAclsResponse) headerVersion() int16 {
59+
return 0
60+
}
61+
5862
func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
5963
return V0_11_0_0
6064
}

acl_delete_request.go

+4
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ func (d *DeleteAclsRequest) version() int16 {
4848
return int16(d.Version)
4949
}
5050

51+
func (c *DeleteAclsRequest) headerVersion() int16 {
52+
return 1
53+
}
54+
5155
func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
5256
switch d.Version {
5357
case 1:

acl_delete_response.go

+4
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ func (d *DeleteAclsResponse) version() int16 {
5656
return d.Version
5757
}
5858

59+
func (d *DeleteAclsResponse) headerVersion() int16 {
60+
return 0
61+
}
62+
5963
func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
6064
return V0_11_0_0
6165
}

acl_describe_request.go

+4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ func (d *DescribeAclsRequest) version() int16 {
2525
return int16(d.Version)
2626
}
2727

28+
func (d *DescribeAclsRequest) headerVersion() int16 {
29+
return 1
30+
}
31+
2832
func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
2933
switch d.Version {
3034
case 1:

acl_describe_response.go

+4
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ func (d *DescribeAclsResponse) version() int16 {
7777
return d.Version
7878
}
7979

80+
func (d *DescribeAclsResponse) headerVersion() int16 {
81+
return 0
82+
}
83+
8084
func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
8185
switch d.Version {
8286
case 1:

add_offsets_to_txn_request.go

+4
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ func (a *AddOffsetsToTxnRequest) version() int16 {
4848
return 0
4949
}
5050

51+
func (a *AddOffsetsToTxnRequest) headerVersion() int16 {
52+
return 1
53+
}
54+
5155
func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
5256
return V0_11_0_0
5357
}

add_offsets_to_txn_response.go

+4
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ func (a *AddOffsetsToTxnResponse) version() int16 {
4040
return 0
4141
}
4242

43+
func (a *AddOffsetsToTxnResponse) headerVersion() int16 {
44+
return 0
45+
}
46+
4347
func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
4448
return V0_11_0_0
4549
}

add_partitions_to_txn_request.go

+4
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ func (a *AddPartitionsToTxnRequest) version() int16 {
7272
return 0
7373
}
7474

75+
func (a *AddPartitionsToTxnRequest) headerVersion() int16 {
76+
return 1
77+
}
78+
7579
func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion {
7680
return V0_11_0_0
7781
}

add_partitions_to_txn_response.go

+4
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ func (a *AddPartitionsToTxnResponse) version() int16 {
7979
return 0
8080
}
8181

82+
func (a *AddPartitionsToTxnResponse) headerVersion() int16 {
83+
return 0
84+
}
85+
8286
func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
8387
return V0_11_0_0
8488
}

admin.go

+84
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ type ClusterAdmin interface {
4242
// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
4343
CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
4444

45+
// Alter the replica assignment for partitions.
46+
// This operation is supported by brokers with version 2.4.0.0 or higher.
47+
AlterPartitionReassignments(topic string, assignment [][]int32) error
48+
49+
// Provides info on ongoing partitions replica reassignments.
50+
// This operation is supported by brokers with version 2.4.0.0 or higher.
51+
ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
52+
4553
// Delete records whose offset is smaller than the given offset of the corresponding partition.
4654
// This operation is supported by brokers with version 0.11.0.0 or higher.
4755
DeleteRecords(topic string, partitionOffsets map[int32]int64) error
@@ -452,6 +460,82 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
452460
})
453461
}
454462

463+
func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
464+
if topic == "" {
465+
return ErrInvalidTopic
466+
}
467+
468+
request := &AlterPartitionReassignmentsRequest{
469+
TimeoutMs: int32(60000),
470+
Version: int16(0),
471+
}
472+
473+
for i := 0; i < len(assignment); i++ {
474+
request.AddBlock(topic, int32(i), assignment[i])
475+
}
476+
477+
return ca.retryOnError(isErrNoController, func() error {
478+
b, err := ca.Controller()
479+
if err != nil {
480+
return err
481+
}
482+
483+
errs := make([]error, 0)
484+
485+
rsp, err := b.AlterPartitionReassignments(request)
486+
487+
if err != nil {
488+
errs = append(errs, err)
489+
} else {
490+
if rsp.ErrorCode > 0 {
491+
errs = append(errs, errors.New(rsp.ErrorCode.Error()))
492+
}
493+
494+
for topic, topicErrors := range rsp.Errors {
495+
for partition, partitionError := range topicErrors {
496+
if partitionError.errorCode != ErrNoError {
497+
errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
498+
errs = append(errs, errors.New(errStr))
499+
}
500+
}
501+
}
502+
}
503+
504+
if len(errs) > 0 {
505+
return ErrReassignPartitions{MultiError{&errs}}
506+
}
507+
508+
return nil
509+
})
510+
}
511+
512+
func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
513+
if topic == "" {
514+
return nil, ErrInvalidTopic
515+
}
516+
517+
request := &ListPartitionReassignmentsRequest{
518+
TimeoutMs: int32(60000),
519+
Version: int16(0),
520+
}
521+
522+
request.AddBlock(topic, partitions)
523+
524+
b, err := ca.Controller()
525+
if err != nil {
526+
return nil, err
527+
}
528+
_ = b.Open(ca.client.Config())
529+
530+
rsp, err := b.ListPartitionReassignments(request)
531+
532+
if err == nil && rsp != nil {
533+
return rsp.TopicStatus, nil
534+
} else {
535+
return nil, err
536+
}
537+
}
538+
455539
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
456540
if topic == "" {
457541
return ErrInvalidTopic

admin_test.go

+161
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,167 @@ func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
332332
}
333333
}
334334

335+
func TestClusterAdminAlterPartitionReassignments(t *testing.T) {
336+
seedBroker := NewMockBroker(t, 1)
337+
defer seedBroker.Close()
338+
339+
secondBroker := NewMockBroker(t, 2)
340+
defer secondBroker.Close()
341+
342+
seedBroker.SetHandlerByMap(map[string]MockResponse{
343+
"MetadataRequest": NewMockMetadataResponse(t).
344+
SetController(secondBroker.BrokerID()).
345+
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
346+
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
347+
})
348+
349+
secondBroker.SetHandlerByMap(map[string]MockResponse{
350+
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
351+
})
352+
353+
config := NewConfig()
354+
config.Version = V2_4_0_0
355+
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
356+
if err != nil {
357+
t.Fatal(err)
358+
}
359+
360+
var topicAssignment = make([][]int32, 0, 3)
361+
362+
err = admin.AlterPartitionReassignments("my_topic", topicAssignment)
363+
if err != nil {
364+
t.Fatal(err)
365+
}
366+
367+
err = admin.Close()
368+
if err != nil {
369+
t.Fatal(err)
370+
}
371+
}
372+
373+
func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) {
374+
seedBroker := NewMockBroker(t, 1)
375+
defer seedBroker.Close()
376+
377+
secondBroker := NewMockBroker(t, 2)
378+
defer secondBroker.Close()
379+
380+
seedBroker.SetHandlerByMap(map[string]MockResponse{
381+
"MetadataRequest": NewMockMetadataResponse(t).
382+
SetController(secondBroker.BrokerID()).
383+
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
384+
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
385+
})
386+
387+
secondBroker.SetHandlerByMap(map[string]MockResponse{
388+
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
389+
})
390+
391+
config := NewConfig()
392+
config.Version = V2_3_0_0
393+
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
394+
if err != nil {
395+
t.Fatal(err)
396+
}
397+
398+
var topicAssignment = make([][]int32, 0, 3)
399+
400+
err = admin.AlterPartitionReassignments("my_topic", topicAssignment)
401+
402+
if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
403+
t.Fatal(err)
404+
}
405+
406+
err = admin.Close()
407+
if err != nil {
408+
t.Fatal(err)
409+
}
410+
}
411+
412+
func TestClusterAdminListPartitionReassignments(t *testing.T) {
413+
seedBroker := NewMockBroker(t, 1)
414+
defer seedBroker.Close()
415+
416+
secondBroker := NewMockBroker(t, 2)
417+
defer secondBroker.Close()
418+
419+
seedBroker.SetHandlerByMap(map[string]MockResponse{
420+
"MetadataRequest": NewMockMetadataResponse(t).
421+
SetController(secondBroker.BrokerID()).
422+
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
423+
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
424+
})
425+
426+
secondBroker.SetHandlerByMap(map[string]MockResponse{
427+
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
428+
})
429+
430+
config := NewConfig()
431+
config.Version = V2_4_0_0
432+
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
433+
if err != nil {
434+
t.Fatal(err)
435+
}
436+
437+
response, err := admin.ListPartitionReassignments("my_topic", []int32{0, 1})
438+
if err != nil {
439+
t.Fatal(err)
440+
}
441+
442+
partitionStatus, ok := response["my_topic"]
443+
if !ok {
444+
t.Fatalf("topic missing in response")
445+
} else {
446+
if len(partitionStatus) != 2 {
447+
t.Fatalf("partition missing in response")
448+
}
449+
}
450+
451+
err = admin.Close()
452+
if err != nil {
453+
t.Fatal(err)
454+
}
455+
}
456+
457+
func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) {
458+
seedBroker := NewMockBroker(t, 1)
459+
defer seedBroker.Close()
460+
461+
secondBroker := NewMockBroker(t, 2)
462+
defer secondBroker.Close()
463+
464+
seedBroker.SetHandlerByMap(map[string]MockResponse{
465+
"MetadataRequest": NewMockMetadataResponse(t).
466+
SetController(secondBroker.BrokerID()).
467+
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
468+
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
469+
})
470+
471+
secondBroker.SetHandlerByMap(map[string]MockResponse{
472+
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
473+
})
474+
475+
config := NewConfig()
476+
config.Version = V2_3_0_0
477+
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
478+
if err != nil {
479+
t.Fatal(err)
480+
}
481+
482+
var partitions = make([]int32, 0)
483+
484+
_, err = admin.ListPartitionReassignments("my_topic", partitions)
485+
486+
if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
487+
t.Fatal(err)
488+
}
489+
490+
err = admin.Close()
491+
if err != nil {
492+
t.Fatal(err)
493+
}
494+
}
495+
335496
func TestClusterAdminDeleteRecords(t *testing.T) {
336497
topicName := "my_topic"
337498
seedBroker := NewMockBroker(t, 1)

alter_configs_request.go

+4
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ func (a *AlterConfigsRequest) version() int16 {
117117
return 0
118118
}
119119

120+
func (a *AlterConfigsRequest) headerVersion() int16 {
121+
return 1
122+
}
123+
120124
func (a *AlterConfigsRequest) requiredVersion() KafkaVersion {
121125
return V0_11_0_0
122126
}

alter_configs_response.go

+4
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ func (a *AlterConfigsResponse) version() int16 {
9292
return 0
9393
}
9494

95+
func (a *AlterConfigsResponse) headerVersion() int16 {
96+
return 0
97+
}
98+
9599
func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
96100
return V0_11_0_0
97101
}

0 commit comments

Comments
 (0)