DNM #1381
Skipping CI for Draft Pull Request.
[APPROVALNOTIFIER] This PR is NOT APPROVED.
Hello @wk989898, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
Summary of Changes
Hello team,
Gemini here, providing a summary for this pull request titled "DNM". Based on the title and the empty description, this appears to be a work-in-progress or draft PR, so the full intent and context might not be completely captured yet. However, by inspecting the code changes, I can see that this PR introduces significant refactoring and new components related to the redo log functionality within the downstream adapter.
The core idea seems to be the introduction of a dedicated `RedoDispatcher` and integrating the redo log handling as a distinct sink type. This involves changes across the dispatcher, dispatcher manager, event collector, and the redo log writing components themselves, along with updates to the communication protocols.
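To make the idea of a second dispatcher type concrete, here is a minimal sketch of two implementations behind one interface. Only the names `EventDispatcher`, `RedoDispatcher`, `GetType`, `TypeDispatcherCommon`, `TypeDispatcherRedo`, and `IsRedoDispatcher` come from the PR summary; the `Handle` method, the integer type tags, and the slice-backed "sinks" are assumptions made for illustration and are not the PR's actual signatures.

```go
package main

import "fmt"

// Hypothetical type tags. The PR names TypeDispatcherCommon and
// TypeDispatcherRedo, but their actual type and values are not shown.
const (
	TypeDispatcherCommon = iota
	TypeDispatcherRedo
)

// EventDispatcher is a pared-down stand-in for the interface in the PR.
// Only GetType is taken from the changelog; Handle is an assumed method.
type EventDispatcher interface {
	GetType() int
	Handle(event string)
}

// Dispatcher forwards events to the main downstream sink (a slice here).
type Dispatcher struct{ sinkRows []string }

func (d *Dispatcher) GetType() int        { return TypeDispatcherCommon }
func (d *Dispatcher) Handle(event string) { d.sinkRows = append(d.sinkRows, event) }

// RedoDispatcher forwards the same events to the redo log (a slice here).
type RedoDispatcher struct{ redoRows []string }

func (r *RedoDispatcher) GetType() int        { return TypeDispatcherRedo }
func (r *RedoDispatcher) Handle(event string) { r.redoRows = append(r.redoRows, event) }

// IsRedoDispatcher reports whether d writes to the redo log.
func IsRedoDispatcher(d EventDispatcher) bool { return d.GetType() == TypeDispatcherRedo }

func main() {
	dispatchers := []EventDispatcher{&Dispatcher{}, &RedoDispatcher{}}
	for _, d := range dispatchers {
		d.Handle("row-1")
		fmt.Println(IsRedoDispatcher(d))
	}
}
```

Code that only needs the shared behavior can accept `EventDispatcher`, while the few places that must treat the redo path differently can branch on `IsRedoDispatcher`.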
Highlights
- New Redo Dispatcher: A new `RedoDispatcher` type is added, implementing the `EventDispatcher` interface. This dispatcher is specifically designed to handle events and write them to the redo log sink.
- Redo Sink Integration: The redo log writing logic is encapsulated in a new `redo.Sink` type, which now acts as a standard sink implementation. This allows the system to treat the redo log as another destination for events.
- Dispatcher Manager Enhancements: The `EventDispatcherManager` is updated to manage both the standard dispatchers and the new `RedoDispatcher` instances. It includes logic for creating corresponding redo dispatchers, managing their lifecycle, and allocating separate memory quotas for the main sink and the redo sink. It also introduces a mechanism to collect and report redo checkpoint timestamps.
- Event Routing Logic: The `EventCollector` is modified to differentiate between events intended for standard dispatchers and those for redo dispatchers. It uses separate internal dynamic streams and adds a `Redo` flag to messages to ensure events are routed correctly.
- Protocol Updates: New protobuf messages (`RedoTsMessage`) and fields (`redo` in `DispatcherRequest`) are added to facilitate communication related to redo log progress and dispatcher types between components.
- Redo Writer Refactoring: The underlying redo log writer implementation, particularly the memory-based one, has been refactored. The encoding worker group is removed, and encoding seems to be handled directly within the file worker group. The file writer is split to handle DDL and DML logs separately, and the `FlushLog` method is removed from the writer interface, implying flushing is now managed internally by the writer.
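The event routing highlight can be sketched as follows. This is an illustrative reduction, not the PR's code: `TargetMessage` is cut down to a `Redo` flag plus a hypothetical `Payload` field, the two dynamic streams are modelled as plain slices, and `collector`, `getStream`, and `route` are names invented for this sketch.

```go
package main

import "fmt"

// TargetMessage is a simplified stand-in. The PR adds a Redo flag to
// messaging.TargetMessage; the Payload field here is hypothetical.
type TargetMessage struct {
	Redo    bool
	Payload string
}

// collector keeps one queue per dispatcher kind, mirroring the separate
// dynamic streams described above (plain slices for simplicity).
type collector struct {
	ds     []string // events for standard dispatchers
	redoDs []string // events for redo dispatchers
}

// getStream returns the queue that matches the redo flag.
func (c *collector) getStream(isRedo bool) *[]string {
	if isRedo {
		return &c.redoDs
	}
	return &c.ds
}

// route appends the payload to the queue selected by the message's flag.
func (c *collector) route(msg TargetMessage) {
	s := c.getStream(msg.Redo)
	*s = append(*s, msg.Payload)
}

func main() {
	c := &collector{}
	c.route(TargetMessage{Payload: "dml-1"})
	c.route(TargetMessage{Redo: true, Payload: "dml-1"})
	fmt.Println(len(c.ds), len(c.redoDs))
}
```

Keeping the flag on the message itself means the receiver does not need to look up the dispatcher to decide which stream an event belongs to.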
Changelog
- downstreamadapter/dispatcher/dispatcher.go
  - Added `GetBlockStatusesChan`, `HandleDispatcherStatus`, `GetType` methods to `EventDispatcher` interface.
  - Added `redoGlobalTs` and `cacheEvents` fields to `Dispatcher` struct.
  - Modified `NewDispatcher` to accept `redoGlobalTs`.
  - Modified `HandleEvents` to include a redo check and cache events if necessary.
  - Added `HandleCacheEvents` method.
  - Closed `cacheEvents` channel in `Remove`.
- downstreamadapter/dispatcher/dispatcher_info.go
  - Implemented `GetBlockStatusesChan` and `GetType` for `Dispatcher`.
  - Added `TypeDispatcherCommon` constant.
- downstreamadapter/dispatcher/dispatcher_test.go
  - Added `math` import.
  - Modified `newDispatcherForTest` to initialize and pass `redoTs`.
- downstreamadapter/dispatcher/helper.go
  - Changed `dispatcher` type in `ResendTask` and related functions/handlers to `EventDispatcher` interface.
  - Updated `ResendTask.Execute` to use `GetBlockStatusesChan()`.
  - Added `TypeDispatcherRedo`, `IsRedoDispatcher`, `cacheEvents` struct, and `newCacheEvents` function.
- downstreamadapter/dispatcher/redo_dispatcher.go
  - Added new file `redo_dispatcher.go` defining `RedoDispatcher` implementing `EventDispatcher`.
  - Implemented methods for handling events, dispatcher status, and managing redo log writing via a `redoSink`.
- downstreamadapter/dispatchermanager/event_dispatcher_manager.go
  - Added imports for `redo` and `messaging`.
  - Added fields for managing redo dispatchers (`redoTableTriggerEventDispatcher`, `redoDispatcherMap`, `redoMap`) and redo sink/meta (`redoSink`, `redoGlobalTs`, `redoMeta`).
  - Added fields for memory quotas (`sinkQuota`, `redoQuota`).
  - Modified `NewEventDispatcherManager` to initialize redo components and calculate quotas.
  - Modified `close` to close the `redoSink`.
  - Modified `InitalizeTableTriggerEventDispatcher` to register the redo table trigger dispatcher.
  - Modified `newDispatchers` to create and register `RedoDispatcher` instances alongside normal ones.
  - Modified `removeDispatcher` to also remove the corresponding `RedoDispatcher`.
  - Added `collectRedoTs` goroutine to report redo checkpoint TS.
  - Added `SetGlobalRedoTs` method.
- downstreamadapter/dispatchermanager/heartbeat_collector.go
  - Added import for `messaging`.
  - Changed generic type for `dispatcherStatusDynamicStream` to `EventDispatcher`.
  - Added `redoTsMessageDynamicStream` and related registration/removal methods.
  - Modified `RecvMessages` to handle `TypeRedoTsMessage`.
- downstreamadapter/dispatchermanager/helper.go
  - Made `DispatcherMap` generic (`DispatcherMap[T dispatcher.EventDispatcher]`).
  - Changed generic types in `newHeartBeatResponseDynamicStream` and `newHeartBeatResponseHandler` to `EventDispatcher`.
  - Added `RedoTsMessage` struct and `RedoTsMessageHandler` to process redo TS messages.
- downstreamadapter/eventcollector/dispatcher_stat.go
  - Added import for `messaging`.
  - Added `isRedo` field to `dispatcherStat`.
  - Updated methods to use `messaging.EventServiceTopic` constant.
- downstreamadapter/eventcollector/event_collector.go
  - Added `Redo` field to `DispatcherRequest`.
  - Added `redoDs` dynamic stream.
  - Modified `New` and `Close` to handle `redoDs`.
  - Modified `AddDispatcher` and `RemoveDispatcher` to use the correct dynamic stream based on dispatcher type.
  - Modified `WakeDispatcher` to accept `isRedo`.
  - Modified `mustSendDispatcherRequest` to set the `Redo` flag in the protobuf message.
  - Modified `runProcessMessage` to route events based on the `Redo` flag in the target message.
  - Added `getDynamicStream` helper function.
- downstreamadapter/eventcollector/event_collector_test.go
  - Added methods to `mockEventDispatcher` to satisfy the updated `EventDispatcher` interface, including `GetType` and `GetRedo`.
- downstreamadapter/eventcollector/helper.go
  - Modified `EventsHandler.Handle` to pass `stat.isRedo` to `WakeDispatcher`.
- downstreamadapter/sink/redo/helper.go
  - Added new file `helper.go` with `statefulRts` struct and methods (likely refactored from old `redo/manager.go`).
- downstreamadapter/sink/redo/meta.go
  - Moved and renamed from `redo/meta_manager.go`.
  - Renamed `metaManager` to `RedoMeta` and `NewMetaManager` to `NewRedoMeta`.
  - Updated method signatures and receivers.
- downstreamadapter/sink/redo/sink.go
  - Added new file `sink.go` defining `Sink` implementing `sink.Sink`.
  - Implemented methods for managing redo log writing via a `writer.RedoLogWriter`.
- downstreamadapter/sink/redo/sink_test.go
  - Moved and renamed from `redo/manager_test.go`.
  - Updated tests to use the new `redo.Sink` type and its methods.
- eventpb/event.pb.go
  - Added `Redo` field to `DispatcherRequest` protobuf message and updated generated code.
- eventpb/event.proto
  - Added `bool redo = 16;` to `DispatcherRequest` message definition.
- heartbeatpb/heartbeat.pb.go
  - Added `RedoTsMessage` protobuf message and updated generated code.
- heartbeatpb/heartbeat.proto
  - Added `RedoTsMessage` message definition.
- maintainer/maintainer.go
  - Added `redoTs` field.
  - Modified `NewMaintainer` to initialize `redoTs`.
  - Added `onRedoTsPersisted` method to handle `RedoTsMessage`.
- maintainer/maintainer_manager.go
  - Modified `recvMessages` to handle `TypeRedoTsMessage`.
- pkg/common/types.go
  - Added `RedoSinkType` constant.
- pkg/config/changefeed.go
  - Added `Consistent` field to `ChangefeedConfig`.
  - Modified `ToChangefeedConfig` to copy the `Consistent` config.
- pkg/eventservice/dispatcher_stat.go
  - Added `redo` field to `wrapEvent`.
  - Updated methods to use `messaging.EventServiceTopic` constant.
- pkg/eventservice/event_broker.go
  - Modified various send functions to pass the redo status.
  - Modified message processing and sending logic to handle the redo status and set the `Redo` flag in `TargetMessage`.
- pkg/eventservice/event_service.go
  - Added `GetRedo` method to `DispatcherInfo` interface.
- pkg/eventservice/event_service_test.go
  - Added `redo` field and `GetRedo` method to `mockDispatcherInfo`.
- pkg/messaging/message.go
  - Added `TypeRedoTsMessage` constant.
  - Added `Redo` field to `TargetMessage`.
  - Updated `String`, `decodeIOType`, and `NewSingleTargetMessage` to handle redo messages and the `Redo` field.
- redo/manager.go
  - Deleted file (functionality moved to new redo sink and meta components).
- redo/writer/blackhole/blackhole_log_writer.go
  - Removed `FlushLog` method.
  - Modified `WriteEvents` to call `event.PostFlush()`.
- redo/writer/file/file_log_writer.go
  - Added imports.
  - Split `logWriter` into `ddlWriter` and `dmlWriter`.
  - Modified `NewLogWriter` to create separate writers for DDL and DML.
  - Modified `WriteEvents` to route events to the correct writer, flush, and call `event.PostFlush()`.
  - Removed `FlushLog`.
  - Modified `Close` and `isStopped` to handle both writers.
- redo/writer/memory/encoding_worker.go
  - Deleted file (encoding logic moved).
- redo/writer/memory/file_worker.go
  - Added imports.
  - Removed `polymorphicRedoEvent` and related logic.
  - Modified `fileWorkerGroup` to directly receive `writer.RedoEvent`.
  - Updated `Run` and `bgWriteLogs` to process `RedoEvent` directly, including encoding, writing, flushing, and calling `event.PostFlush()`.
  - Modified `newFileCache` and `writeToCache` to handle raw data and commit TS.
- redo/writer/memory/mem_log_writer.go
  - Added import.
  - Replaced `encodeWorkers` and `fileWorkers` with `ddlFileWorkers` and `dmlFileWorkers`.
  - Modified `NewLogWriter` to create separate file worker groups for DDL and DML.
  - Modified `WriteEvents` to send events to the appropriate file worker group.
- redo/writer/writer.go
  - Added `PostFlush()` and `GetType()` methods to `RedoEvent` interface.
  - Removed `FlushLog()` from `RedoLogWriter` interface.
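The writer changes listed for `redo/writer` can be pictured with a small sketch: events are routed to a DDL or DML writer by type, both writers are flushed inside `WriteEvents`, and each event's `PostFlush()` callback runs afterwards. The method names `GetType`, `PostFlush`, and `WriteEvents` and the field names `ddlWriter` and `dmlWriter` come from the changelog; the string-valued `GetType`, the in-memory `fileWriter`, and the `event` type are assumptions of this sketch rather than the real implementation.

```go
package main

import "fmt"

// RedoEvent mirrors the two methods the changelog adds to the interface;
// the string return type of GetType is an assumption of this sketch.
type RedoEvent interface {
	GetType() string
	PostFlush()
}

type event struct {
	kind    string // "ddl" or "dml"
	flushed *int   // incremented by PostFlush so callers can observe it
}

func (e event) GetType() string { return e.kind }
func (e event) PostFlush()      { *e.flushed++ }

// fileWriter is a hypothetical in-memory stand-in for one log file.
type fileWriter struct{ buffered, synced int }

func (w *fileWriter) write(RedoEvent) { w.buffered++ }
func (w *fileWriter) flush()          { w.synced += w.buffered; w.buffered = 0 }

type logWriter struct{ ddlWriter, dmlWriter *fileWriter }

// WriteEvents routes each event by type, flushes both writers, and only
// then runs the PostFlush callbacks, so no separate FlushLog call is needed.
func (l *logWriter) WriteEvents(events ...RedoEvent) {
	for _, e := range events {
		if e.GetType() == "ddl" {
			l.ddlWriter.write(e)
		} else {
			l.dmlWriter.write(e)
		}
	}
	l.ddlWriter.flush()
	l.dmlWriter.flush()
	for _, e := range events {
		e.PostFlush()
	}
}

func main() {
	acked := 0
	w := &logWriter{ddlWriter: &fileWriter{}, dmlWriter: &fileWriter{}}
	w.WriteEvents(event{"dml", &acked}, event{"ddl", &acked}, event{"dml", &acked})
	fmt.Println(w.ddlWriter.synced, w.dmlWriter.synced, acked)
}
```

Running the callbacks only after the flush is what lets the caller treat `PostFlush()` as a durability signal, at the cost of one flush per `WriteEvents` call.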
A DNM stands, a work in flight,
Refactoring code, both day and night.
Redo logs flow, a separate stream,
A complex system, a software dream.
Code Review
This pull request introduces a significant new feature related to redo logging, including a `RedoDispatcher`, a `redoSink`, and updates across various components to support this. The changes are extensive and touch upon core dispatcher and sink logic, event handling, and configuration.
Given the "DNM" (Do Not Merge) title, I assume this is a work-in-progress. The review focuses on correctness, potential issues, and areas for clarification. Several critical and high-severity issues have been identified, particularly concerning test adaptation, configuration management, and potential race conditions or deadlocks that need careful consideration before this can be merged.
Summary of Findings
- Test Correctness: The test file `downstreamadapter/sink/redo/sink_test.go` (renamed from `redo/manager_test.go`) is critically outdated. It attempts to use methods and fields from the old `redoManager` implementation on the new `redo.Sink` type, which will cause test failures. This test suite needs a complete overhaul to align with the new `Sink` API and internal structure.
- Configuration Management in Redo Sink: The `LogType` in `writer.LogWriterConfig` is hardcoded during `redoSink.New` initialization and subsequently mutated by `file.NewLogWriter`. This could lead to unexpected behavior if the config object is shared or its `LogType` is inspected elsewhere. A clearer separation or copying of configs for DDL and DML writers might be safer.
- Dispatcher Caching Logic: The `Dispatcher.HandleEvents` caching mechanism based on `redoGlobalTs` needs careful review to ensure it doesn't lead to indefinite blocking under certain conditions related to the update frequency of `redoGlobalTs` and the processing speed of `HandleCacheEvents`.
- Resource Closure Race Condition: In `Dispatcher.Remove()`, closing `d.cacheEvents` could panic if `HandleEvents` is concurrently blocked sending to this channel. Synchronization or a different signaling mechanism might be needed.
- Flushing Strategy in File Writer: `redo/writer/file/file_log_writer.go` now flushes within every `WriteEvents` call. This change should be evaluated for performance implications, especially with frequent, small event batches.
- Incomplete Redo Dispatcher Logic: The `dealWithBlockEvent` method in `RedoDispatcher` is commented out, and the `TypeSyncPointEvent` case in its `HandleEvents` is empty. This suggests some DDL/SyncPoint handling logic might be incomplete or intentionally simplified for the redo path.
- TODO/FIXME Comments: Several TODO and FIXME comments exist in the code (e.g., regarding `redoTableTriggerEventDispatcher` bootstrap, metrics for `redoDs`, checkpoint calculation in `RedoDispatcher`, and `LogType` hardcoding in `redoSink.New`). These should be addressed.
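For the resource closure finding, one common Go pattern is to leave the data channel open and signal shutdown through a separate `done` channel, so a sender blocked on the data channel can bail out instead of panicking. The sketch below is one possible approach under that assumption, not the fix adopted in the PR; apart from `cacheEvents` and `Remove`, every name (`dispatcher`, `done`, `cache`, `newDispatcher`) is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher is a hypothetical reduction of the PR's Dispatcher: it keeps
// only the cacheEvents channel plus a done channel used for shutdown.
type dispatcher struct {
	cacheEvents chan string
	done        chan struct{}
	once        sync.Once
}

func newDispatcher() *dispatcher {
	return &dispatcher{
		cacheEvents: make(chan string, 1),
		done:        make(chan struct{}),
	}
}

// cache enqueues ev and reports whether it was accepted. It never sends
// on a closed channel because cacheEvents itself is never closed.
func (d *dispatcher) cache(ev string) bool {
	// Reject immediately if the dispatcher has already been removed.
	select {
	case <-d.done:
		return false
	default:
	}
	// Otherwise wait for buffer space or for removal, whichever is first.
	select {
	case d.cacheEvents <- ev:
		return true
	case <-d.done:
		return false
	}
}

// Remove signals shutdown exactly once; blocked senders are released.
func (d *dispatcher) Remove() {
	d.once.Do(func() { close(d.done) })
}

func main() {
	d := newDispatcher()
	fmt.Println(d.cache("e1")) // the buffer has room for one event

	res := make(chan bool)
	go func() { res <- d.cache("e2") }() // buffer is full, so this waits
	d.Remove()
	fmt.Println(<-res)
}
```

The trade-off is that a consumer can no longer detect shutdown by ranging over `cacheEvents`; it would have to select on `done` as well.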
Merge Readiness
This pull request introduces a substantial redo logging feature and is currently marked as "DNM" (Do Not Merge), which is appropriate given its work-in-progress nature. There are several critical and high-severity issues identified, particularly concerning the correctness of tests for the new redo sink, potential race conditions, and configuration management clarity. Additionally, some medium-severity concerns around dispatcher logic, flushing strategies, and incomplete functionality need to be addressed.
I recommend that these issues, especially the critical ones related to testing and potential deadlocks/races, be resolved before this PR is considered for merging. As a reviewer, I am unable to approve pull requests. The author should ensure further review and approval from other maintainers after addressing the feedback.
@gemini-code-assist[bot]: adding LGTM is restricted to approvers and reviewers in OWNERS files.
/test all
/test all
/test all
@wk989898: The following tests failed, say
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note