Issue in mocks PartitionConsumer.Close implementation #3096

Open
dharmjit opened this issue Feb 10, 2025 · 0 comments
Assignees: dnwe
Labels: needs-investigation (Issues that require followup from maintainers)

@dharmjit commented:

Context: We use Kafka compacted topics to keep part of our application state in Kafka, and that state is read back using the sarama Consumer and PartitionConsumer APIs. The app state can be read concurrently, so we added locking to ensure only a single PartitionConsumer is started at a time and to avoid the error The topic/partition is already being consumed. Below is what the implementation looks like after removing some app-specific code.

Code

kp.Mu.Lock()
defer kp.Mu.Unlock()

// Create a partition consumer
partitionConsumer, err := kp.Consumer.ConsumePartition(kp.Topic, 0, sarama.OffsetOldest)
if err != nil {
	return err
}
defer partitionConsumer.Close()

// Get the high watermark offset
highWaterMarkOffset := partitionConsumer.HighWaterMarkOffset()
if highWaterMarkOffset == 0 {
	return nil
}

timeoutChan := kp.Clock.After(timeout)
for {
	select {
	case message, ok := <-partitionConsumer.Messages():
		if !ok {
			return nil
		}
		if err := processMessage(message); err != nil {
			return err
		}
		// Stop consuming once all messages have been read
		if message.Offset+1 >= highWaterMarkOffset {
			return nil
		}

	case err := <-partitionConsumer.Errors():
		return err.Err

	case <-ctx.Done():
		return ctx.Err()

	case <-timeoutChan:
		return nil
	}
}

Problem: The tests that use the mocks Consumer and PartitionConsumer APIs fail when this fetch runs concurrently, with the error The topic/partition is already being consumed. Looking into the mocks package, I believe PartitionConsumer.Close() should set pc.consumed = false when Close() is called directly on the partition consumer, so the same topic/partition can be consumed again. The if !pc.consumed check would also need to be removed, because otherwise the pc.Close() invoked from consumer.Close() would fail after such a direct close.
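
To illustrate the proposal, here is a rough sketch against a simplified stand-in type. The type and field names are illustrative only and are not claimed to match the actual mocks source; the comments describe the behaviour change I have in mind.

// Illustrative stand-in for mocks.PartitionConsumer; fields simplified.
type partitionConsumerMock struct {
	consumed bool // set when Consumer.ConsumePartition hands out this partition consumer
	// ... expectation channels etc. omitted
}

// Proposed Close() behaviour:
//   - drop the "if !pc.consumed" guard, so the pc.Close() performed by
//     Consumer.Close() does not fail after a direct Close()
//   - reset consumed to false, so a later ConsumePartition on the same
//     topic/partition can succeed again
func (pc *partitionConsumerMock) Close() error {
	// ... drain yielded messages/errors and verify expectations as today ...
	pc.consumed = false
	return nil
}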

Test

func TestKafka_Fetch(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	var mockConsumer *mocksarama.Consumer
	type fields struct {
		Topic    string
		Consumer sarama.Consumer
	}
	type args struct {
		ctx context.Context
	}
	tests := []struct {
		name       string
		fields     fields
		args       args
		setupMocks func()
		wantErr    bool
	}{
		{
			name: "success - concurrent api calls ",
			fields: fields{
				Topic: Topic,
				Consumer: mockConsumer,
			},
			args: args{
				ctx: context.Background(),
			},
			setupMocks: func() {
				mockConsumer = mocksarama.NewConsumer(t, sarama.NewConfig())
				valueBytes, err := json.Marshal(entry)
				assert.NoError(t, err)
				partitionConsumer := mockConsumer.ExpectConsumePartition(Topic, 0, sarama.OffsetOldest)
				partitionConsumer.YieldMessage(&sarama.ConsumerMessage{
					Key:   []byte(Key),
					Value: []byte(valueBytes),
				})
			},
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tt.setupMocks()
			defer mockConsumer.Close()
			k := &Kafka{
				Topic: tt.fields.Topic,
				Consumer: mockConsumer,
			}

			// Concurrent calls to Fetch
			var wg sync.WaitGroup
			numFetches := 2
			var fetchErrors []error
			var fetchErrorsMu sync.Mutex // guards concurrent appends to fetchErrors

			for i := 0; i < numFetches; i++ {
				wg.Add(1)
				go func(wg *sync.WaitGroup) {
					defer wg.Done()
					ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
					defer cancel()
					_, err := k.Fetch(ctx) // fetched data not needed for this test
					if err != nil {
						fetchErrorsMu.Lock()
						fetchErrors = append(fetchErrors, err)
						fetchErrorsMu.Unlock()
					}
				}(&wg)
			}
			wg.Wait()

			for _, err := range fetchErrors {
				require.NoError(t, err)
			}
		})
	}
}
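
For completeness, here is a minimal sketch of what I believe reproduces the underlying behaviour with the mock alone, without any concurrency. The import paths, test name, and topic name are illustrative, and this assumes my reading above is correct that Close() never resets the consumed flag:

package kafka_test

import (
	"testing"

	"github.com/IBM/sarama"
	"github.com/IBM/sarama/mocks"
)

// Consume, close the partition consumer directly, then try to consume the
// same topic/partition again on the same mock consumer.
func TestMockPartitionConsumerReuse(t *testing.T) {
	consumer := mocks.NewConsumer(t, sarama.NewConfig())
	defer consumer.Close()

	consumer.ExpectConsumePartition("state-topic", 0, sarama.OffsetOldest)

	pc, err := consumer.ConsumePartition("state-topic", 0, sarama.OffsetOldest)
	if err != nil {
		t.Fatalf("first ConsumePartition: %v", err)
	}
	if err := pc.Close(); err != nil {
		t.Fatalf("direct Close: %v", err)
	}

	// If Close() reset the consumed flag, I would expect this second call to
	// succeed; instead it currently fails with
	// "The topic/partition is already being consumed".
	if _, err := consumer.ConsumePartition("state-topic", 0, sarama.OffsetOldest); err != nil {
		t.Fatalf("second ConsumePartition: %v", err)
	}
}
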
dnwe self-assigned this on Feb 20, 2025
dnwe added the needs-investigation (Issues that require followup from maintainers) label on Feb 20, 2025