KAFKA-17904: Flaky testMultiConsumerSessionTimeoutOnClose #17789

Merged: 3 commits, Nov 15, 2024


xijiu
Collaborator

@xijiu xijiu commented Nov 13, 2024

Here are some of my conclusions about this flaky test.

First of all, the test fails because of a timeout: the method AbstractConsumerTest#validateGroupAssignment times out after waiting for 10 seconds. I was able to reproduce this on my computer.

AbstractConsumerTest#validateGroupAssignment checks that all the consumers' assignments meet expectations. The exception is as follows:

org.opentest4j.AssertionFailedError: Did not get valid assignment for partitions HashSet(topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, topic1-0, topic1-3). Instead, got ArrayBuffer(Set(topic1-1, topic1-0, topic1-2), Set(topic1-5, topic1-4), Set())

I ran this JUnit test many times on my local computer after adding some logs, and found that the timeout occurs in GroupProtocol.CONSUMER mode. In CONSUMER mode, the consumer may interact with the GroupCoordinator multiple times before reconciliation completes.

The frequency of interaction is controlled by the configuration group.consumer.heartbeat.interval.ms, whose default value is 5000 ms. Even the successful runs take at least 5 seconds to complete, so we could reduce the heartbeat interval.

After I set group.consumer.heartbeat.interval.ms to 1000 ms, the problem has not occurred again on my computer, and the test also runs faster.
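
To make the timing argument concrete, here is a rough back-of-the-envelope sketch. It is not Kafka code: the object and method names are hypothetical, and it assumes that each interaction with the GroupCoordinator costs about one heartbeat interval and that reconciliation needs a small number of such rounds (the exact number is not stated above).

```scala
// Rough timing model only; all names here are hypothetical, not Kafka APIs.
object HeartbeatTimingSketch {
  // Wait used by validateGroupAssignment, per the description above.
  val ValidationTimeoutMs: Long = 10000L

  /** Estimated reconciliation time, assuming one heartbeat interval per round. */
  def estimatedReconcileMs(heartbeatIntervalMs: Long, rounds: Int): Long =
    heartbeatIntervalMs * rounds

  /** True if the estimate stays strictly below the validation timeout. */
  def fitsInTimeout(heartbeatIntervalMs: Long, rounds: Int): Boolean =
    estimatedReconcileMs(heartbeatIntervalMs, rounds) < ValidationTimeoutMs

  def main(args: Array[String]): Unit = {
    // Compare the default interval (5000 ms) with the reduced one (1000 ms).
    for (interval <- Seq(5000L, 1000L); rounds <- 1 to 3)
      println(
        s"interval=${interval}ms rounds=$rounds " +
          s"estimate=${estimatedReconcileMs(interval, rounds)}ms " +
          s"fits=${fitsInTimeout(interval, rounds)}"
      )
  }
}
```

Under these assumptions, two rounds at 5000 ms already use up the whole 10-second wait, while three rounds at 1000 ms leave plenty of headroom.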


@github-actions github-actions bot added core Kafka Broker tests Test fixes (including flaky tests) small Small PRs labels Nov 13, 2024
@xijiu
Collaborator Author

xijiu commented Nov 13, 2024

@lianetm @chia7712 PTAL

@lianetm
Member

lianetm commented Nov 13, 2024

Hello @xijiu, thanks for taking a look at this! Very interesting finding indeed. I expect this same issue is behind the flakiness of testMultiConsumerSessionTimeoutOnStopPolling too, right? (Could you maybe validate that one locally as well?)

https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.tags=trunk&search.timeZoneId=America%2FToronto&tests.container=kafka.api.PlaintextConsumerPollTest&tests.test=testMultiConsumerSessionTimeoutOnStopPolling(String%2C%20String)%5B2%5D

-- update

I created https://issues.apache.org/jira/browse/KAFKA-18008 for the other test just for visibility. You can take it if you want and maybe this PR serves both. If it needs more work we can tackle that separately.

@xijiu
Collaborator Author

xijiu commented Nov 14, 2024

@lianetm Thanks for the reply.

I think this PR will serve both.
Both testMultiConsumerSessionTimeoutOnClose() and testMultiConsumerSessionTimeoutOnStopPolling() call the method runMultiConsumerSessionTimeoutTest():

  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testMultiConsumerSessionTimeoutOnStopPolling(quorum: String, groupProtocol: String): Unit = {
    runMultiConsumerSessionTimeoutTest(false)
  }

  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testMultiConsumerSessionTimeoutOnClose(quorum: String, groupProtocol: String): Unit = {
    runMultiConsumerSessionTimeoutTest(true)
  }

And the failure of runMultiConsumerSessionTimeoutTest(boolean) is unrelated to its input parameter.

But to be honest, this flaky test is hard to reproduce on my Mac: I ran it many, many times but only reproduced it once 😁.

@@ -32,6 +34,12 @@ import scala.jdk.CollectionConverters._
@Timeout(600)
class PlaintextConsumerPollTest extends AbstractConsumerTest {

  override protected def brokerPropertyOverrides(properties: Properties): Unit = {
    super.brokerPropertyOverrides(properties)
    properties.setProperty(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
Member

I notice there are several tests in this same file overriding the equivalent of this prop for the classic consumer (ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), but setting it to 500. Should we consider that value instead? With that, we would have those tests running for both consumers with the same config. What do you think?

Collaborator Author

@lianetm Yeah, you are right, I will fix it.
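
For illustration, a minimal self-contained sketch of what aligning both heartbeat settings at 500 ms could look like. This is not the merged change: the object and method names are hypothetical, and the config keys are written as plain strings (group.consumer.heartbeat.interval.ms on the broker side, heartbeat.interval.ms for the classic consumer) so the snippet runs without Kafka on the classpath. Whether a broker accepts a value this low depends on its other settings, which the sketch does not check.

```scala
import java.util.Properties

// Hypothetical helper, not part of PlaintextConsumerPollTest.
object HeartbeatOverrideSketch {
  // Broker-side interval used by the CONSUMER group protocol.
  val ConsumerGroupHeartbeatIntervalMs = "group.consumer.heartbeat.interval.ms"
  // Client-side interval used by the classic consumer.
  val ClassicHeartbeatIntervalMs = "heartbeat.interval.ms"

  // Value suggested in the review comment above.
  val HeartbeatMs = "500"

  /** Returns a copy of `base` with the broker-side heartbeat interval set. */
  def brokerOverrides(base: Properties): Properties = {
    val props = new Properties()
    props.putAll(base)
    props.setProperty(ConsumerGroupHeartbeatIntervalMs, HeartbeatMs)
    props
  }

  /** Returns a copy of `base` with the classic consumer heartbeat interval set. */
  def classicConsumerOverrides(base: Properties): Properties = {
    val props = new Properties()
    props.putAll(base)
    props.setProperty(ClassicHeartbeatIntervalMs, HeartbeatMs)
    props
  }

  def main(args: Array[String]): Unit = {
    println(brokerOverrides(new Properties()))
    println(classicConsumerOverrides(new Properties()))
  }
}
```

Using one shared constant keeps the two protocols on the same heartbeat cadence, which is the point of the suggestion.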

Member

@lianetm lianetm left a comment

Thanks! LGTM.

@lianetm
Member

lianetm commented Nov 15, 2024

4 unrelated test failures (failures exist in trunk). 3 already tracked. I filed https://issues.apache.org/jira/browse/KAFKA-18025.

@lianetm lianetm merged commit 283d56c into apache:trunk Nov 15, 2024
7 of 8 checks passed
chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Nov 30, 2024
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025