
Add workflow management #1975


Open: wants to merge 1 commit into develop


paulnoirel (Contributor) commented May 29, 2025

Description

This PR adds workflow management to the SDK.

Note: the verb "create" is used loosely here. One doesn't really create a workflow; one updates it, either by first removing all nodes (reset_config()) and rebuilding it, or by modifying the existing configuration.

⚠️ For now, workflow management checks only the validity of the workflow itself, not the data it references. This avoids adding further complexity and performance overhead.
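A minimal, hypothetical sketch of what a purely structural check can look like: it inspects only how nodes are wired together and never resolves the IDs used in filters or assignments. The rules below (edges must point at existing nodes, initial nodes take no incoming edges, Done nodes have no outgoing edges, every other node needs an outgoing edge) are assumptions for illustration and are not necessarily the rules the SDK enforces; only the `{"errors": [...]}` shape mirrors how `check_validity()` is used in the examples.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

# Hypothetical node kinds, loosely named after the SDK's NodeType members.
INITIAL_KINDS = {"InitialLabeling", "InitialRework"}
TERMINAL_KIND = "Done"


@dataclass
class Node:
    id: str
    kind: str


@dataclass
class Graph:
    nodes: Dict[str, Node] = field(default_factory=dict)
    edges: List[Tuple[str, str]] = field(default_factory=list)  # (source, target)


def check_structure(graph: Graph) -> Dict[str, List[str]]:
    """Check only how nodes are wired; IDs used by filters are never looked up."""
    errors: List[str] = []
    outgoing = {node_id: 0 for node_id in graph.nodes}
    for source, target in graph.edges:
        if source not in graph.nodes or target not in graph.nodes:
            errors.append(f"edge {source}->{target} references an unknown node")
            continue
        outgoing[source] += 1
        if graph.nodes[target].kind in INITIAL_KINDS:
            errors.append(f"initial node {target} cannot have incoming edges")
        if graph.nodes[source].kind == TERMINAL_KIND:
            errors.append(f"terminal node {source} cannot have outgoing edges")
    for node_id, node in graph.nodes.items():
        if node.kind != TERMINAL_KIND and outgoing[node_id] == 0:
            errors.append(f"node {node_id} has no outgoing edge")
    return {"errors": errors}


g = Graph(nodes={n.id: n for n in [
    Node("labeling", "InitialLabeling"),
    Node("rework", "InitialRework"),
    Node("done", "Done"),
]})
g.edges.append(("labeling", "done"))
print(check_structure(g))  # {'errors': ['node rework has no outgoing edge']}
```

Because nothing here touches user, dataset, or batch IDs, a workflow can pass such a check and still reference IDs that do not exist.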

⚠️ Using project.get_workflow() and project.clone_workflow_from() will raise a warning:

project.py:1714: UserWarning: Workflow Management is currently in alpha and its behavior may change in future releases.
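If the alpha notice is noisy in scripts, Python's standard `warnings` module can silence it by message. A minimal sketch: `fake_get_workflow()` is a hypothetical stand-in that only emits the same message so the snippet runs without the SDK; in real code the protected call would be `project.get_workflow()`.

```python
import warnings

ALPHA_MESSAGE = (
    "Workflow Management is currently in alpha and its behavior may change "
    "in future releases."
)


def fake_get_workflow():
    # Hypothetical stand-in for project.get_workflow(): emits the same UserWarning.
    warnings.warn(ALPHA_MESSAGE, UserWarning)
    return "workflow"


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # record every warning by default...
    # ...except the alpha notice; `message` is a regex matched at the start of the text.
    warnings.filterwarnings(
        "ignore",
        message="Workflow Management is currently in alpha",
        category=UserWarning,
    )
    workflow = fake_get_workflow()

print(len(caught))  # 0
```

Calling the same `warnings.filterwarnings(...)` at the top of a script, outside `catch_warnings`, applies the filter for the whole process.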
  • Copy from another project:
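import labelbox as lb

client = lb.Client(API_KEY)  # API_KEY holds your Labelbox API key

project_target_id = "cm4vs2ic90ey0072v01e33isl"
project_source_id = "cm9lfjx010ddc07432s3a0zvo"
project_source = client.get_project(project_source_id)
project_target = client.get_project(project_target_id)

# Copy the source project's workflow into the target project
project_target.clone_workflow_from(project_source.uid)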
  • Create a minimal workflow
import labelbox as lb
from labelbox.schema.workflow import NodeType

project_id = "cm37vxyth01fu07xu2ejc6j2f"

client = lb.Client(API_KEY)
project = client.get_project(project_id)

workflow = project.get_workflow()

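# Remove all existing nodes so the workflow can be rebuilt from scratch
workflow.reset_config()

# Create the two entry nodes and a terminal node
initial_labeling = workflow.add_node(type=NodeType.InitialLabeling)
initial_rework = workflow.add_node(type=NodeType.InitialRework)
done = workflow.add_node(type=NodeType.Done)

# Route both entry points straight to Done
workflow.add_edge(initial_labeling, done)
workflow.add_edge(initial_rework, done)

# Only push the new configuration if validation reports no errors
if not workflow.check_validity().get("errors"):
    workflow.update_config(reposition=True)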
  • Create a more complex workflow
import labelbox as lb
from labelbox.schema.workflow import (
    NodeType, 
    NodeOutput, 
    ProjectWorkflowFilter,
    created_by,
    metadata,
    sample,
    labeled_at,
    mp_condition,
    m_condition,
    labeling_time,
    review_time,
    issue_category,
    batch,
    dataset,
    annotation,
    consensus_average,
    model_prediction,
    natural_language,
    feature_consensus_average
)

from labelbox.schema.workflow.enums import IndividualAssignment
from labelbox.schema.workflow.enums import MatchFilters
from datetime import datetime


client = lb.Client(API_KEY)

project_id = "cm37vxyth01fu07xu2ejc6j2f"
project = client.get_project(project_id)

# Get workflow and reset config
workflow = project.get_workflow()
workflow.reset_config()

# Create nodes
initial_labeling = workflow.add_node(
    type=NodeType.InitialLabeling,
    instructions="This is the entry point"
)

initial_rework = workflow.add_node(
    type=NodeType.InitialRework,
    individual_assignment=IndividualAssignment.LabelCreator
)

initial_review = workflow.add_node(
    type=NodeType.Review,
    name="Initial review task",
    group_assignment=[
        "63a6a360-baa8-11ec-aedb-2592d52c761e",
        "b3f89430-ea3a-11ef-b2a5-e1807377f8af",
    ]
)

logic = workflow.add_node(
    type=NodeType.Logic,
    name="Logic node",
    match_filters=MatchFilters.Any,
    filters=ProjectWorkflowFilter([
        created_by(["cly7gzohg07zz07v5fqs63zmx", "cl7k7a9x1764808vk6bm1hf8e", "ckruht2sob6xj0ybh7bu46mgo"]),
        metadata([m_condition.contains("clo8t1njt00j807zkh5wz9uyt", ["test"])]),
        sample(23),
        labeled_at.between(datetime(2024, 3, 9, 5, 5, 42), datetime(2025, 4, 28, 13, 5, 42)),
        labeling_time.greater_than(1000),
        review_time.less_than_or_equal(100),
        issue_category(["cmbgs41zu0k5u07y49o9p54o9"]),
        batch.is_one_of(["ad210540-9d58-11ef-87dd-8501c518f349"]),
        dataset(["cm37vyets000z072314wxgt0l"]),
        annotation(["cm37w0e0500lf0709ba7c42m9"]),
        consensus_average(0.17, 0.61),
        model_prediction([mp_condition.is_one_of(["cm17qumj801ll07093toq47x3"], 1),
                          mp_condition.is_not_one_of(["cm4lbh7fv07q00709ewfk2b0o"], 2, 6),
                          mp_condition.is_none()]),
        natural_language("Birds in the sky/Blue sky/clouds/0.5", 0.178, 0.768),
        feature_consensus_average(0.17, 0.67, ["cm37w0e0500lf0709ba7c42m9"])
    ])
)

done = workflow.add_node(type=NodeType.Done)

custom_rework_1 = workflow.add_node(
    type=NodeType.CustomRework,
    name="Custom Rework 1",
    individual_assignment=IndividualAssignment.LabelCreator,
    group_assignment=[
        "63a6a360-baa8-11ec-aedb-2592d52c761e",
        "b3f89430-ea3a-11ef-b2a5-e1807377f8af",
    ]
)

review_2 = workflow.add_node(
    type=NodeType.Review,
    name="Review 2"
)

rework = workflow.add_node(
    type=NodeType.Rework,
    name="To rework"
)

custom_rework_2 = workflow.add_node(
    type=NodeType.CustomRework,
    name="Custom Rework 2",
    instructions="test"
)

done_2 = workflow.add_node(
    type=NodeType.Done,
    name="Well done"
)

# Create edges
workflow.add_edge(initial_labeling, initial_review)
workflow.add_edge(initial_rework, initial_review)
workflow.add_edge(initial_review, logic, NodeOutput.Approved)
workflow.add_edge(logic, done, NodeOutput.If)
workflow.add_edge(logic, custom_rework_1, NodeOutput.Else)
workflow.add_edge(initial_review, review_2, NodeOutput.Rejected)
workflow.add_edge(review_2, rework, NodeOutput.Rejected)
workflow.add_edge(review_2, custom_rework_2, NodeOutput.Approved)
workflow.add_edge(custom_rework_2, done_2)

if not (err := workflow.check_validity().get("errors")):
    workflow.update_config(reposition=True)
else:
    print(err)
  • Update existing workflow
from labelbox.schema.workflow import model_prediction, mp_condition
from labelbox.schema.workflow.enums import FilterField
# from labelbox.schema.workflow.enums import WorkflowDefinitionId  # for the alternative lookup below

workflow = project.get_workflow()

# Inspect the nodes to find the ID of the one to update
print(workflow.get_nodes())

logic = workflow.get_node_by_id("0359113a-6081-4f48-83d1-175062a0259b")
# Alternatively, look the node up by type:
# logic = next(
#     node for node in workflow.get_nodes()
#     if node.definition_id == WorkflowDefinitionId.Logic
# )

# Rename the node (possible for every node except the initial ones)
logic.name = "My Logic"

logic.remove_filter(FilterField.ModelPrediction)

# Apply changes
workflow.update_config()

# Re-add the filter
logic.add_filter(
    model_prediction([
        mp_condition.is_none()
    ])
)
# Apply changes
workflow.update_config()

Results: (screenshot not reproduced)
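The logic node in the complex example combines many filters with `MatchFilters.Any` and sends each task to its `NodeOutput.If` or `NodeOutput.Else` edge. The sketch below is a hypothetical, SDK-free model of that routing. It assumes `Any` means at least one filter must match and `All` means every filter must match; the `Match` enum, the dictionary rows, and the lambda predicates are illustrative stand-ins, not the SDK's types.

```python
from enum import Enum
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]
Predicate = Callable[[Row], bool]


class Match(Enum):
    # Hypothetical mirror of MatchFilters, with assumed semantics.
    ANY = "any"
    ALL = "all"


def route(row: Row, filters: List[Predicate], match: Match) -> str:
    """Return "If" when the filters match under `match`, otherwise "Else"."""
    results = (f(row) for f in filters)
    matched = any(results) if match is Match.ANY else all(results)
    return "If" if matched else "Else"


# Illustrative predicates loosely modelled on labeling_time.greater_than(1000)
# and review_time.less_than_or_equal(100) from the example above.
filters = [
    lambda row: row["labeling_time"] > 1000,
    lambda row: row["review_time"] <= 100,
]

row = {"labeling_time": 250, "review_time": 80}
print(route(row, filters, Match.ANY))  # If
print(route(row, filters, Match.ALL))  # Else
```

With `Any`, one matching filter (here the review-time condition) is enough to take the `If` edge; with `All`, the failing labeling-time condition sends the same task to `Else`.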

Fixes # (issue)

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Document change (fix typo or modifying any markdown files, code comments or anything in the examples folder only)

All Submissions

  • Have you followed the guidelines in our Contributing document?
  • Have you provided a description?
  • Are your changes properly formatted?

New Feature Submissions

  • Does your submission pass tests?
  • Have you added thorough tests for your new feature?
  • Have you commented your code, particularly in hard-to-understand areas?
  • Have you added a Docstring?

Changes to Core Features

  • Have you written new tests for your core changes, as applicable?
  • Have you successfully run tests with your changes locally?
  • Have you updated any code comments, as applicable?

@@ -59,6 +59,7 @@
ProjectOverview,
ProjectOverviewDetailed,
)
from labelbox.schema.workflow import ProjectWorkflow
Contributor:

The local imports below should not be necessary if provided here.

In general, it would be great not to have any local imports. If they exist to avoid a circular dependency, perhaps we can check whether it's easy to resolve, and mark it as a TODO if not.

Contributor Author:

I removed a few and added the TODOs after several attempts.
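When a local import exists only so a name can appear in type hints, a standard way to drop it is to import under `typing.TYPE_CHECKING` and rely on postponed annotations, so the import never runs and cannot form a cycle. This does not help when the name is needed at runtime (for example, to construct an instance); those cases still need restructuring or a deferred import. The module path and the `attach_workflow` method below are hypothetical.

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers (mypy, pyright), never at runtime,
    # so this import cannot take part in a circular-import cycle.
    # `mypackage.workflow` is a hypothetical module path.
    from mypackage.workflow import ProjectWorkflow


class Project:
    def __init__(self, uid: str) -> None:
        self.uid = uid
        self.workflow = None

    def attach_workflow(self, workflow: ProjectWorkflow) -> None:
        # Hypothetical method. With `from __future__ import annotations`,
        # the annotation is stored as the string "ProjectWorkflow" and is
        # never resolved at runtime.
        self.workflow = workflow


print(Project.attach_workflow.__annotations__["workflow"])  # ProjectWorkflow
```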

Contributor:

I would move this into nodes.py as it's the base class for all nodes in the graph.

Contributor Author:

Code refactored. I kept this one isolated.

"ReviewTime": "review_time",
"NlSearch": "natural_language",
"LabelFeedback": "label_feedback",
}
Contributor:

You should be able to create this mapping from ALIAS_TO_BACKEND_KEY to avoid duplication.

Contributor Author:

Code refactored
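The reviewer's suggestion amounts to inverting one dictionary instead of maintaining two. A minimal sketch, assuming `ALIAS_TO_BACKEND_KEY` maps SDK aliases to backend keys (the inverse of the entries visible in the diff) and that its values are unique; only the three entries shown in the diff are reproduced, and `BACKEND_KEY_TO_ALIAS` is a hypothetical name for the derived mapping.

```python
ALIAS_TO_BACKEND_KEY = {
    "review_time": "ReviewTime",
    "natural_language": "NlSearch",
    "label_feedback": "LabelFeedback",
}

# Derived once at import time, so the two mappings cannot drift apart.
BACKEND_KEY_TO_ALIAS = {
    backend: alias for alias, backend in ALIAS_TO_BACKEND_KEY.items()
}

# Guard against silently dropping entries if two aliases ever share a backend key.
assert len(BACKEND_KEY_TO_ALIAS) == len(ALIAS_TO_BACKEND_KEY), "backend keys must be unique"

print(BACKEND_KEY_TO_ALIAS["NlSearch"])  # natural_language
```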

}


def normalize_filter_rule(rule: Dict[str, Any]) -> Dict[str, Any]:
mrobers1982 (Contributor) commented May 30, 2025:

nit: I might have considered using enums more generally (as opposed to strings) to minimize the need for translating between formats - enums are also easier to use from a readability / usability standpoint.

Contributor Author:

Implementation refactored
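One way to apply the enum suggestion is a `str`-valued `Enum` whose values are the backend keys: callers work with readable members, serialization is just `.value`, and the reverse direction is a lookup by value. The `FilterKey` class below is hypothetical and reuses only the backend keys visible in the diff; it is not the `FilterField` enum used in the description's examples.

```python
from enum import Enum


class FilterKey(str, Enum):
    # Hypothetical enum: member names are SDK-side, values are backend keys.
    REVIEW_TIME = "ReviewTime"
    NATURAL_LANGUAGE = "NlSearch"
    LABEL_FEEDBACK = "LabelFeedback"


def to_backend(key: FilterKey) -> str:
    return key.value  # no separate translation table needed


def from_backend(raw: str) -> FilterKey:
    return FilterKey(raw)  # raises ValueError for unknown keys


print(to_backend(FilterKey.NATURAL_LANGUAGE))  # NlSearch
print(from_backend("ReviewTime") is FilterKey.REVIEW_TIME)  # True
```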

label: str = Field(default="Initial labeling task", frozen=True)
filter_logic: Literal["and", "or"] = Field(
default="and", alias="filterLogic"
)
Contributor:

If this only applies to a logical node, perhaps it should be defined as frozen on the base type (or ideally only defined on the logical node).
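A sketch of the reviewer's preferred option, defining the field only on the logic node so other node types cannot carry a meaningless `filter_logic`. It swaps the diff's Pydantic models (`Field(..., frozen=True)`) for standard-library dataclasses with class-level `frozen=True`, and all class names are hypothetical.

```python
from dataclasses import FrozenInstanceError, dataclass
from typing import Literal, Tuple


@dataclass(frozen=True)
class BaseNode:
    # Hypothetical base class: only the attributes every node type shares.
    id: str
    label: str


@dataclass(frozen=True)
class InitialLabelingNode(BaseNode):
    label: str = "Initial labeling task"


@dataclass(frozen=True)
class LogicNode(BaseNode):
    # Only logic nodes combine filters, so only they carry filter_logic.
    # Dataclasses do not enforce the Literal at runtime (Pydantic would).
    filter_logic: Literal["and", "or"] = "and"
    filters: Tuple[str, ...] = ()


start = InitialLabelingNode(id="n1")
logic = LogicNode(id="n2", label="Logic node", filter_logic="or")
print(hasattr(start, "filter_logic"))  # False
print(logic.filter_logic)  # or

try:
    start.label = "renamed"
except FrozenInstanceError:
    print("label is read-only")
```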

return self._edge_factory

@property
def WorkflowEdge(self):
Contributor:

I prefer create_edge, also it's a bit unorthodox to have a property return a callable.

Contributor Author:

Code refactored
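To illustrate the naming point: instead of a `WorkflowEdge` property that returns a callable factory, an ordinary verb-named method takes the edge's arguments directly. The `Edge` and `Workflow` classes below are hypothetical and SDK-free; the description's examples use `workflow.add_edge(...)`.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Edge:
    source: str
    target: str
    output: Optional[str] = None  # e.g. "Approved", "Rejected", "If", "Else"


@dataclass
class Workflow:
    edges: List[Edge] = field(default_factory=list)

    # Instead of `workflow.WorkflowEdge(...)`, where WorkflowEdge is a property
    # returning a factory, expose a plain method whose name says what it does.
    def create_edge(
        self, source: str, target: str, output: Optional[str] = None
    ) -> Edge:
        edge = Edge(source, target, output)
        self.edges.append(edge)
        return edge


wf = Workflow()
wf.create_edge("review", "done", output="Approved")
print(len(wf.edges), wf.edges[0].output)  # 1 Approved
```

A method also shows up with its signature in IDE completion and `help()`, which a property returning a callable hides.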

return node

# Node creation methods
def InitialLabelingNode(self, **kwargs) -> InitialLabelingNode:
Contributor:

Personally, I prefer create_initial_labeling_node or other verb-based method names; this is a bit unorthodox, but it's fine if this is a convention used by the SDK.

Contributor Author:

Code refactored

# For any other type, return None to avoid serialization errors
return None

def reset_config(self) -> "ProjectWorkflow":
Contributor:

nit: We might want to consider creating the default nodes as part of this process - I'll leave it up to you.

Contributor Author:

I didn't do it so that users can customize their initial nodes, even though, currently, only the initial labeling node can receive instructions. The current approach makes it easier to add more customization in the future.

paulnoirel force-pushed the pno/PLT-2503-workflow-management branch from 723b059 to 62da8c5 on June 3, 2025 14:40
paulnoirel force-pushed the pno/PLT-2503-workflow-management branch from 62da8c5 to 4c95356 on June 5, 2025 15:36
paulnoirel force-pushed the pno/PLT-2503-workflow-management branch from 4c95356 to bfa1df9 on June 5, 2025 16:21
paulnoirel requested a review from mrobers1982 on June 5, 2025 16:59
2 participants