diff --git a/libs/labelbox/src/labelbox/schema/workflow/nodes/review_node.py b/libs/labelbox/src/labelbox/schema/workflow/nodes/review_node.py index 300ccaf39..9e3dab416 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/nodes/review_node.py +++ b/libs/labelbox/src/labelbox/schema/workflow/nodes/review_node.py @@ -31,6 +31,7 @@ class ReviewNode(BaseWorkflowNode): definition_id (WorkflowDefinitionId): Node type identifier (read-only) instructions (Optional[str]): Task instructions for reviewers group_assignment (Optional[Union[str, List[str], Any]]): User groups for assignment + max_contributions_per_user (Optional[int]): Maximum contributions per user (null means infinite) node_config (List[Dict[str, Any]]): API configuration for assignments Inputs: @@ -55,6 +56,7 @@ class ReviewNode(BaseWorkflowNode): >>> review = ReviewNode( ... label="Quality Review", ... group_assignment=["reviewer-group-id"], + ... max_contributions_per_user=5, ... instructions="Check annotation accuracy and completeness" ... ) >>> # Connect inputs and outputs @@ -90,6 +92,11 @@ class ReviewNode(BaseWorkflowNode): description="User group assignment for this review node. Can be a UserGroup object, a string ID, or a list of IDs.", alias="groupAssignment", ) + max_contributions_per_user: Optional[int] = Field( + default=None, + description="Maximum contributions per user (null means infinite)", + alias="maxContributionsPerUser", + ) node_config: List[Dict[str, Any]] = Field( default_factory=lambda: [], description="Contains assignment rules etc.", @@ -98,7 +105,8 @@ class ReviewNode(BaseWorkflowNode): @model_validator(mode="after") def sync_group_assignment_with_config(self) -> "ReviewNode": - """Sync group_assignment with node_config for API compatibility.""" + """Sync group_assignment and max_contributions_per_user with node_config for API compatibility.""" + # Handle group assignment (existing logic) if self.group_assignment is not None: group_ids = [] @@ -120,16 +128,100 @@ def sync_group_assignment_with_config(self) -> "ReviewNode": # Create config entries for group assignments if group_ids: # Update node_config with assignment rule in correct API format - self.node_config = [ - { - "field": "groupAssignment", - "value": group_ids, - "metadata": None, - } - ] + group_config_entry = { + "field": "groupAssignment", + "value": group_ids, + "metadata": None, + } + + # Check if group assignment entry already exists and update it, otherwise add it + updated = False + for i, entry in enumerate(self.node_config): + if entry.get("field") == "groupAssignment": + self.node_config[i] = group_config_entry + updated = True + break + + if not updated: + self.node_config.append(group_config_entry) + + # Handle max_contributions_per_user (new logic) + if self.max_contributions_per_user is not None: + # Add max contributions config entry + max_contrib_config_entry = { + "field": "maxContributionsPerUser", + "value": self.max_contributions_per_user, + "metadata": None, + } + + # Check if entry already exists and update it, otherwise add it + updated = False + for i, entry in enumerate(self.node_config): + if entry.get("field") == "maxContributionsPerUser": + self.node_config[i] = max_contrib_config_entry + updated = True + break + + if not updated: + self.node_config.append(max_contrib_config_entry) return self + def __setattr__(self, name: str, value: Any) -> None: + """Custom setter to sync field changes with node_config.""" + super().__setattr__(name, value) + + # Sync changes to node_config when max_contributions_per_user is updated + if name == "max_contributions_per_user" and hasattr( + self, "node_config" + ): + self._sync_config() + + def _sync_config(self) -> None: + """Sync max_contributions_per_user with node_config.""" + if ( + hasattr(self, "max_contributions_per_user") + and self.max_contributions_per_user is not None + ): + # Add max contributions config entry + config_entry = { + "field": "maxContributionsPerUser", + "value": self.max_contributions_per_user, + "metadata": None, + } + + # Check if entry already exists and update it, otherwise add it + updated = False + for i, entry in enumerate(self.node_config): + if entry.get("field") == "maxContributionsPerUser": + self.node_config[i] = config_entry + updated = True + break + + if not updated: + self.node_config.append(config_entry) + else: + # Remove the entry if value is None + self.node_config = [ + entry + for entry in self.node_config + if entry.get("field") != "maxContributionsPerUser" + ] + + # Sync changes back to workflow config + self._sync_to_workflow() + + def _update_node_data(self, node_data: Dict[str, Any]) -> None: + """Update individual node data in workflow config. + + Override base class to always update config field. + """ + # Call parent implementation first + super()._update_node_data(node_data) + + # Always update config field, even if empty + node_data["config"] = getattr(self, "node_config", []) + @field_validator("inputs") @classmethod def validate_inputs(cls, v) -> List[str]: diff --git a/libs/labelbox/src/labelbox/schema/workflow/workflow.py b/libs/labelbox/src/labelbox/schema/workflow/workflow.py index fa4e902fb..3c45d2ffd 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/workflow.py +++ b/libs/labelbox/src/labelbox/schema/workflow/workflow.py @@ -644,6 +644,7 @@ def add_node( name: str = "Review task", instructions: Optional[str] = None, group_assignment: Optional[Union[str, List[str], Any]] = None, + max_contributions_per_user: Optional[int] = None, **kwargs, ) -> ReviewNode: ... diff --git a/libs/labelbox/src/labelbox/schema/workflow/workflow_operations.py b/libs/labelbox/src/labelbox/schema/workflow/workflow_operations.py index 192f34f8c..c0fcb9b0c 100644 --- a/libs/labelbox/src/labelbox/schema/workflow/workflow_operations.py +++ b/libs/labelbox/src/labelbox/schema/workflow/workflow_operations.py @@ -255,6 +255,7 @@ def add_node( name: str = "Review task", instructions: Optional[str] = None, group_assignment: Optional[Union[str, List[str], Any]] = None, + max_contributions_per_user: Optional[int] = None, **kwargs: Any, ) -> ReviewNode: ...