@@ -31,6 +31,7 @@ class ReviewNode(BaseWorkflowNode):
31
31
definition_id (WorkflowDefinitionId): Node type identifier (read-only)
32
32
instructions (Optional[str]): Task instructions for reviewers
33
33
group_assignment (Optional[Union[str, List[str], Any]]): User groups for assignment
34
+ max_contributions_per_user (Optional[int]): Maximum contributions per user (null means infinite)
34
35
node_config (List[Dict[str, Any]]): API configuration for assignments
35
36
36
37
Inputs:
@@ -55,6 +56,7 @@ class ReviewNode(BaseWorkflowNode):
55
56
>>> review = ReviewNode(
56
57
... label="Quality Review",
57
58
... group_assignment=["reviewer-group-id"],
59
+ ... max_contributions_per_user=5,
58
60
... instructions="Check annotation accuracy and completeness"
59
61
... )
60
62
>>> # Connect inputs and outputs
@@ -90,6 +92,11 @@ class ReviewNode(BaseWorkflowNode):
90
92
description = "User group assignment for this review node. Can be a UserGroup object, a string ID, or a list of IDs." ,
91
93
alias = "groupAssignment" ,
92
94
)
95
+ max_contributions_per_user : Optional [int ] = Field (
96
+ default = None ,
97
+ description = "Maximum contributions per user (null means infinite)" ,
98
+ alias = "maxContributionsPerUser" ,
99
+ )
93
100
node_config : List [Dict [str , Any ]] = Field (
94
101
default_factory = lambda : [],
95
102
description = "Contains assignment rules etc." ,
@@ -98,7 +105,8 @@ class ReviewNode(BaseWorkflowNode):
98
105
99
106
@model_validator (mode = "after" )
100
107
def sync_group_assignment_with_config (self ) -> "ReviewNode" :
101
- """Sync group_assignment with node_config for API compatibility."""
108
+ """Sync group_assignment and max_contributions_per_user with node_config for API compatibility."""
109
+ # Handle group assignment (existing logic)
102
110
if self .group_assignment is not None :
103
111
group_ids = []
104
112
@@ -120,16 +128,100 @@ def sync_group_assignment_with_config(self) -> "ReviewNode":
120
128
# Create config entries for group assignments
121
129
if group_ids :
122
130
# Update node_config with assignment rule in correct API format
123
- self .node_config = [
124
- {
125
- "field" : "groupAssignment" ,
126
- "value" : group_ids ,
127
- "metadata" : None ,
128
- }
129
- ]
131
+ group_config_entry = {
132
+ "field" : "groupAssignment" ,
133
+ "value" : group_ids ,
134
+ "metadata" : None ,
135
+ }
136
+
137
+ # Check if group assignment entry already exists and update it, otherwise add it
138
+ updated = False
139
+ for i , entry in enumerate (self .node_config ):
140
+ if entry .get ("field" ) == "groupAssignment" :
141
+ self .node_config [i ] = group_config_entry
142
+ updated = True
143
+ break
144
+
145
+ if not updated :
146
+ self .node_config .append (group_config_entry )
147
+
148
+ # Handle max_contributions_per_user (new logic)
149
+ if self .max_contributions_per_user is not None :
150
+ # Add max contributions config entry
151
+ max_contrib_config_entry = {
152
+ "field" : "maxContributionsPerUser" ,
153
+ "value" : self .max_contributions_per_user ,
154
+ "metadata" : None ,
155
+ }
156
+
157
+ # Check if entry already exists and update it, otherwise add it
158
+ updated = False
159
+ for i , entry in enumerate (self .node_config ):
160
+ if entry .get ("field" ) == "maxContributionsPerUser" :
161
+ self .node_config [i ] = max_contrib_config_entry
162
+ updated = True
163
+ break
164
+
165
+ if not updated :
166
+ self .node_config .append (max_contrib_config_entry )
130
167
131
168
return self
132
169
170
+ def __setattr__ (self , name : str , value : Any ) -> None :
171
+ """Custom setter to sync field changes with node_config."""
172
+ super ().__setattr__ (name , value )
173
+
174
+ # Sync changes to node_config when max_contributions_per_user is updated
175
+ if name == "max_contributions_per_user" and hasattr (
176
+ self , "node_config"
177
+ ):
178
+ self ._sync_config ()
179
+
180
+ def _sync_config (self ) -> None :
181
+ """Sync max_contributions_per_user with node_config."""
182
+ if (
183
+ hasattr (self , "max_contributions_per_user" )
184
+ and self .max_contributions_per_user is not None
185
+ ):
186
+ # Add max contributions config entry
187
+ config_entry = {
188
+ "field" : "maxContributionsPerUser" ,
189
+ "value" : self .max_contributions_per_user ,
190
+ "metadata" : None ,
191
+ }
192
+
193
+ # Check if entry already exists and update it, otherwise add it
194
+ updated = False
195
+ for i , entry in enumerate (self .node_config ):
196
+ if entry .get ("field" ) == "maxContributionsPerUser" :
197
+ self .node_config [i ] = config_entry
198
+ updated = True
199
+ break
200
+
201
+ if not updated :
202
+ self .node_config .append (config_entry )
203
+ else :
204
+ # Remove the entry if value is None
205
+ self .node_config = [
206
+ entry
207
+ for entry in self .node_config
208
+ if entry .get ("field" ) != "maxContributionsPerUser"
209
+ ]
210
+
211
+ # Sync changes back to workflow config
212
+ self ._sync_to_workflow ()
213
+
214
+ def _update_node_data (self , node_data : Dict [str , Any ]) -> None :
215
+ """Update individual node data in workflow config.
216
+
217
+ Override base class to always update config field.
218
+ """
219
+ # Call parent implementation first
220
+ super ()._update_node_data (node_data )
221
+
222
+ # Always update config field, even if empty
223
+ node_data ["config" ] = getattr (self , "node_config" , [])
224
+
133
225
@field_validator ("inputs" )
134
226
@classmethod
135
227
def validate_inputs (cls , v ) -> List [str ]:
0 commit comments