From 75be068f8d87e9e1b04ef71b28bc60ed117e1a55 Mon Sep 17 00:00:00 2001 From: Sivaselvan32 Date: Wed, 21 Jan 2026 22:19:56 +0530 Subject: [PATCH] jsonapi-pydantic library --- src/pytfe/jsonapi/__init__.py | 11 + src/pytfe/jsonapi/metadata.py | 138 +++++++++++++ src/pytfe/jsonapi/types.py | 81 ++++++++ src/pytfe/jsonapi/unmarshaler.py | 288 +++++++++++++++++++++++++++ src/pytfe/models/organization.py | 5 +- src/pytfe/models/project.py | 4 +- src/pytfe/models/workspace.py | 231 +++++++++++++-------- src/pytfe/resources/run_trigger.py | 8 +- src/pytfe/resources/variable_sets.py | 13 +- src/pytfe/resources/workspaces.py | 251 +++++++++-------------- tests/units/test_run.py | 4 +- tests/units/test_run_trigger.py | 8 +- tests/units/test_workspaces.py | 30 ++- 13 files changed, 787 insertions(+), 285 deletions(-) create mode 100644 src/pytfe/jsonapi/__init__.py create mode 100644 src/pytfe/jsonapi/metadata.py create mode 100644 src/pytfe/jsonapi/types.py create mode 100644 src/pytfe/jsonapi/unmarshaler.py diff --git a/src/pytfe/jsonapi/__init__.py b/src/pytfe/jsonapi/__init__.py new file mode 100644 index 0000000..f031e83 --- /dev/null +++ b/src/pytfe/jsonapi/__init__.py @@ -0,0 +1,11 @@ +"""JSON:API unmarshaling library for python-tfe.""" + +from .types import IncludedIndex, JSONAPIResponse +from .unmarshaler import unmarshal_many_payload, unmarshal_payload + +__all__ = [ + "unmarshal_payload", + "unmarshal_many_payload", + "JSONAPIResponse", + "IncludedIndex", +] diff --git a/src/pytfe/jsonapi/metadata.py b/src/pytfe/jsonapi/metadata.py new file mode 100644 index 0000000..036e91d --- /dev/null +++ b/src/pytfe/jsonapi/metadata.py @@ -0,0 +1,138 @@ +"""Field metadata extractors for Pydantic models.""" + +import inspect +from typing import Any, get_args, get_origin, get_type_hints + +from pydantic import BaseModel +from pydantic.fields import FieldInfo + + +class FieldMetadata: + """Metadata about a Pydantic model field.""" + + def __init__( + self, + field_name: str, + field_type: type, + jsonapi_type: str | None = None, + jsonapi_name: str | None = None, + is_optional: bool = False, + is_list: bool = False, + inner_type: type | None = None, + ): + self.field_name = field_name + self.field_type = field_type + self.jsonapi_type = jsonapi_type or self._infer_jsonapi_type() + self.jsonapi_name = jsonapi_name or self._convert_to_jsonapi_name(field_name) + self.is_optional = is_optional + self.is_list = is_list + self.inner_type = inner_type + + def _infer_jsonapi_type(self) -> str: + """Infer JSON:API type from field name patterns.""" + if self.field_name == "id": + return "primary" + return "attribute" + + def _convert_to_jsonapi_name(self, field_name: str) -> str: + """Convert Python field name to JSON:API name (snake_case to kebab-case).""" + return field_name.replace("_", "-") + + +def get_model_metadata(model_class: type[BaseModel]) -> dict[str, FieldMetadata]: + """Extract metadata for all fields in a Pydantic model. 
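+
+    Example (illustrative; assumes a model like Workspace that declares
+    jsonapi metadata via ``json_schema_extra``):
+        >>> meta = get_model_metadata(Workspace)
+        >>> meta["id"].jsonapi_type
+        'primary'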
+
+    Returns:
+        Dict mapping field name to FieldMetadata
+    """
+    metadata: dict[str, FieldMetadata] = {}
+
+    # Get type hints
+    try:
+        type_hints = get_type_hints(model_class, include_extras=True)
+    except Exception:
+        type_hints = {}
+
+    # Iterate through model fields
+    for field_name, field_info in model_class.model_fields.items():
+        field_type = type_hints.get(field_name, field_info.annotation)
+
+        # Check for Field metadata
+        jsonapi_type = None
+        jsonapi_name = None
+
+        if isinstance(field_info, FieldInfo):
+            # Extract custom metadata from Field()
+            if field_info.json_schema_extra and isinstance(
+                field_info.json_schema_extra, dict
+            ):
+                jsonapi_type = field_info.json_schema_extra.get("jsonapi_type")
+                jsonapi_name = field_info.json_schema_extra.get("jsonapi_name")
+            else:
+                jsonapi_type = None
+                jsonapi_name = None
+
+            # If no explicit jsonapi_name, use the Pydantic alias if available
+            if not jsonapi_name and field_info.alias:
+                jsonapi_name = field_info.alias
+
+        # Handle Optional types
+        is_optional = False
+        is_list = False
+        inner_type = field_type
+
+        origin = get_origin(field_type)
+        args = get_args(field_type)
+
+        # Check for Optional (Union with None) - handles both Optional[X] and X | None
+        import types
+        import typing
+
+        # typing.Union covers Optional[X]; types.UnionType covers the X | None syntax
+        if origin is types.UnionType or origin is typing.Union:
+            if type(None) in args:
+                is_optional = True
+                # Get the non-None type
+                inner_type = next(
+                    (arg for arg in args if arg is not type(None)), field_type
+                )
+
+        # Check for List
+        if get_origin(inner_type) is list:
+            is_list = True
+            list_args = get_args(inner_type)
+            if list_args:
+                inner_type = list_args[0]
+
+        # Ensure proper types for FieldMetadata
+        jsonapi_type_str: str | None = None
+        if jsonapi_type is not None:
+            jsonapi_type_str = (
+                str(jsonapi_type) if not isinstance(jsonapi_type, str) else jsonapi_type
+            )
+
+        jsonapi_name_str: str | None = None
+        if jsonapi_name is not None:
+            jsonapi_name_str = (
+                str(jsonapi_name) if not isinstance(jsonapi_name, str) else jsonapi_name
+            )
+
+        metadata[field_name] = FieldMetadata(
+            field_name=field_name,
+            field_type=field_type,  # type: ignore[arg-type]
+            jsonapi_type=jsonapi_type_str,
+            jsonapi_name=jsonapi_name_str,
+            is_optional=is_optional,
+            is_list=is_list,
+            inner_type=inner_type,
+        )
+
+    return metadata
+
+
+def is_pydantic_model(obj: Any) -> bool:
+    """Check if object is a Pydantic model class."""
+    try:
+        return inspect.isclass(obj) and issubclass(obj, BaseModel)
+    except TypeError:
+        return False
diff --git a/src/pytfe/jsonapi/types.py b/src/pytfe/jsonapi/types.py
new file mode 100644
index 0000000..5336468
--- /dev/null
+++ b/src/pytfe/jsonapi/types.py
@@ -0,0 +1,81 @@
+from typing import Any, Generic, TypeVar
+
+from pydantic import BaseModel
+
+T = TypeVar("T", bound=BaseModel)
+
+
+class JSONAPINode:
+    """Represents a JSON:API resource object node."""
+
+    def __init__(self, data: dict[str, Any]):
+        self.id: str = data.get("id", "")
+        self.type: str = data.get("type", "")
+        self.attributes: dict[str, Any] = data.get("attributes", {})
+        self.relationships: dict[str, Any] = data.get("relationships", {})
+        self.links: dict[str, Any] | None = data.get("links")
+        self.meta: dict[str, Any] | None = data.get("meta")
+        self._raw_data = data
+
+    def get_relationship_linkage(
+        self, rel_name: str
+    ) -> dict[str, Any] | list[dict[str, Any]] | None:
+        """Extract relationship linkage data (type and id)."""
+        if not self.relationships or rel_name not in self.relationships:
+            return None
+
+        rel_data = 
self.relationships[rel_name].get("data") + if rel_data is None: + return None + # Can be dict or list of dicts based on relationship type + return rel_data # type: ignore[no-any-return] + + +class IncludedIndex: + """Index for fast lookup of included resources.""" + + def __init__(self, included: list[dict[str, Any]] | None = None): + self._index: dict[tuple[str, str], JSONAPINode] = {} + + if included: + for item in included: + node = JSONAPINode(item) + if node.type and node.id: + key = (node.type, node.id) + self._index[key] = node + + def get(self, resource_type: str, resource_id: str) -> JSONAPINode | None: + """Lookup a resource by type and id.""" + return self._index.get((resource_type, resource_id)) + + def resolve_relationship( + self, rel_data: dict[str, Any] | None + ) -> JSONAPINode | None: + """Resolve a relationship linkage to full node.""" + if not rel_data or not isinstance(rel_data, dict): + return None + + resource_type = rel_data.get("type") + resource_id = rel_data.get("id") + + if not resource_type or not resource_id: + return None + + return self.get(resource_type, resource_id) + + +class JSONAPIResponse(Generic[T]): + """Complete JSON:API response with data and included.""" + + def __init__(self, response_dict: dict[str, Any]): + self.data: dict[str, Any] | list[dict[str, Any]] = response_dict.get("data", {}) + self.included: list[dict[str, Any]] = response_dict.get("included", []) + self.links: dict[str, Any] | None = response_dict.get("links") + self.meta: dict[str, Any] | None = response_dict.get("meta") + + # Build included index + self.included_index = IncludedIndex(self.included) + + def is_collection(self) -> bool: + """Check if data is a collection (list) or single resource.""" + return isinstance(self.data, list) diff --git a/src/pytfe/jsonapi/unmarshaler.py b/src/pytfe/jsonapi/unmarshaler.py new file mode 100644 index 0000000..f1a9fef --- /dev/null +++ b/src/pytfe/jsonapi/unmarshaler.py @@ -0,0 +1,288 @@ +"""Core unmarshaling functions""" + +from typing import Any, TypeVar + +from pydantic import BaseModel + +from .metadata import FieldMetadata, get_model_metadata, is_pydantic_model +from .types import IncludedIndex, JSONAPINode, JSONAPIResponse + +T = TypeVar("T", bound=BaseModel) + + +def unmarshal_payload(response_dict: dict[str, Any], model_class: type[T]) -> T: + """Unmarshal a single resource JSON:API response into a Pydantic model. + + Equivalent to jsonapi.UnmarshalPayload() in Go. + + Args: + response_dict: Full JSON:API response dictionary + model_class: Target Pydantic model class + + Returns: + Instance of model_class with data populated from response + + Example: + >>> response = requests.get("/api/v2/workspaces/ws-123").json() + >>> workspace = unmarshal_payload(response, Workspace) + >>> print(workspace.name) + >>> print(workspace.project.name if workspace.project else "No project") + """ + jsonapi_response: JSONAPIResponse = JSONAPIResponse(response_dict) + + if jsonapi_response.is_collection(): + raise ValueError("Expected single resource, got collection") + + # Type narrowing for mypy + assert isinstance(jsonapi_response.data, dict), "Expected data to be a dict" + + data_node = JSONAPINode(jsonapi_response.data) + + return unmarshal_node(data_node, model_class, jsonapi_response.included_index) + + +def unmarshal_many_payload( + response_dict: dict[str, Any], model_class: type[T] +) -> list[T]: + """Unmarshal a collection JSON:API response into list of Pydantic models. + + Equivalent to jsonapi.UnmarshalManyPayload() in Go. 
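+
+    Example (hypothetical endpoint, mirroring unmarshal_payload above):
+        >>> response = requests.get("/api/v2/organizations/acme/workspaces").json()
+        >>> workspaces = unmarshal_many_payload(response, Workspace)
+        >>> names = [ws.name for ws in workspaces]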
+ + Args: + response_dict: Full JSON:API response dictionary + model_class: Target Pydantic model class + + Returns: + List of model_class instances + """ + jsonapi_response: JSONAPIResponse = JSONAPIResponse(response_dict) + + if not jsonapi_response.is_collection(): + raise ValueError("Expected collection, got single resource") + + models: list[T] = [] + + # After is_collection() check, data must be a list + # mypy has trouble inferring this, so we assert + data_list = jsonapi_response.data + if not isinstance(data_list, list): # pragma: no cover + raise ValueError("Expected data to be a list") + + for data_item in data_list: + if not isinstance(data_item, dict): + continue # type: ignore[unreachable] + data_node = JSONAPINode(data_item) + model = unmarshal_node(data_node, model_class, jsonapi_response.included_index) + models.append(model) + + return models + + +def unmarshal_node( + node: JSONAPINode, model_class: type[T], included_index: IncludedIndex | None = None +) -> T: + """Recursively unmarshal a JSON:API node into a Pydantic model. + + This is the core recursive function - equivalent to unmarshalNode() in Go. + + Args: + node: JSON:API resource node + model_class: Target Pydantic model class + included_index: Index of included resources for relationship resolution + + Returns: + Instance of model_class with all fields populated + """ + # Get metadata for all fields in the model + field_metadata = get_model_metadata(model_class) + + # Dictionary to collect values for Pydantic model construction + model_data: dict[str, Any] = {} + + # Process each field + for field_name, meta in field_metadata.items(): + if meta.jsonapi_type == "primary": + # Primary ID field + model_data[field_name] = node.id + + elif meta.jsonapi_type == "attribute": + # Attribute field - extract from node.attributes + value = node.attributes.get(meta.jsonapi_name) + if value is not None: + model_data[field_name] = value + + elif meta.jsonapi_type == "relation": + # Full relationship object - resolve from included + model_data[field_name] = unmarshal_relationship( + node, meta, included_index, is_many=meta.is_list + ) + + elif meta.jsonapi_type == "polyrelation": + # Polymorphic relationship - resolve based on type + model_data[field_name] = unmarshal_polyrelation(node, meta, included_index) + + # Construct the Pydantic model with collected data + return model_class(**model_data) + + +def unmarshal_relationship( + node: JSONAPINode, + field_meta: FieldMetadata, + included_index: IncludedIndex | None, + is_many: bool = False, +) -> Any: + """Unmarshal a relationship field. + + Equivalent to the relationship processing in Go's unmarshalNode. 
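+
+    A to-one linkage such as ``{"type": "projects", "id": "prj-1"}`` resolves
+    to a full model when the resource appears in ``included``, and to an
+    ID-only stub otherwise; a to-many linkage yields a list built the same way.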
+ + Args: + node: Parent resource node + field_meta: Metadata about the relationship field + included_index: Index of included resources + is_many: True if this is a to-many relationship (list) + + Returns: + Resolved relationship object(s) or None + """ + rel_linkage = node.get_relationship_linkage(field_meta.jsonapi_name) + + if not rel_linkage: + return None if not is_many else [] + + if is_many: + # To-many relationship - list of resources + if not isinstance(rel_linkage, list): + return [] + + resolved_items = [] + inner_type = field_meta.inner_type + if inner_type is None: + return [] + for linkage_item in rel_linkage: + resolved = resolve_relationship_linkage( + linkage_item, inner_type, included_index + ) + if resolved: + resolved_items.append(resolved) + + return resolved_items + else: + # To-one relationship - single resource + if not isinstance(rel_linkage, dict): + return None + inner_type = field_meta.inner_type + if inner_type is None: + return None + return resolve_relationship_linkage(rel_linkage, inner_type, included_index) + + +def resolve_relationship_linkage( + linkage: dict[str, Any], + target_model_class: type, + included_index: IncludedIndex | None, +) -> Any: + """Resolve a relationship linkage to a full object. + + This is equivalent to Go's fullNode() + recursive unmarshalNode(). + + Args: + linkage: Relationship data with type and id + target_model_class: The model class to instantiate + included_index: Index of included resources + + Returns: + Resolved model instance or stub with only ID + """ + if isinstance(linkage, dict): + # Try to get full node from included index + full_node = None + if included_index: + full_node = included_index.resolve_relationship(linkage) + + if full_node: + # Found in included array - recursively unmarshal full object + if is_pydantic_model(target_model_class): + return unmarshal_node(full_node, target_model_class, included_index) + else: + # Not a Pydantic model, return as-is + return full_node._raw_data + else: + # Not in included - create stub with only ID + stub_data = {"id": linkage.get("id")} + + if is_pydantic_model(target_model_class): + try: + return target_model_class(**stub_data) + except Exception: + return None + else: + return stub_data + else: + return None # type: ignore[unreachable] + + +def unmarshal_polyrelation( + node: JSONAPINode, field_meta: FieldMetadata, included_index: IncludedIndex | None +) -> Any: + """Unmarshal a polymorphic relationship (choice type). + + Equivalent to Go's polyrelation handling with choiceStructMapping. 
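+
+    For example, a workspace's ``locked-by`` linkage of
+    ``{"type": "runs", "id": "run-1"}`` is matched against the choice class by
+    comparing ``"runs"`` to each candidate field's primary ``jsonapi_name``.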
+ + Args: + node: Parent resource node + field_meta: Metadata about the polyrelation field + included_index: Index of included resources + + Returns: + Choice object with appropriate field populated + """ + rel_linkage = node.get_relationship_linkage(field_meta.jsonapi_name) + + if not rel_linkage or not isinstance(rel_linkage, dict): + return None + + resource_type = rel_linkage.get("type") + + # Get the choice class + choice_class = field_meta.inner_type + + if not is_pydantic_model(choice_class): + return None + + # Get full node from included + full_node = None + if included_index: + full_node = included_index.resolve_relationship(rel_linkage) + + if not full_node: + # No included data, create empty choice + if choice_class is None: + return None + return choice_class() + + # Find which field in the choice class matches this type + if choice_class is None: + return None + choice_metadata = get_model_metadata(choice_class) + + for choice_field_name, choice_field_meta in choice_metadata.items(): + # Check if this field's type matches the resource type + field_type = choice_field_meta.inner_type or choice_field_meta.field_type + + if is_pydantic_model(field_type): + # Get the jsonapi type from the target model + target_metadata = get_model_metadata(field_type) + primary_field = next( + (m for m in target_metadata.values() if m.jsonapi_type == "primary"), + None, + ) + + if primary_field and primary_field.jsonapi_name == resource_type: + # This is the matching field! + resolved_obj: Any = unmarshal_node( + full_node, field_type, included_index + ) + return choice_class(**{choice_field_name: resolved_obj}) + + # No matching field found, return empty choice + return choice_class() diff --git a/src/pytfe/models/organization.py b/src/pytfe/models/organization.py index 21e40b2..e2b430f 100644 --- a/src/pytfe/models/organization.py +++ b/src/pytfe/models/organization.py @@ -81,7 +81,10 @@ class Organization(BaseModel): default_execution_mode: str | None = None email: str | None = None external_id: str | None = None - id: str | None = None + id: str | None = Field( + None, + json_schema_extra={"jsonapi_type": "primary", "jsonapi_name": "organizations"}, + ) is_unified: bool | None = None owners_team_saml_role_id: str | None = None permissions: dict | None = None diff --git a/src/pytfe/models/project.py b/src/pytfe/models/project.py index 3f4b9c6..f1e5e4f 100644 --- a/src/pytfe/models/project.py +++ b/src/pytfe/models/project.py @@ -6,7 +6,9 @@ class Project(BaseModel): - id: str + id: str = Field( + json_schema_extra={"jsonapi_type": "primary", "jsonapi_name": "projects"} + ) name: str | None = None description: str = "" organization: str | None = None diff --git a/src/pytfe/models/workspace.py b/src/pytfe/models/workspace.py index dca54b0..7839774 100644 --- a/src/pytfe/models/workspace.py +++ b/src/pytfe/models/workspace.py @@ -4,80 +4,124 @@ from enum import Enum from typing import Any -from pydantic import BaseModel, Field +from pydantic import BaseModel, ConfigDict, Field +from .agent import AgentPool from .common import EffectiveTagBinding, Pagination, Tag, TagBinding -from .data_retention_policy import DataRetentionPolicy, DataRetentionPolicyChoice -from .organization import ExecutionMode +from .data_retention_policy import DataRetentionPolicyChoice +from .organization import ExecutionMode, Organization from .project import Project class Workspace(BaseModel): - id: str - name: str | None = None - organization: str | None = None - execution_mode: ExecutionMode | None = None - project_id: 
str | None = None + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + id: str = Field( + json_schema_extra={"jsonapi_type": "primary", "jsonapi_name": "workspaces"} + ) + name: str | None = Field(None, alias="name") # Core attributes - actions: WorkspaceActions | None = None - allow_destroy_plan: bool = False - assessments_enabled: bool = False - auto_apply: bool = False - auto_apply_run_trigger: bool = False - auto_destroy_at: datetime | None = None - auto_destroy_activity_duration: str | None = None - can_queue_destroy_plan: bool = False - created_at: datetime | None = None - description: str = "" - environment: str = "" - file_triggers_enabled: bool = False - global_remote_state: bool = False - inherits_project_auto_destroy: bool = False - locked: bool = False - migration_environment: str = "" - no_code_upgrade_available: bool = False - operations: bool = False - permissions: WorkspacePermissions | None = None - queue_all_runs: bool = False - speculative_enabled: bool = False - source: WorkspaceSource | None = None - source_name: str = "" - source_url: str = "" - structured_run_output_enabled: bool = False - terraform_version: str = "" - trigger_prefixes: list[str] = Field(default_factory=list) - trigger_patterns: list[str] = Field(default_factory=list) - vcs_repo: VCSRepo | None = None - working_directory: str = "" - updated_at: datetime | None = None - resource_count: int = 0 - apply_duration_average: float | None = None # in seconds - plan_duration_average: float | None = None # in seconds - policy_check_failures: int = 0 - run_failures: int = 0 - runs_count: int = 0 - tag_names: list[str] = Field(default_factory=list) - setting_overwrites: WorkspaceSettingOverwrites | None = None + actions: WorkspaceActions | None = Field(None, alias="actions") + allow_destroy_plan: bool | None = Field(None, alias="allow-destroy-plan") + assessments_enabled: bool | None = Field(None, alias="assessments-enabled") + auto_apply: bool | None = Field(None, alias="auto-apply") + auto_apply_run_trigger: bool | None = Field(None, alias="auto-apply-run-trigger") + auto_destroy_at: datetime | None = Field(None, alias="auto-destroy-at") + auto_destroy_activity_duration: str | None = Field( + None, alias="auto-destroy-activity-duration" + ) + can_queue_destroy_plan: bool | None = Field(None, alias="can-queue-destroy-plan") + created_at: datetime | None = Field(None, alias="created-at") + description: str | None = Field(None, alias="description") + environment: str | None = Field(None, alias="environment") + execution_mode: ExecutionMode | None = Field(None, alias="execution-mode") + file_triggers_enabled: bool | None = Field(None, alias="file-triggers-enabled") + global_remote_state: bool | None = Field(None, alias="global-remote-state") + inherits_project_auto_destroy: bool | None = Field( + None, alias="inherits-project-auto-destroy" + ) + locked: bool | None = Field(None, alias="locked") + migration_environment: str | None = Field(None, alias="migration-environment") + no_code_upgrade_available: bool | None = Field( + None, alias="no-code-upgrade-available" + ) + operations: bool | None = Field(None, alias="operations") + permissions: WorkspacePermissions | None = Field(None, alias="permissions") + queue_all_runs: bool | None = Field(None, alias="queue-all-runs") + speculative_enabled: bool | None = Field(None, alias="speculative-enabled") + source: WorkspaceSource | None = Field(None, alias="source") + source_name: str | None = Field(None, alias="source-name") + source_url: str | 
None = Field(None, alias="source-url") + structured_run_output_enabled: bool | None = Field( + None, alias="structured-run-output-enabled" + ) + terraform_version: str | None = Field(None, alias="terraform-version") + trigger_prefixes: list[str] = Field(default_factory=list, alias="trigger-prefixes") + trigger_patterns: list[str] = Field(default_factory=list, alias="trigger-patterns") + vcs_repo: VCSRepo | None = Field(None, alias="vcs-repo") + working_directory: str | None = Field(None, alias="working-directory") + updated_at: datetime | None = Field(None, alias="updated-at") + resource_count: int | None = Field(None, alias="resource-count") + apply_duration_average: float | None = Field(None, alias="apply-duration-average") + plan_duration_average: float | None = Field(None, alias="plan-duration-average") + policy_check_failures: int | None = Field(None, alias="policy-check-failures") + run_failures: int | None = Field(None, alias="run-failures") + runs_count: int | None = Field(None, alias="workspace-kpis-runs-count") + tag_names: list[str] = Field(default_factory=list, alias="tag-names") + setting_overwrites: WorkspaceSettingOverwrites | None = Field( + None, alias="setting-overwrites" + ) # Relations - agent_pool: Any | None = None # AgentPool object - current_run: Any | None = None # Run object - current_state_version: Any | None = None # StateVersion object - project: Project | None = None - ssh_key: Any | None = None # SSHKey object - outputs: list[WorkspaceOutputs] = Field(default_factory=list) - tags: list[Tag] = Field(default_factory=list) - # tags: list[Tag] = Field(default_factory=list) - current_configuration_version: Any | None = None # ConfigurationVersion object - locked_by: LockedByChoice | None = None + agent_pool: AgentPool | None = Field( + None, + json_schema_extra={"jsonapi_type": "relation", "jsonapi_name": "agent-pool"}, + ) # AgentPool object + current_state_version: Any | None = Field( + None, + json_schema_extra={ + "jsonapi_type": "relation", + "jsonapi_name": "current-state-version", + }, + ) # StateVersion object + organization: Organization | None = Field( + None, + json_schema_extra={"jsonapi_type": "relation", "jsonapi_name": "organization"}, + ) + project: Project | None = Field( + None, json_schema_extra={"jsonapi_type": "relation", "jsonapi_name": "project"} + ) + ssh_key: Any | None = Field( + None, json_schema_extra={"jsonapi_type": "relation", "jsonapi_name": "ssh-key"} + ) # SSHKey object + outputs: list[WorkspaceOutputs] = Field( + default_factory=list, + json_schema_extra={"jsonapi_type": "relation", "jsonapi_name": "outputs"}, + ) + tags: list[Tag] = Field( + default_factory=list, + json_schema_extra={"jsonapi_type": "relation", "jsonapi_name": "tags"}, + ) + current_configuration_version: Any | None = Field( + None, + json_schema_extra={ + "jsonapi_type": "relation", + "jsonapi_name": "current-configuration-version", + }, + ) # ConfigurationVersion object + locked_by: LockedByChoice | None = Field( + None, + json_schema_extra={"jsonapi_type": "polyrelation", "jsonapi_name": "locked-by"}, + ) variables: list[Any] = Field(default_factory=list) # Variable objects tag_bindings: list[TagBinding] = Field(default_factory=list) effective_tag_bindings: list[EffectiveTagBinding] = Field(default_factory=list) # Links - links: dict[str, Any] = Field(default_factory=dict) - data_retention_policy: DataRetentionPolicy | None = None + links: dict[str, Any] | None = Field(None, alias="links") + + data_retention_policy: Any | None = None # Legacy field, 
deprecated data_retention_policy_choice: DataRetentionPolicyChoice | None = None @@ -107,35 +151,43 @@ class WorkspaceSource(str, Enum): class WorkspaceActions(BaseModel): - is_destroyable: bool = False + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + is_destroyable: bool = Field(default=False, alias="is-destroyable") class WorkspacePermissions(BaseModel): - can_destroy: bool = False - can_force_unlock: bool = False - can_lock: bool = False - can_manage_run_tasks: bool = False - can_queue_apply: bool = False - can_queue_destroy: bool = False - can_queue_run: bool = False - can_read_settings: bool = False - can_unlock: bool = False - can_update: bool = False - can_update_variable: bool = False - can_force_delete: bool | None = None + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + can_destroy: bool = Field(default=False, alias="can-destroy") + can_force_unlock: bool = Field(default=False, alias="can-force-unlock") + can_lock: bool = Field(default=False, alias="can-lock") + can_manage_run_tasks: bool = Field(default=False, alias="can-manage-run-tasks") + can_queue_apply: bool = Field(default=False, alias="can-queue-apply") + can_queue_destroy: bool = Field(default=False, alias="can-queue-destroy") + can_queue_run: bool = Field(default=False, alias="can-queue-run") + can_read_settings: bool = Field(default=False, alias="can-read-settings") + can_unlock: bool = Field(default=False, alias="can-unlock") + can_update: bool = Field(default=False, alias="can-update") + can_update_variable: bool = Field(default=False, alias="can-update-variable") + can_force_delete: bool | None = Field(default=None, alias="can-force-delete") class WorkspaceSettingOverwrites(BaseModel): - execution_mode: bool | None = None - agent_pool: bool | None = None + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + execution_mode: bool | None = Field(None, alias="execution-mode") + agent_pool: bool | None = Field(None, alias="agent-pool") class WorkspaceOutputs(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + id: str - name: str - sensitive: bool = False - output_type: str - value: Any | None = None + name: str | None = Field(default=None, alias="name") + sensitive: bool = Field(default=False, alias="sensitive") + output_type: str | None = Field(default=None, alias="output-type") + value: Any | None = Field(default=None, alias="value") class LockedByChoice(BaseModel): @@ -331,12 +383,23 @@ class WorkspaceAddTagBindingsOptions(BaseModel): class VCSRepo(BaseModel): - branch: str | None = None - identifier: str | None = None - ingress_submodules: bool | None = None - oauth_token_id: str | None = None - tags_regex: str | None = None - gha_installation_id: str | None = None + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + branch: str | None = Field(default=None, alias="branch") + display_identifier: str | None = Field(default=None, alias="display-identifier") + identifier: str | None = Field(default=None, alias="identifier") + ingress_submodules: bool | None = Field(default=None, alias="ingress-submodules") + oauth_token_id: str | None = Field(default=None, alias="oauth-token-id") + tags_regex: str | None = Field(default=None, alias="tags-regex") + gha_installation_id: str | None = Field( + default=None, alias="github-app-installation-id" + ) + repository_http_url: str | None = Field(default=None, alias="repository-http-url") + service_provider: str | None = Field(default=None, 
alias="service-provider") + tags: bool | None = Field(default=None, alias="tags") + webhook_url: str | None = Field(default=None, alias="webhook-url") + tag_prefix: str | None = Field(default=None, alias="tag-prefix") + source_directory: str | None = Field(default=None, alias="source-directory") class VCSRepoOptions(BaseModel): diff --git a/src/pytfe/resources/run_trigger.py b/src/pytfe/resources/run_trigger.py index 3a16d76..a93b580 100644 --- a/src/pytfe/resources/run_trigger.py +++ b/src/pytfe/resources/run_trigger.py @@ -48,12 +48,8 @@ def _run_trigger_from(d: dict[str, Any], org: str | None = None) -> RunTrigger: sourceable_id = sourceable_rel["data"].get("id", "") # Create workspace objects with proper IDs - workspace = Workspace( - id=workspace_id, name=workspace_name_str, organization=org or "" - ) - sourceable = Workspace( - id=sourceable_id, name=sourceable_name_str, organization=org or "" - ) + workspace = Workspace(id=workspace_id, name=workspace_name_str) # type: ignore[call-arg] + sourceable = Workspace(id=sourceable_id, name=sourceable_name_str) # type: ignore[call-arg] sourceable_choice = SourceableChoice( workspace=sourceable ) # Should reference sourceable, not workspace diff --git a/src/pytfe/resources/variable_sets.py b/src/pytfe/resources/variable_sets.py index 4bf4353..2ea4d33 100644 --- a/src/pytfe/resources/variable_sets.py +++ b/src/pytfe/resources/variable_sets.py @@ -625,11 +625,7 @@ def _parse_variable_set(self, data: dict[str, Any]) -> VariableSet: for ws in ws_data: if "id" in ws: workspaces.append( - { - "id": ws["id"], - "name": f"workspace-{ws['id']}", # Placeholder name - "organization": "placeholder-org", # Placeholder organization - } + {"id": ws["id"], "name": f"workspace-{ws['id']}"} ) parsed_data["workspaces"] = workspaces @@ -641,11 +637,7 @@ def _parse_variable_set(self, data: dict[str, Any]) -> VariableSet: for proj in proj_data: if "id" in proj: projects.append( - { - "id": proj["id"], - "name": f"project-{proj['id']}", # Placeholder name - "organization": "placeholder-org", # Placeholder organization - } + {"id": proj["id"], "name": f"project-{proj['id']}"} ) parsed_data["projects"] = projects @@ -680,7 +672,6 @@ def _parse_variable_set(self, data: dict[str, Any]) -> VariableSet: "project": { "id": parent_data["id"], "name": f"project-{parent_data['id']}", - "organization": "placeholder-org", } } elif parent_data.get("type") == "organizations": diff --git a/src/pytfe/resources/workspaces.py b/src/pytfe/resources/workspaces.py index 81b49f5..b9ee16f 100644 --- a/src/pytfe/resources/workspaces.py +++ b/src/pytfe/resources/workspaces.py @@ -16,6 +16,7 @@ WorkspaceMinimumLimitError, WorkspaceRequiredError, ) +from ..jsonapi.unmarshaler import unmarshal_payload from ..models.common import ( EffectiveTagBinding, Tag, @@ -29,6 +30,8 @@ DataRetentionPolicyDontDelete, DataRetentionPolicySetOptions, ) +from ..models.organization import Organization +from ..models.project import Project from ..models.workspace import ( ExecutionMode, LockedByChoice, @@ -50,13 +53,11 @@ WorkspaceRemoveTagsOptions, WorkspaceRemoveVCSConnectionOptions, WorkspaceSettingOverwrites, - WorkspaceSource, WorkspaceTagListOptions, WorkspaceUpdateOptions, WorkspaceUpdateRemoteStateConsumersOptions, ) from ..utils import ( - _safe_str, valid_string, valid_string_id, validate_workspace_create_options, @@ -73,190 +74,120 @@ def _em_safe(v: Any) -> ExecutionMode | None: return result if isinstance(result, ExecutionMode) else None -def _ws_from(d: dict[str, Any], org: str | None = None) -> 
Workspace: +def _ws_from(d: dict[str, Any]) -> Workspace: attr: dict[str, Any] = d.get("attributes", {}) or {} - - # Coerce to required string fields (empty string fallback keeps mypy happy) - id_str: str = _safe_str(d.get("id")) - name_str: str = _safe_str(attr.get("name")) - org_str: str = _safe_str(org if org is not None else attr.get("organization")) + relationships: dict[str, Any] = d.get("relationships", {}) or {} # Optional fields em: ExecutionMode | None = _em_safe(attr.get("execution-mode")) - proj_id: str | None = None - proj = attr.get("project") - if isinstance(proj, dict): - proj_id = proj.get("id") if isinstance(proj.get("id"), str) else None - - # Enhanced field mapping - tags_val = attr.get("tags", []) or [] - tags_list: builtins.list[Tag] = [] - if isinstance(tags_val, builtins.list): - for tag_item in tags_val: - if isinstance(tag_item, dict): - tags_list.append( - Tag(id=tag_item.get("id"), name=tag_item.get("name", "")) - ) - elif isinstance(tag_item, str): - tags_list.append(Tag(name=tag_item)) - - # Map additional attributes actions = None if attr.get("actions"): - actions = WorkspaceActions( - is_destroyable=attr["actions"].get("is-destroyable", False) - ) + actions = WorkspaceActions.model_validate(attr["actions"]) permissions = None if attr.get("permissions"): - perm_attr = attr["permissions"] - permissions = WorkspacePermissions( - can_destroy=perm_attr.get("can-destroy", False), - can_force_unlock=perm_attr.get("can-force-unlock", False), - can_lock=perm_attr.get("can-lock", False), - can_manage_run_tasks=perm_attr.get("can-manage-run-tasks", False), - can_queue_apply=perm_attr.get("can-queue-apply", False), - can_queue_destroy=perm_attr.get("can-queue-destroy", False), - can_queue_run=perm_attr.get("can-queue-run", False), - can_read_settings=perm_attr.get("can-read-settings", False), - can_unlock=perm_attr.get("can-unlock", False), - can_update=perm_attr.get("can-update", False), - can_update_variable=perm_attr.get("can-update-variable", False), - can_force_delete=perm_attr.get("can-force-delete"), - ) + permissions = WorkspacePermissions.model_validate(attr["permissions"]) setting_overwrites = None if attr.get("setting-overwrites"): - so_attr = attr["setting-overwrites"] - setting_overwrites = WorkspaceSettingOverwrites( - execution_mode=so_attr.get("execution-mode"), - agent_pool=so_attr.get("agent-pool"), + setting_overwrites = WorkspaceSettingOverwrites.model_validate( + attr["setting-overwrites"] ) # Map VCS repo vcs_repo = None if attr.get("vcs-repo"): - vcs_attr = attr["vcs-repo"] - vcs_repo = VCSRepo( - branch=vcs_attr.get("branch"), - identifier=vcs_attr.get("identifier"), - ingress_submodules=vcs_attr.get("ingress-submodules"), - oauth_token_id=vcs_attr.get("oauth-token-id"), - gha_installation_id=vcs_attr.get("github-app-installation-id"), - ) + vcs_repo = VCSRepo.model_validate(attr["vcs-repo"]) # Map locked_by choice locked_by = None - if d.get("relationships", {}).get("locked-by"): - lb_data = d["relationships"]["locked-by"]["data"] + if relationships.get("locked-by", {}).get("data"): + lb_data = relationships["locked-by"]["data"] if lb_data: - locked_by = LockedByChoice( - run=lb_data.get("run"), - user=lb_data.get("user"), - team=lb_data.get("team"), - ) + if lb_data.get("type") == "runs": + locked_by = LockedByChoice.model_validate({"run": lb_data.get("id")}) + elif lb_data.get("type") == "users": + locked_by = LockedByChoice.model_validate({"user": lb_data.get("id")}) + elif lb_data.get("type") == "teams": + locked_by = 
LockedByChoice.model_validate({"team": lb_data.get("id")}) # Map outputs outputs = [] - if d.get("relationships", {}).get("outputs"): - for output_data in d["relationships"]["outputs"].get("data", []): - outputs.append( - WorkspaceOutputs( - id=output_data.get("id", ""), - name=output_data.get("attributes", {}).get("name", ""), - sensitive=output_data.get("attributes", {}).get("sensitive", False), - output_type=output_data.get("attributes", {}).get( - "output-type", "" - ), - value=output_data.get("attributes", {}).get("value"), - ) - ) + if relationships.get("outputs", {}).get("data"): + for output_data in relationships["outputs"].get("data", []): + output_attrs = output_data.get("attributes", {}) + output_attrs["id"] = output_data.get("id", "") + outputs.append(WorkspaceOutputs.model_validate(output_attrs)) data_retention_policy_choice: DataRetentionPolicyChoice | None = None - if d.get("relationships", {}).get("data-retention-policy-choice"): - drp_data = d["relationships"]["data-retention-policy-choice"]["data"] + if relationships.get("data-retention-policy-choice", {}).get("data"): + drp_data = relationships["data-retention-policy-choice"]["data"] if drp_data: if drp_data.get("type") == "data-retention-policy-delete-olders": - data_retention_policy_choice = DataRetentionPolicyChoice( - data_retention_policy_delete_older=DataRetentionPolicyDeleteOlder( - id=drp_data.get("id"), - delete_older_than_n_days=drp_data.get("attributes", {}).get( - "delete-older-than-n-days", 0 - ), + data_retention_policy_delete_older = ( + DataRetentionPolicyDeleteOlder.model_validate( + { + "id": drp_data.get("id"), + "delete_older_than_n_days": drp_data.get( + "attributes", {} + ).get("delete-older-than-n-days", 0), + } ) ) + data_retention_policy_choice = DataRetentionPolicyChoice.model_validate( + { + "data_retention_policy_delete_older": data_retention_policy_delete_older + } + ) elif drp_data.get("type") == "data-retention-policy-dont-deletes": - data_retention_policy_choice = DataRetentionPolicyChoice( - data_retention_policy_dont_delete=DataRetentionPolicyDontDelete( - id=drp_data.get("id") + data_retention_policy_dont_delete = ( + DataRetentionPolicyDontDelete.model_validate( + {"id": drp_data.get("id")} ) ) + data_retention_policy_choice = DataRetentionPolicyChoice.model_validate( + { + "data_retention_policy_dont_delete": data_retention_policy_dont_delete + } + ) elif drp_data.get("type") == "data-retention-policies": # Legacy data retention policy - data_retention_policy_choice = DataRetentionPolicyChoice( - data_retention_policy=DataRetentionPolicy( - id=drp_data.get("id"), - delete_older_than_n_days=drp_data.get("attributes", {}).get( + data_retention_policy = DataRetentionPolicy.model_validate( + { + "id": drp_data.get("id"), + "delete_older_than_n_days": drp_data.get("attributes", {}).get( "delete-older-than-n-days", 0 ), - ) + } + ) + data_retention_policy_choice = DataRetentionPolicyChoice.model_validate( + {"data_retention_policy": data_retention_policy} ) - return Workspace( - id=id_str, - name=name_str, - organization=org_str, - execution_mode=em, - project_id=proj_id, - tags=tags_list, - # Core attributes - actions=actions, - allow_destroy_plan=attr.get("allow-destroy-plan", False), - assessments_enabled=attr.get("assessments-enabled", False), - auto_apply=attr.get("auto-apply", False), - auto_apply_run_trigger=attr.get("auto-apply-run-trigger", False), - auto_destroy_at=attr.get("auto-destroy-at"), - auto_destroy_activity_duration=attr.get("auto-destroy-activity-duration"), - 
can_queue_destroy_plan=attr.get("can-queue-destroy-plan", False), - created_at=attr.get("created-at"), - description=attr.get("description") or "", - environment=attr.get("environment", ""), - file_triggers_enabled=attr.get("file-triggers-enabled", False), - global_remote_state=attr.get("global-remote-state", False), - inherits_project_auto_destroy=attr.get("inherits-project-auto-destroy", False), - locked=attr.get("locked", False), - migration_environment=attr.get("migration-environment", ""), - no_code_upgrade_available=attr.get("no-code-upgrade-available", False), - operations=attr.get("operations", False), - permissions=permissions, - queue_all_runs=attr.get("queue-all-runs", False), - speculative_enabled=attr.get("speculative-enabled", False), - source=WorkspaceSource(attr.get("source")) if attr.get("source") else None, - source_name=attr.get("source-name") or "", - source_url=attr.get("source-url") or "", - structured_run_output_enabled=attr.get("structured-run-output-enabled", False), - terraform_version=attr.get("terraform-version") or "", - trigger_prefixes=attr.get("trigger-prefixes", []), - trigger_patterns=attr.get("trigger-patterns", []), - vcs_repo=vcs_repo, - working_directory=attr.get("working-directory") or "", - updated_at=attr.get("updated-at"), - resource_count=attr.get("resource-count", 0), - apply_duration_average=attr.get("apply-duration-average"), - plan_duration_average=attr.get("plan-duration-average"), - policy_check_failures=attr.get("policy-check-failures") or 0, - run_failures=attr.get("run-failures") or 0, - runs_count=attr.get("workspace-kpis-runs-count") or 0, - tag_names=attr.get("tag-names", []), - setting_overwrites=setting_overwrites, - # Relations - outputs=outputs, - locked_by=locked_by, - data_retention_policy_choice=data_retention_policy_choice - if data_retention_policy_choice - else None, - ) + attr["id"] = d.get("id") + attr["execution_mode"] = em + attr["actions"] = actions + attr["permissions"] = permissions + attr["setting_overwrites"] = setting_overwrites + attr["vcs-repo"] = vcs_repo + + # Add parsed relations + if relationships.get("organization", {}).get("data"): + attr["organization"] = Organization.model_validate( + {"id": relationships["organization"]["data"].get("id")} + ) + if relationships.get("project", {}).get("data"): + attr["project"] = Project.model_validate( + {"id": relationships["project"]["data"].get("id")} + ) + if relationships.get("ssh-key", {}).get("data"): + attr["ssh_key"] = relationships["ssh-key"]["data"].get("id") + attr["outputs"] = outputs + attr["locked_by"] = locked_by + attr["data_retention_policy_choice"] = data_retention_policy_choice + + return Workspace.model_validate(attr) class Workspaces(_Service): @@ -305,7 +236,7 @@ def list( path = f"/api/v2/organizations/{organization}/workspaces" for item in self._list(path, params=params): - yield _ws_from(item, organization) + yield _ws_from(item) def read(self, workspace: str, *, organization: str) -> Workspace: """Read workspace by organization and name.""" @@ -333,7 +264,7 @@ def read_with_options( f"/api/v2/organizations/{organization}/workspaces/{workspace}", params=params, ) - ws = _ws_from(r.json()["data"], organization) + ws = unmarshal_payload(r.json(), Workspace) ws.data_retention_policy = ( ws.data_retention_policy_choice.convert_to_legacy_struct() if ws.data_retention_policy_choice @@ -357,7 +288,7 @@ def read_by_id_with_options( if options.include: params["include"] = ",".join([i.value for i in options.include]) r = self.t.request("GET", 
f"/api/v2/workspaces/{workspace_id}", params=params) - ws = _ws_from(r.json()["data"], None) + ws = _ws_from(r.json()["data"]) if ws.data_retention_policy_choice is not None: ws.data_retention_policy = ( ws.data_retention_policy_choice.convert_to_legacy_struct() @@ -381,7 +312,7 @@ def create( r = self.t.request( "POST", f"/api/v2/organizations/{organization}/workspaces", json_body=body ) - return _ws_from(r.json()["data"], organization) + return _ws_from(r.json()["data"]) # Convenience methods for org+name operations def update( @@ -403,7 +334,7 @@ def update( f"/api/v2/organizations/{organization}/workspaces/{workspace}", json_body=body, ) - return _ws_from(r.json()["data"], organization) + return _ws_from(r.json()["data"]) def update_by_id( self, workspace_id: str, options: WorkspaceUpdateOptions @@ -420,7 +351,7 @@ def update_by_id( r = self.t.request( "PATCH", f"/api/v2/workspaces/{workspace_id}", json_body=body ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) def _build_workspace_payload( self, @@ -643,7 +574,7 @@ def remove_vcs_connection( f"/api/v2/organizations/{organization}/workspaces/{workspace}", json_body=body, ) - return _ws_from(r.json()["data"], organization) + return _ws_from(r.json()["data"]) def remove_vcs_connection_by_id(self, workspace_id: str) -> Workspace: """Remove VCS connection from workspace by workspace ID.""" @@ -666,7 +597,7 @@ def remove_vcs_connection_by_id(self, workspace_id: str) -> Workspace: f"/api/v2/workspaces/{workspace_id}", json_body=body, ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) def lock(self, workspace_id: str, options: WorkspaceLockOptions) -> Workspace: """Lock a workspace by workspace ID.""" @@ -681,7 +612,7 @@ def lock(self, workspace_id: str, options: WorkspaceLockOptions) -> Workspace: f"/api/v2/workspaces/{workspace_id}/actions/lock", json_body=body, ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) def unlock(self, workspace_id: str) -> Workspace: """Unlock a workspace by workspace ID.""" @@ -693,7 +624,7 @@ def unlock(self, workspace_id: str) -> Workspace: "POST", f"/api/v2/workspaces/{workspace_id}/actions/unlock", ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) except Exception as e: if "latest state version is still pending" in str(e): raise WorkspaceLockedStateVersionStillPending(str(e)) from e @@ -709,7 +640,7 @@ def force_unlock(self, workspace_id: str) -> Workspace: "POST", f"/api/v2/workspaces/{workspace_id}/actions/force-unlock", ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) def assign_ssh_key( self, workspace_id: str, options: WorkspaceAssignSSHKeyOptions @@ -737,7 +668,7 @@ def assign_ssh_key( f"/api/v2/workspaces/{workspace_id}/relationships/ssh-key", json_body=body, ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) def unassign_ssh_key(self, workspace_id: str) -> Workspace: """Unassign the SSH key from a workspace by workspace ID.""" @@ -758,7 +689,7 @@ def unassign_ssh_key(self, workspace_id: str) -> Workspace: json_body=body, ) - return _ws_from(r.json()["data"], None) + return _ws_from(r.json()["data"]) def list_remote_state_consumers( self, workspace_id: str, options: WorkspaceListRemoteStateConsumersOptions @@ -778,7 +709,7 @@ def list_remote_state_consumers( path = f"/api/v2/workspaces/{workspace_id}/relationships/remote-state-consumers" for item in self._list(path, params=params): - yield _ws_from(item, None) + yield 
_ws_from(item) def add_remote_state_consumers( self, workspace_id: str, options: WorkspaceAddRemoteStateConsumersOptions diff --git a/tests/units/test_run.py b/tests/units/test_run.py index bce2d2a..b1ea8c2 100644 --- a/tests/units/test_run.py +++ b/tests/units/test_run.py @@ -190,7 +190,7 @@ def test_create_run_validation_errors(self, runs_service): runs_service.create(options) # Test terraform_version with non-plan-only run - workspace = Workspace(id="ws-123", name="test", organization="test-org") + workspace = Workspace(id="ws-123", name="test") options = RunCreateOptions( workspace=workspace, terraform_version="1.5.0", plan_only=False ) @@ -227,7 +227,7 @@ def test_create_run_success(self, runs_service): with patch.object(runs_service, "t") as mock_transport: mock_transport.request.return_value = mock_response - workspace = Workspace(id="ws-123", name="test", organization="test-org") + workspace = Workspace(id="ws-123", name="test") variables = [ RunVariable(key="env", value="test"), RunVariable(key="region", value="us-east-1"), diff --git a/tests/units/test_run_trigger.py b/tests/units/test_run_trigger.py index 901dafb..4b40644 100644 --- a/tests/units/test_run_trigger.py +++ b/tests/units/test_run_trigger.py @@ -190,7 +190,7 @@ def test_create_run_trigger_validations(self, run_triggers_service): """Test create method with invalid workspace ID.""" options = RunTriggerCreateOptions( - sourceable=Workspace(id="ws-source", name="source", organization="org") + sourceable=Workspace(id="ws-source", name="source") ) with pytest.raises(InvalidWorkspaceIDError): @@ -201,7 +201,7 @@ def test_create_run_trigger_validations(self, run_triggers_service): # is raised when the service method checks for None sourceable # Create valid options but then manually set sourceable to None to bypass model validation options = RunTriggerCreateOptions( - sourceable=Workspace(id="ws-source", name="source", organization="org") + sourceable=Workspace(id="ws-source", name="source") ) options.sourceable = None @@ -227,7 +227,7 @@ def test_create_run_trigger_success(self, run_triggers_service): mock_transport.request.return_value = mock_response options = RunTriggerCreateOptions( - sourceable=Workspace(id="ws-source", name="source", organization="org") + sourceable=Workspace(id="ws-source", name="source") ) result = run_triggers_service.create("ws-123", options) @@ -340,7 +340,7 @@ def test_validate_run_trigger_filter_param_success(self, run_triggers_service): def test_backfill_deprecated_sourceable_already_exists(self, run_triggers_service): """Test backfill when sourceable already exists.""" - workspace = Workspace(id="ws-1", name="workspace", organization="org") + workspace = Workspace(id="ws-1", name="workspace") rt = RunTrigger( id="rt-1", created_at=datetime.now(), diff --git a/tests/units/test_workspaces.py b/tests/units/test_workspaces.py index 762b97f..65a923d 100644 --- a/tests/units/test_workspaces.py +++ b/tests/units/test_workspaces.py @@ -339,7 +339,7 @@ def test_create_workspace_with_project( sample_workspace_response ) - project = Project(id="prj-123", name="Test Project", organization="test-org") + project = Project(id="prj-123", name="Test Project") options = WorkspaceCreateOptions(name="project-workspace", project=project) @@ -611,11 +611,12 @@ def test_unassign_ssh_key( def test_ws_from_conversion(self, sample_workspace_response): """Test _ws_from helper function conversion.""" workspace_data = sample_workspace_response["data"] - workspace = _ws_from(workspace_data, "test-org") + workspace = 
_ws_from(workspace_data) assert workspace.id == "ws-abc123def456" assert workspace.name == "test-workspace" - assert workspace.organization == "test-org" + # organization is now from relationships, not parameter + assert workspace.organization is None assert workspace.auto_apply assert workspace.execution_mode == ExecutionMode.REMOTE assert workspace.resource_count == 25 @@ -633,11 +634,12 @@ def test_ws_from_minimal_data(self): """Test _ws_from with minimal data.""" minimal_data = {"id": "ws-minimal", "attributes": {"name": "minimal-workspace"}} - workspace = _ws_from(minimal_data, "test-org") + workspace = _ws_from(minimal_data) assert workspace.id == "ws-minimal" assert workspace.name == "minimal-workspace" - assert workspace.organization == "test-org" + # organization is now from relationships, not parameter + assert workspace.organization is None assert not workspace.auto_apply # Default value assert not workspace.locked # Default value @@ -676,11 +678,7 @@ def test_none_values_handling(self): }, } - workspace = _ws_from(data_with_nones, "test-org") - - assert workspace.description == "" # Should convert None to empty string - assert workspace.terraform_version == "" - assert workspace.working_directory == "" + workspace = _ws_from(data_with_nones) assert workspace.vcs_repo is None # ========================================== @@ -764,8 +762,8 @@ def test_list_remote_state_consumers_with_pagination( def test_add_remote_state_consumers_basic(self, workspaces_service, mock_transport): """Test adding remote state consumers.""" consumer_workspaces = [ - Workspace(id="ws-consumer-1", name="consumer-1", organization="test-org"), - Workspace(id="ws-consumer-2", name="consumer-2", organization="test-org"), + Workspace(id="ws-consumer-1", name="consumer-1"), + Workspace(id="ws-consumer-2", name="consumer-2"), ] options = WorkspaceAddRemoteStateConsumersOptions( @@ -806,7 +804,7 @@ def test_add_remote_state_consumers_validation_errors(self, workspaces_service): # Test invalid workspace ID format (with slash) options = WorkspaceAddRemoteStateConsumersOptions( - workspaces=[Workspace(id="ws-valid", name="valid", organization="test-org")] + workspaces=[Workspace(id="ws-valid", name="valid")] ) with pytest.raises(InvalidWorkspaceIDError): @@ -817,7 +815,7 @@ def test_remove_remote_state_consumers_basic( ): """Test removing remote state consumers.""" consumer_workspaces = [ - Workspace(id="ws-consumer-1", name="consumer-1", organization="test-org"), + Workspace(id="ws-consumer-1", name="consumer-1"), ] options = WorkspaceRemoveRemoteStateConsumersOptions( @@ -844,8 +842,8 @@ def test_update_remote_state_consumers_basic( ): """Test updating (replacing) remote state consumers.""" consumer_workspaces = [ - Workspace(id="ws-consumer-3", name="consumer-3", organization="test-org"), - Workspace(id="ws-consumer-4", name="consumer-4", organization="test-org"), + Workspace(id="ws-consumer-3", name="consumer-3"), + Workspace(id="ws-consumer-4", name="consumer-4"), ] options = WorkspaceUpdateRemoteStateConsumersOptions(