Skip to content

Commit 32a8952

Browse files
authored
Merge pull request #146483 from cockroachdb/blathers/backport-release-25.2-144442
release-25.2: changefeedccl: improve test coverage of ALTER CHANGEFEED
2 parents 1645aa3 + 71ee6ae commit 32a8952

File tree

5 files changed

+386
-6
lines changed

5 files changed

+386
-6
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 376 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@
66
package changefeedccl
77

88
import (
9+
"cmp"
910
"context"
1011
gosql "database/sql"
1112
"fmt"
13+
"maps"
1214
"math/rand"
1315
"net/url"
16+
"slices"
17+
"strings"
1418
"sync/atomic"
1519
"testing"
1620
"time"
@@ -1579,6 +1583,116 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15791583
cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection)
15801584
}
15811585

1586+
func TestAlterChangefeedDropTargetDuringInitialScan(t *testing.T) {
1587+
defer leaktest.AfterTest(t)()
1588+
defer log.Scope(t).Close(t)
1589+
1590+
rnd, _ := randutil.NewPseudoRand()
1591+
1592+
testFn := func(t *testing.T, s TestServerWithSystem, f cdctest.TestFeedFactory) {
1593+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
1594+
1595+
sqlDB.Exec(t, `CREATE TABLE foo(val INT PRIMARY KEY)`)
1596+
sqlDB.Exec(t, `INSERT INTO foo (val) SELECT * FROM generate_series(1, 100)`)
1597+
1598+
sqlDB.Exec(t, `CREATE TABLE bar(val INT PRIMARY KEY)`)
1599+
sqlDB.Exec(t, `INSERT INTO bar (val) SELECT * FROM generate_series(1, 100)`)
1600+
1601+
fooDesc := desctestutils.TestingGetPublicTableDescriptor(
1602+
s.SystemServer.DB(), s.Codec, "d", "foo")
1603+
fooTableSpan := fooDesc.PrimaryIndexSpan(s.Codec)
1604+
1605+
barDesc := desctestutils.TestingGetPublicTableDescriptor(
1606+
s.SystemServer.DB(), s.Codec, "d", "bar")
1607+
barTableSpan := barDesc.PrimaryIndexSpan(s.Codec)
1608+
1609+
knobs := s.TestingKnobs.
1610+
DistSQL.(*execinfra.TestingKnobs).
1611+
Changefeed.(*TestingKnobs)
1612+
1613+
// Make scan requests small enough so that we're guaranteed multiple
1614+
// resolved events during the initial scan.
1615+
knobs.FeedKnobs.BeforeScanRequest = func(b *kv.Batch) error {
1616+
b.Header.MaxSpanRequestKeys = 10
1617+
return nil
1618+
}
1619+
1620+
var allSpans roachpb.SpanGroup
1621+
allSpans.Add(fooTableSpan, barTableSpan)
1622+
var allSpansResolved atomic.Bool
1623+
1624+
// Skip some spans for both tables so that the initial scan can't complete.
1625+
var skippedFooSpans, skippedBarSpans roachpb.SpanGroup
1626+
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
1627+
defer func() {
1628+
allSpans.Sub(r.Span)
1629+
if allSpans.Len() == 0 {
1630+
allSpansResolved.Store(true)
1631+
}
1632+
}()
1633+
1634+
if r.Span.Equal(fooTableSpan) || r.Span.Equal(barTableSpan) ||
1635+
skippedFooSpans.Encloses(r.Span) || skippedBarSpans.Encloses(r.Span) {
1636+
return true, nil
1637+
}
1638+
1639+
if fooTableSpan.Contains(r.Span) && (skippedFooSpans.Len() == 0 || rnd.Intn(3) == 0) {
1640+
skippedFooSpans.Add(r.Span)
1641+
return true, nil
1642+
}
1643+
1644+
if barTableSpan.Contains(r.Span) && (skippedBarSpans.Len() == 0 || rnd.Intn(3) == 0) {
1645+
skippedBarSpans.Add(r.Span)
1646+
return true, nil
1647+
}
1648+
1649+
return false, nil
1650+
}
1651+
1652+
// Create a changefeed watching both tables.
1653+
targets := "foo, bar"
1654+
if rnd.Intn(2) == 0 {
1655+
targets = "bar, foo"
1656+
}
1657+
testFeed := feed(t, f, fmt.Sprintf(`CREATE CHANGEFEED for %s`, targets))
1658+
defer closeFeed(t, testFeed)
1659+
1660+
// Wait for all spans to have been resolved.
1661+
testutils.SucceedsSoon(t, func() error {
1662+
if allSpansResolved.Load() {
1663+
return nil
1664+
}
1665+
return errors.New("expected all spans to be resolved")
1666+
})
1667+
1668+
// Pause the changefeed and make sure the initial scan hasn't completed yet.
1669+
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
1670+
require.True(t, ok)
1671+
require.NoError(t, feed.Pause())
1672+
hw, err := feed.HighWaterMark()
1673+
require.NoError(t, err)
1674+
require.Zero(t, hw)
1675+
1676+
// Alter the changefeed to stop watching the second table.
1677+
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP bar`, feed.JobID()))
1678+
1679+
allSpans.Add(fooTableSpan)
1680+
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) (bool, error) {
1681+
if barTableSpan.Contains(r.Span) {
1682+
t.Fatalf("span from dropped table should not have been resolved: %#v", r.Span)
1683+
}
1684+
allSpans.Sub(r.Span)
1685+
return false, nil
1686+
}
1687+
1688+
require.NoError(t, feed.Resume())
1689+
require.NoError(t, feed.WaitForHighWaterMark(hlc.Timestamp{}))
1690+
require.Zero(t, allSpans.Len())
1691+
}
1692+
1693+
cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection)
1694+
}
1695+
15821696
func TestAlterChangefeedInitialScan(t *testing.T) {
15831697
defer leaktest.AfterTest(t)()
15841698
defer log.Scope(t).Close(t)
@@ -1769,3 +1883,265 @@ func TestAlterChangefeedAccessControl(t *testing.T) {
17691883
// Only enterprise sinks create jobs.
17701884
cdcTest(t, testFn, feedTestEnterpriseSinks)
17711885
}
1886+
1887+
// TestAlterChangefeedAddDropSameTarget tests adding and dropping the same
1888+
// target multiple times in a statement.
1889+
func TestAlterChangefeedAddDropSameTarget(t *testing.T) {
1890+
defer leaktest.AfterTest(t)()
1891+
defer log.Scope(t).Close(t)
1892+
1893+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
1894+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
1895+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
1896+
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
1897+
1898+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
1899+
defer closeFeed(t, testFeed)
1900+
1901+
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
1902+
require.True(t, ok)
1903+
1904+
// Test removing and adding the same target.
1905+
require.NoError(t, feed.Pause())
1906+
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP foo ADD foo`, feed.JobID()))
1907+
require.NoError(t, feed.Resume())
1908+
sqlDB.Exec(t, `INSERT INTO foo VALUES(1)`)
1909+
assertPayloads(t, testFeed, []string{
1910+
`foo: [1]->{"after": {"a": 1}}`,
1911+
})
1912+
1913+
// Test adding and removing the same target.
1914+
require.NoError(t, feed.Pause())
1915+
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar DROP bar`, feed.JobID()))
1916+
require.NoError(t, feed.Resume())
1917+
var tsStr string
1918+
sqlDB.QueryRow(t, `INSERT INTO bar VALUES(1)`)
1919+
sqlDB.QueryRow(t, `INSERT INTO foo VALUES(2) RETURNING cluster_logical_timestamp()`).Scan(&tsStr)
1920+
ts := parseTimeToHLC(t, tsStr)
1921+
require.NoError(t, feed.WaitForHighWaterMark(ts))
1922+
// We don't expect to see the row inserted into bar.
1923+
assertPayloads(t, testFeed, []string{
1924+
`foo: [2]->{"after": {"a": 2}}`,
1925+
})
1926+
1927+
// Test adding, removing, and adding the same target.
1928+
require.NoError(t, feed.Pause())
1929+
sqlDB.Exec(t, fmt.Sprintf(
1930+
`ALTER CHANGEFEED %d ADD bar DROP bar ADD bar WITH initial_scan='yes'`, feed.JobID()))
1931+
require.NoError(t, feed.Resume())
1932+
sqlDB.Exec(t, `INSERT INTO bar VALUES(2)`)
1933+
assertPayloads(t, testFeed, []string{
1934+
// TODO(#144032): This row should be produced.
1935+
//`bar: [1]->{"after": {"a": 1}}`,
1936+
`bar: [2]->{"after": {"a": 2}}`,
1937+
})
1938+
}
1939+
1940+
cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection)
1941+
}
1942+
1943+
// TestAlterChangefeedRandomizedTargetChanges tests altering a changefeed
1944+
// with randomized adding and dropping of targets.
1945+
func TestAlterChangefeedRandomizedTargetChanges(t *testing.T) {
1946+
defer leaktest.AfterTest(t)()
1947+
defer log.Scope(t).Close(t)
1948+
1949+
require.NoError(t, log.SetVModule("helpers_test=1"))
1950+
1951+
rnd, _ := randutil.NewPseudoRand()
1952+
1953+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
1954+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
1955+
1956+
// The tables in this test will have the rows 0, ..., tableRowCounts[tableName]-1.
1957+
tables := make(map[string]struct{})
1958+
tableRowCounts := make(map[string]int)
1959+
1960+
makeExpectedRow := func(tableName string, row int, updated hlc.Timestamp) string {
1961+
return fmt.Sprintf(`%s: [%[2]d]->{"after": {"a": %[2]d}, "updated": "%s"}`,
1962+
tableName, row, updated.AsOfSystemTime())
1963+
}
1964+
1965+
insertRowsIntoTable := func(tableName string, numRows int) []string {
1966+
rows := make([]string, 0, numRows)
1967+
for range numRows {
1968+
row := tableRowCounts[tableName]
1969+
var tsStr string
1970+
insertStmt := fmt.Sprintf(`INSERT INTO %s VALUES (%d)`, tableName, row)
1971+
t.Log(insertStmt)
1972+
sqlDB.QueryRow(t,
1973+
fmt.Sprintf(`%s RETURNING cluster_logical_timestamp()`, insertStmt),
1974+
).Scan(&tsStr)
1975+
ts := parseTimeToHLC(t, tsStr)
1976+
rows = append(rows, makeExpectedRow(tableName, row, ts))
1977+
tableRowCounts[tableName] += 1
1978+
}
1979+
return rows
1980+
}
1981+
1982+
// Create 10 tables with a single row to start.
1983+
const numTables = 10
1984+
t.Logf("creating %d tables", numTables)
1985+
for i := range numTables {
1986+
tableName := fmt.Sprintf("table%d", i)
1987+
createStmt := fmt.Sprintf(`CREATE TABLE %s (a INT PRIMARY KEY)`, tableName)
1988+
t.Log(createStmt)
1989+
sqlDB.Exec(t, createStmt)
1990+
tables[tableName] = struct{}{}
1991+
insertRowsIntoTable(tableName, 1 /* numRows */)
1992+
}
1993+
1994+
// makeInitialScanRows returns the expected initial scan rows assuming
1995+
// every row in the table will be included in the initial scan.
1996+
makeInitialScanRows := func(newTables []string, scanTime hlc.Timestamp) []string {
1997+
var rows []string
1998+
for _, t := range newTables {
1999+
for i := range tableRowCounts[t] {
2000+
rows = append(rows, makeExpectedRow(t, i, scanTime))
2001+
}
2002+
}
2003+
return rows
2004+
}
2005+
2006+
// Randomly select some subset of tables to be the initial changefeed targets.
2007+
initialTables := getNFromSet(rnd, tables, 1+rnd.Intn(numTables))
2008+
watchedTables := makeSet(initialTables)
2009+
nonWatchedTables := setDifference(tables, watchedTables)
2010+
2011+
// Create the changefeed.
2012+
createStmt := fmt.Sprintf(
2013+
`CREATE CHANGEFEED FOR %s WITH updated`, strings.Join(initialTables, ", "))
2014+
t.Log(createStmt)
2015+
testFeed := feed(t, f, createStmt)
2016+
defer closeFeed(t, testFeed)
2017+
2018+
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
2019+
require.True(t, ok)
2020+
2021+
d, err := feed.Details()
2022+
require.NoError(t, err)
2023+
statementTime := d.StatementTime
2024+
require.NoError(t, feed.WaitForHighWaterMark(statementTime))
2025+
assertPayloads(t, testFeed, makeInitialScanRows(initialTables, statementTime))
2026+
2027+
const numAlters = 10
2028+
t.Logf("will perform %d alters", numAlters)
2029+
for i := range numAlters {
2030+
t.Logf("performing alter #%d", i+1)
2031+
2032+
require.NoError(t, feed.Pause())
2033+
2034+
hw, err := feed.HighWaterMark()
2035+
require.NoError(t, err)
2036+
2037+
var alterStmtBuilder strings.Builder
2038+
write := func(format string, args ...any) {
2039+
_, err := fmt.Fprintf(&alterStmtBuilder, format, args...)
2040+
require.NoError(t, err)
2041+
}
2042+
write(`ALTER CHANGEFEED %d`, feed.JobID())
2043+
2044+
// We get the set of tables to add/drop first to ensure we are
2045+
// selecting without replacement.
2046+
numAdds := rnd.Intn(len(nonWatchedTables) + 1)
2047+
numDrops := rnd.Intn(len(watchedTables))
2048+
if numAdds == 0 && numDrops == 0 {
2049+
t.Logf("skipping alter #%d", i+1)
2050+
continue
2051+
}
2052+
adds := getNFromSet(rnd, nonWatchedTables, numAdds)
2053+
drops := getNFromSet(rnd, watchedTables, numDrops)
2054+
2055+
var expectedRows []string
2056+
for len(adds) > 0 || len(drops) > 0 {
2057+
// Randomize the order of adds and drops.
2058+
if add := len(adds) > 0 && (len(drops) == 0 || rnd.Intn(2) == 0); add {
2059+
addTarget := adds[0]
2060+
adds = adds[1:]
2061+
delete(nonWatchedTables, addTarget)
2062+
watchedTables[addTarget] = struct{}{}
2063+
2064+
write(` ADD %s`, addTarget)
2065+
2066+
switch rnd.Intn(4) {
2067+
case 0:
2068+
write(` WITH initial_scan='yes'`)
2069+
expectedRows = append(expectedRows, makeInitialScanRows([]string{addTarget}, hw)...)
2070+
case 1:
2071+
write(` WITH initial_scan='only'`)
2072+
// We don't do an initial scan because the original
2073+
// changefeed did not have initial_scan='only'.
2074+
case 2:
2075+
write(` WITH initial_scan='no'`)
2076+
case 3:
2077+
// The default option is initial_scan='no'.
2078+
}
2079+
expectedRows = append(expectedRows,
2080+
insertRowsIntoTable(addTarget, 2 /* numRows */)...)
2081+
} else { // Drop a target.
2082+
dropTarget := drops[0]
2083+
drops = drops[1:]
2084+
delete(watchedTables, dropTarget)
2085+
nonWatchedTables[dropTarget] = struct{}{}
2086+
2087+
write(` DROP %s`, dropTarget)
2088+
2089+
// Insert some more rows into the table that
2090+
// should NOT be emitted by the changefeed.
2091+
insertRowsIntoTable(dropTarget, 3 /* numRows */)
2092+
}
2093+
}
2094+
require.Empty(t, adds)
2095+
require.Empty(t, drops)
2096+
2097+
alterStmt := alterStmtBuilder.String()
2098+
t.Log(alterStmt)
2099+
sqlDB.Exec(t, alterStmt)
2100+
2101+
require.NoError(t, feed.Resume())
2102+
2103+
// Wait for highwater to advance past the current time so that
2104+
// we're sure no more rows are expected.
2105+
var tsStr string
2106+
sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&tsStr)
2107+
ts := parseTimeToHLC(t, tsStr)
2108+
require.NoError(t, feed.WaitForHighWaterMark(ts))
2109+
2110+
assertPayloads(t, testFeed, expectedRows)
2111+
}
2112+
}
2113+
2114+
cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestNoExternalConnection)
2115+
}
2116+
2117+
// makeSet returns a new set with the elements in the provided slice.
2118+
func makeSet[K cmp.Ordered](ks []K) map[K]struct{} {
2119+
m := make(map[K]struct{}, len(ks))
2120+
for _, k := range ks {
2121+
m[k] = struct{}{}
2122+
}
2123+
return m
2124+
}
2125+
2126+
// setDifference returns a new set that is s - t.
2127+
func setDifference[K cmp.Ordered](s map[K]struct{}, t map[K]struct{}) map[K]struct{} {
2128+
difference := make(map[K]struct{})
2129+
for e := range s {
2130+
if _, ok := t[e]; !ok {
2131+
difference[e] = struct{}{}
2132+
}
2133+
}
2134+
return difference
2135+
}
2136+
2137+
// getNFromSet returns a slice with n random elements from s.
2138+
func getNFromSet[K cmp.Ordered](rnd *rand.Rand, s map[K]struct{}, n int) []K {
2139+
if len(s) < n {
2140+
panic(fmt.Sprintf("not enough elements in set, wanted %d, found %d", n, len(s)))
2141+
}
2142+
ks := slices.Sorted(maps.Keys(s))
2143+
rnd.Shuffle(len(ks), func(i, j int) {
2144+
ks[i], ks[j] = ks[j], ks[i]
2145+
})
2146+
return ks[:n]
2147+
}

0 commit comments

Comments
 (0)