XREAD blocks if count is provided #681

Closed
guytbk opened this issue Jan 29, 2025 · 2 comments

guytbk commented Jan 29, 2025

Hi,
In the past I have been using XREAD like this:
streams, err := redis.Values(conn.Do("XREAD", "BLOCK", 0, "STREAMS", stream, key))

Recently I wanted to start reading in batches, so I switched to this call:
streams, err := redis.Values(conn.Do("XREAD", "BLOCK", 0, "COUNT", bulkSize, "STREAMS", stream, key))
The aim was to read up to "bulkSize" messages without blocking.

With redigo, it looks like the call blocks until the number of messages on the stream reaches "bulkSize".
However, when debugging on the CLI, the following command returns immediately:
XREAD BLOCK 0 count 20000 STREAMS changes 1738147923964-1

According to this, it should be supported.

I am using Redis 6.2.14 and redigo v1.8.5
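
For reference, the two calls above differ only in the optional COUNT argument. COUNT is an upper bound on the number of entries returned, and BLOCK only waits when no entries are available at all. The sketch below shows one way to assemble the argument list so that COUNT can be switched on and off. The helper name `xreadArgs` and its conventions for "omit this option" are assumptions made for illustration; they are not part of redigo.

```go
package main

import "fmt"

// xreadArgs is a hypothetical helper (not part of redigo) that assembles the
// arguments for an XREAD call. blockMs < 0 omits BLOCK; count <= 0 omits COUNT.
func xreadArgs(blockMs, count int, stream, lastID string) []interface{} {
	args := make([]interface{}, 0, 7)
	if blockMs >= 0 {
		args = append(args, "BLOCK", blockMs)
	}
	if count > 0 {
		// COUNT is a maximum, not a number of entries to wait for.
		args = append(args, "COUNT", count)
	}
	// STREAMS must be the last option, followed by the keys and then the IDs.
	return append(args, "STREAMS", stream, lastID)
}

func main() {
	args := xreadArgs(0, 20000, "changes", "1738147923964-1")
	// Prints: BLOCK 0 COUNT 20000 STREAMS changes 1738147923964-1
	fmt.Println(args...)
}
```

With a redigo connection the result would be passed along as `conn.Do("XREAD", args...)`.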

guytbk closed this as completed Jan 29, 2025

guytbk (Author) commented Jan 29, 2025

This was a validation mistake on my part; closing.

stevenh (Collaborator) commented Jan 29, 2025

I was going to say it looks ok here with this test:

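import (
	"fmt"
	"testing"

	"github.com/gomodule/redigo/redis"
	"github.com/stretchr/testify/require"
)

// Test_xread checks that XREAD with COUNT returns as soon as entries exist.
// dial is assumed to be a test helper that returns a redis.Conn.
func Test_xread(t *testing.T) {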
	c, err := dial()
	require.NoError(t, err)
	defer c.Close()

	resultStr, err := redis.Strings(c.Do("CONFIG", "GET", "stream-node-max-entries"))
	require.NoError(t, err)
	// Skip on servers older than 5.0, which do not support streams.
	if len(resultStr) == 0 {
		t.Skip("Skipped, stream feature not supported")
	}

	count := 3
	stream := "testStream"
	for i := 0; i < count; i++ {
		_, err = redis.String(c.Do("XADD", stream, "*", "index", i, "other", fmt.Sprintf("other-%d", i)))
		require.NoError(t, err)
	}

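	// COUNT (4) is larger than the 3 entries added above; XREAD still returns
	// immediately because COUNT is only an upper bound.
	streams, err := redis.Values(c.Do("XREAD", "BLOCK", 0, "COUNT", 4, "STREAMS", stream, "0"))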
	require.NoError(t, err)
	require.Len(t, streams, 1)

	for _, v := range streams {
		data, ok := v.([]interface{})
		require.True(t, ok)
		require.Len(t, data, 2)

		key, err := redis.String(data[0], nil)
		require.NoError(t, err)
		require.Equal(t, stream, key)

		values, err := redis.Values(data[1], nil)
		require.NoError(t, err)

		for _, value := range values {
			vals, err := redis.Values(value, nil)
			require.NoError(t, err)
			require.Len(t, vals, 2)

			id, err := redis.String(vals[0], nil)
			require.NoError(t, err)

			fields, err := redis.StringMap(vals[1], nil)
			require.NoError(t, err)
			fmt.Printf("id: %s = %+v\n", id, fields)
		}
	}
}
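
The nested type assertions in the test follow the shape of an XREAD reply: a list of `[stream, entries]` pairs, where each entry is `[id, [field, value, ...]]`. As a rough sketch, the same walk can be pulled into a small decoder that runs without a server. The names `streamEntry`, `asString` and `decodeXRead` are hypothetical and not part of redigo; the sketch assumes bulk strings arrive as `[]byte`, as they do with redigo, and uses a hand-built reply in place of a real `conn.Do` result.

```go
package main

import (
	"errors"
	"fmt"
)

// streamEntry is a hypothetical type for one decoded stream entry.
type streamEntry struct {
	Stream string
	ID     string
	Fields map[string]string
}

// asString converts a reply element to a string. Bulk strings are expected as
// []byte; plain strings are accepted too so the sketch is easy to exercise.
func asString(v interface{}) (string, error) {
	switch s := v.(type) {
	case []byte:
		return string(s), nil
	case string:
		return s, nil
	}
	return "", fmt.Errorf("unexpected type %T", v)
}

// decodeXRead flattens a reply of the shape
// [[stream, [[id, [field, value, ...]], ...]], ...] into a slice of entries.
func decodeXRead(reply []interface{}) ([]streamEntry, error) {
	var out []streamEntry
	for _, s := range reply {
		pair, ok := s.([]interface{})
		if !ok || len(pair) != 2 {
			return nil, errors.New("stream item: want [name, entries]")
		}
		name, err := asString(pair[0])
		if err != nil {
			return nil, err
		}
		entries, ok := pair[1].([]interface{})
		if !ok {
			return nil, errors.New("entries: want an array")
		}
		for _, e := range entries {
			item, ok := e.([]interface{})
			if !ok || len(item) != 2 {
				return nil, errors.New("entry: want [id, fields]")
			}
			id, err := asString(item[0])
			if err != nil {
				return nil, err
			}
			kv, ok := item[1].([]interface{})
			if !ok || len(kv)%2 != 0 {
				return nil, errors.New("fields: want field/value pairs")
			}
			fields := make(map[string]string, len(kv)/2)
			for i := 0; i < len(kv); i += 2 {
				k, err := asString(kv[i])
				if err != nil {
					return nil, err
				}
				v, err := asString(kv[i+1])
				if err != nil {
					return nil, err
				}
				fields[k] = v
			}
			out = append(out, streamEntry{Stream: name, ID: id, Fields: fields})
		}
	}
	return out, nil
}

func main() {
	// Hand-built reply standing in for what an XREAD call would return.
	reply := []interface{}{
		[]interface{}{[]byte("testStream"), []interface{}{
			[]interface{}{[]byte("1-0"), []interface{}{[]byte("index"), []byte("0")}},
		}},
	}
	entries, err := decodeXRead(reply)
	fmt.Println(entries, err)
}
```

Returning an error for every unexpected shape, instead of asserting, keeps the decoder usable outside of tests.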
