diff --git a/dagster-cloud-cli/dagster_cloud_cli/commands/ci/__init__.py b/dagster-cloud-cli/dagster_cloud_cli/commands/ci/__init__.py index 109d62e..782133f 100644 --- a/dagster-cloud-cli/dagster_cloud_cli/commands/ci/__init__.py +++ b/dagster-cloud-cli/dagster_cloud_cli/commands/ci/__init__.py @@ -8,7 +8,7 @@ import sys from collections import Counter from enum import Enum -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, cast import typer from typer import Typer @@ -28,11 +28,10 @@ get_org_url, ) from dagster_cloud_cli.core import pex_builder, pydantic_yaml - -from .. import metrics -from . import checks, report, state - -app = Typer(hidden=True, help="CI/CD agnostic commands") +from dagster_cloud_cli.core.artifacts import ( + download_organization_artifact, + upload_organization_artifact, +) from dagster_cloud_cli.core.pex_builder import ( code_location, deps, @@ -42,6 +41,11 @@ ) from dagster_cloud_cli.types import CliEventTags, CliEventType +from .. import metrics +from . import checks, report, state + +app = Typer(hidden=True, help="CI/CD agnostic commands") + @app.command(help="Print json information about current CI/CD environment") def inspect(project_dir: str): @@ -204,6 +208,10 @@ def init( status_url: Optional[str] = None, ): yaml_path = pathlib.Path(project_dir) / dagster_cloud_yaml_path + if not yaml_path.exists(): + raise ui.error( + f"Dagster Cloud yaml file not found at specified path {yaml_path.resolve()}." + ) locations_def = pydantic_yaml.load_dagster_cloud_yaml(yaml_path.read_text()) locations = locations_def.locations if location_name: @@ -217,6 +225,7 @@ def init( url = get_org_url(organization, dagster_env) # Deploy to the branch deployment for the current context. If there is no branch deployment # available (eg. if not in a PR) then we fallback to the --deployment flag. + try: branch_deployment = get_deployment_from_context(url, project_dir) if deployment: @@ -225,9 +234,11 @@ def init( f" --deployment={deployment}" ) deployment = branch_deployment + is_branch_deployment = True except ValueError as err: if deployment: ui.print(f"Deploying to {deployment}. No branch deployment ({err}).") + is_branch_deployment = False else: raise ui.error( f"Cannot determine deployment name in current context ({err}). Please specify" @@ -245,6 +256,7 @@ def init( deployment_name=deployment, location_file=str(yaml_path.absolute()), location_name=location.location_name, + is_branch_deployment=is_branch_deployment, build=state.BuildMetadata( git_url=git_url, commit_hash=commit_hash, build_config=location.build ), @@ -703,3 +715,77 @@ def _deploy( agent_heartbeat_timeout=agent_heartbeat_timeout, url=deployment_url, ) + + +dagster_dbt_app = typer.Typer( + hidden=True, + help="Dagster Cloud commands for managing the `dagster-dbt` integration.", + add_completion=False, +) +app.add_typer(dagster_dbt_app, name="dagster-dbt", no_args_is_help=True) + +project_app = typer.Typer( + name="project", + no_args_is_help=True, + help="Commands for using a dbt project in Dagster.", + add_completion=False, +) +dagster_dbt_app.add_typer(project_app, name="project", no_args_is_help=True) + + +@project_app.command( + name="manage-state", + help=""" + This CLI command will handle uploading and downloading artifacts if `state_dir` is specified on + `DbtProject`. + """, +) +def manage_state_command( + statedir: str = STATEDIR_OPTION, + file: str = typer.Option(), + source_deployment: str = typer.Option( + default="prod", + help="Which deployment should upload its manifest.json.", + ), + key_prefix: str = typer.Option( + default="", + help="A key prefix for the key the manifest.json is saved with.", + ), +): + try: + from dagster_dbt import DbtProject + except: + ui.print( + "Unable to import dagster_dbt, can not use dbt-prepare-for-deployment when dagster_dbt is not installed." + ) + return + from dagster._core.code_pointer import load_python_file + from dagster._core.definitions.load_assets_from_modules import find_objects_in_module_of_types + + state_store = state.FileStore(statedir=statedir) + locations = state_store.list_locations() + if not locations: + raise ui.error("Unable to determine deployment state.") + + location = locations[0] + deployment_name = location.deployment_name + is_branch = location.is_branch_deployment + + contents = load_python_file(file, None) + for project in find_objects_in_module_of_types(contents, DbtProject): + project = cast(DbtProject, project) + if project.state_path: + download_path = project.state_path.joinpath("manifest.json") + key = f"{key_prefix}{os.fspath(download_path)}" + if is_branch: + ui.print(f"Downloading {source_deployment} manifest for branch deployment.") + os.makedirs(project.state_path, exist_ok=True) + download_organization_artifact(key, download_path) + ui.print("Download complete.") + + elif deployment_name == source_deployment: + ui.print(f"Uploading {source_deployment} manifest.") + upload_organization_artifact(key, project.manifest_path) + ui.print("Upload complete") + + ui.print("Project ready") diff --git a/dagster-cloud-cli/dagster_cloud_cli/commands/ci/state.py b/dagster-cloud-cli/dagster_cloud_cli/commands/ci/state.py index 2b99193..3680a21 100644 --- a/dagster-cloud-cli/dagster_cloud_cli/commands/ci/state.py +++ b/dagster-cloud-cli/dagster_cloud_cli/commands/ci/state.py @@ -47,6 +47,7 @@ class LocationState(BaseModel, extra=Extra.forbid): deployment_name: str location_file: str location_name: str + is_branch_deployment: bool selected: bool = True build: BuildMetadata build_output: Optional[Union[DockerBuildOutput, PexBuildOutput]] = Field( diff --git a/dagster-cloud-cli/dagster_cloud_cli/version.py b/dagster-cloud-cli/dagster_cloud_cli/version.py index 14d9d2f..fe3fd8a 100644 --- a/dagster-cloud-cli/dagster_cloud_cli/version.py +++ b/dagster-cloud-cli/dagster_cloud_cli/version.py @@ -1 +1 @@ -__version__ = "1.7.0" +__version__ = "1!0+dev" diff --git a/dagster-cloud/dagster_cloud/anomaly_detection/__init__.py b/dagster-cloud/dagster_cloud/anomaly_detection/__init__.py index 81e1a83..8c5c30b 100644 --- a/dagster-cloud/dagster_cloud/anomaly_detection/__init__.py +++ b/dagster-cloud/dagster_cloud/anomaly_detection/__init__.py @@ -1,4 +1,7 @@ from .defs import ( build_anomaly_detection_freshness_checks as build_anomaly_detection_freshness_checks, ) -from .types import AnomalyDetectionModelParams as AnomalyDetectionModelParams +from .types import ( + AnomalyDetectionModelParams as AnomalyDetectionModelParams, + BetaFreshnessAnomalyDetectionParams as BetaFreshnessAnomalyDetectionParams, +) diff --git a/dagster-cloud/dagster_cloud/anomaly_detection/defs.py b/dagster-cloud/dagster_cloud/anomaly_detection/defs.py index 80cd2ea..3580be1 100644 --- a/dagster-cloud/dagster_cloud/anomaly_detection/defs.py +++ b/dagster-cloud/dagster_cloud/anomaly_detection/defs.py @@ -47,7 +47,7 @@ def _build_check_for_assets( @multi_asset_check( specs=[ AssetCheckSpec( - name="freshness_anomaly_detection_check", + name="anomaly_detection_freshness_check", description=f"Detects anomalies in the freshness of the asset using model {params.model_version.value.lower()}.", asset=asset_key, ) @@ -65,7 +65,9 @@ def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult] ) instance = cast(DagsterCloudAgentInstance, context.instance) with create_cloud_webserver_client( - instance.dagit_url, + instance.dagit_url[:-1] + if instance.dagit_url.endswith("/") + else instance.dagit_url, # Remove trailing slash check.str_param(instance.dagster_cloud_agent_token, "dagster_cloud_agent_token"), ) as client: for check_key in context.selected_asset_check_keys: @@ -82,16 +84,17 @@ def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult] }, }, ) + data = result["data"]["anomalyDetectionInference"] metadata = { "model_params": {**params.as_metadata}, "model_version": params.model_version.value, } - if result["anomalyDetectionInference"]["__typename"] != "AnomalyDetectionSuccess": + if data["__typename"] != "AnomalyDetectionSuccess": yield handle_anomaly_detection_inference_failure( - result, metadata, params, asset_key + data, metadata, params, asset_key ) continue - response = result["anomalyDetectionInference"]["response"] + response = result["data"]["anomalyDetectionInference"]["response"] overdue_seconds = check.float_param(response["overdue_seconds"], "overdue_seconds") overdue_deadline_timestamp = response["overdue_deadline_timestamp"] metadata["overdue_deadline_timestamp"] = MetadataValue.timestamp( @@ -148,12 +151,11 @@ def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult] def handle_anomaly_detection_inference_failure( - result: dict, metadata: dict, params: AnomalyDetectionModelParams, asset_key: AssetKey + data: dict, metadata: dict, params: AnomalyDetectionModelParams, asset_key: AssetKey ) -> AssetCheckResult: if ( - result["anomalyDetectionInference"]["__typename"] == "AnomalyDetectionFailure" - and result["anomalyDetectionInference"]["message"] - == params.model_version.minimum_required_records_msg + data["__typename"] == "AnomalyDetectionFailure" + and data["message"] == params.model_version.minimum_required_records_msg ): # Intercept failure in the case of not enough records, and return a pass to avoid # being too noisy with failures. @@ -161,12 +163,10 @@ def handle_anomaly_detection_inference_failure( passed=True, severity=AssetCheckSeverity.WARN, metadata=metadata, - description=result["anomalyDetectionInference"]["message"], + description=data["message"], asset_key=asset_key, ) - raise DagsterCloudAnomalyDetectionFailed( - f"Anomaly detection failed: {result['anomalyDetectionInference']['message']}" - ) + raise DagsterCloudAnomalyDetectionFailed(f"Anomaly detection failed: {data['message']}") def build_anomaly_detection_freshness_checks( diff --git a/dagster-cloud/dagster_cloud/dagster_insights/bigquery/dbt_wrapper.py b/dagster-cloud/dagster_cloud/dagster_insights/bigquery/dbt_wrapper.py index 5459167..b982941 100644 --- a/dagster-cloud/dagster_cloud/dagster_insights/bigquery/dbt_wrapper.py +++ b/dagster-cloud/dagster_cloud/dagster_insights/bigquery/dbt_wrapper.py @@ -174,7 +174,7 @@ def jaffle_shop_dbt_assets( ) if row.bytes_billed or row.slots_ms: cost_info = BigQueryCostInfo( - asset_key, partition, row.job_id, row.bytes_billed, row.slots_ms + asset_key, partition, row.job_id, row.slots_ms, row.bytes_billed ) cost_by_asset[cost_info.asset_partition_key].append(cost_info) except: diff --git a/dagster-cloud/dagster_cloud/version.py b/dagster-cloud/dagster_cloud/version.py index 14d9d2f..fe3fd8a 100644 --- a/dagster-cloud/dagster_cloud/version.py +++ b/dagster-cloud/dagster_cloud/version.py @@ -1 +1 @@ -__version__ = "1.7.0" +__version__ = "1!0+dev" diff --git a/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py b/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py index 6f725b1..8cdf332 100644 --- a/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py +++ b/dagster-cloud/dagster_cloud/workspace/user_code_launcher/user_code_launcher.py @@ -84,7 +84,9 @@ DEFAULT_SERVER_PROCESS_STARTUP_TIMEOUT = 180 DEFAULT_MAX_TTL_SERVERS = 25 -ACTIVE_AGENT_HEARTBEAT_INTERVAL = 600 +ACTIVE_AGENT_HEARTBEAT_INTERVAL = int( + os.getenv("DAGSTER_CLOUD_ACTIVE_AGENT_HEARTBEAT_INVERAL", "600") +) USER_CODE_LAUNCHER_RECONCILE_SLEEP_SECONDS = 1 @@ -340,6 +342,7 @@ def __init__( self._desired_entries: Dict[DeploymentAndLocation, UserCodeLauncherEntry] = {} self._actual_entries: Dict[DeploymentAndLocation, UserCodeLauncherEntry] = {} self._last_refreshed_actual_entries = 0 + self._last_cleaned_up_dangling_code_servers = 0 self._metadata_lock = threading.Lock() self._upload_locations: Set[DeploymentAndLocation] = set() @@ -457,7 +460,9 @@ def start(self, run_reconcile_thread=True, run_metrics_thread=True): "Not starting run worker monitoring, because it's not supported on this agent." ) - self._graceful_cleanup_servers() + self._graceful_cleanup_servers( + include_own_servers=True # shouldn't be any of our own servers at this part, but won't hurt either + ) if run_reconcile_thread: self._reconcile_grpc_metadata_thread = threading.Thread( @@ -893,13 +898,15 @@ def _graceful_remove_server_handle(self, server_handle: ServerHandle): with self._grpc_servers_lock: self._pending_delete_grpc_server_handles.discard(server_handle) - def _cleanup_servers(self, active_agent_ids: Set[str]) -> None: + def _cleanup_servers(self, active_agent_ids: Set[str], include_own_servers: bool) -> None: """Remove all servers, across all deployments and locations.""" with ThreadPoolExecutor() as executor: futures = [] for handle in self._list_server_handles(): self._logger.info(f"Attempting to cleanup server {handle}") - if self._can_cleanup_server(handle, active_agent_ids): + if self._can_cleanup_server( + handle, active_agent_ids, include_own_servers=include_own_servers + ): self._logger.info(f"Can remove server {handle}. Cleaning up.") futures.append(executor.submit(self._remove_server_handle, handle)) else: @@ -924,7 +931,9 @@ def get_agent_id_for_server(self, handle: ServerHandle) -> Optional[str]: def get_server_create_timestamp(self, handle: ServerHandle) -> Optional[float]: """Returns the update_timestamp value from the given code server.""" - def _can_cleanup_server(self, handle: ServerHandle, active_agent_ids: Set[str]) -> bool: + def _can_cleanup_server( + self, handle: ServerHandle, active_agent_ids: Set[str], include_own_servers: bool + ) -> bool: """Returns true if we can clean up the server identified by the handle without issues (server was started by this agent, or agent is no longer active).""" agent_id_for_server = self.get_agent_id_for_server(handle) self._logger.info( @@ -933,11 +942,13 @@ def _can_cleanup_server(self, handle: ServerHandle, active_agent_ids: Set[str]) ) self._logger.info(f"All active agent ids: {active_agent_ids}") - # If this server was created by the current agent, it can always be cleaned up - # (or if its a legacy server that never set an agent ID) - if not agent_id_for_server or self._instance.instance_uuid == agent_id_for_server: + # if it's a legacy server that never set an agent ID: + if not agent_id_for_server: return True + if self._instance.instance_uuid == agent_id_for_server: + return include_own_servers + try: update_timestamp_for_server = self.get_server_create_timestamp(handle) except: @@ -958,16 +969,18 @@ def _can_cleanup_server(self, handle: ServerHandle, active_agent_ids: Set[str]) return agent_id_for_server not in cast(Set[str], active_agent_ids) - def _graceful_cleanup_servers(self): # ServerHandles + def _graceful_cleanup_servers(self, include_own_servers: bool): # ServerHandles active_agent_ids = self.get_active_agent_ids() if not self.supports_get_current_runs_for_server_handle: - return self._cleanup_servers(active_agent_ids) + return self._cleanup_servers(active_agent_ids, include_own_servers=include_own_servers) handles = self._list_server_handles() servers_to_remove: List[ServerHandle] = [] with self._grpc_servers_lock: for handle in handles: - if self._can_cleanup_server(handle, active_agent_ids): + if self._can_cleanup_server( + handle, active_agent_ids, include_own_servers=include_own_servers + ): servers_to_remove.append(handle) self._pending_delete_grpc_server_handles.update(servers_to_remove) for server_handle in servers_to_remove: @@ -994,7 +1007,7 @@ def __exit__(self, exception_type, exception_value, traceback): self._reconcile_location_utilization_metrics_thread.join() if self._started: - self._graceful_cleanup_servers() + self._graceful_cleanup_servers(include_own_servers=True) super().__exit__(exception_value, exception_value, traceback) @@ -1081,6 +1094,9 @@ def _reconcile_thread(self, shutdown_event): f"Failure updating user code servers: {serializable_error_info_from_exc_info(sys.exc_info())}" ) + def _cleanup_server_check_interval(self): + return int(os.getenv("DAGSTER_CLOUD_CLEANUP_SERVER_CHECK_INTERVAL", "1800")) + def reconcile(self) -> None: with self._metadata_lock: desired_entries = ( @@ -1095,6 +1111,24 @@ def reconcile(self) -> None: now = pendulum.now("UTC").timestamp() + if not self._last_refreshed_actual_entries: + self._last_refreshed_actual_entries = now + + if not self._last_cleaned_up_dangling_code_servers: + self._last_cleaned_up_dangling_code_servers = now + + cleanup_server_check_interval = self._cleanup_server_check_interval() + + if ( + cleanup_server_check_interval + and now - self._last_cleaned_up_dangling_code_servers > cleanup_server_check_interval + ): + try: + self._graceful_cleanup_servers(include_own_servers=False) + except: + self._logger.exception("Failed to clean up dangling code serverrs.") + self._last_cleaned_up_dangling_code_servers = now + if now - self._last_refreshed_actual_entries > ACTUAL_ENTRIES_REFRESH_INTERVAL: try: self._refresh_actual_entries() diff --git a/dagster-cloud/setup.py b/dagster-cloud/setup.py index 89aedda..eb7fbbf 100644 --- a/dagster-cloud/setup.py +++ b/dagster-cloud/setup.py @@ -6,14 +6,16 @@ def get_version() -> str: version: Dict[str, str] = {} - with open(Path(__file__).parent / "dagster_cloud/version.py", encoding="utf8") as fp: + with open( + Path(__file__).parent / "dagster_cloud/version.py", encoding="utf8" + ) as fp: exec(fp.read(), version) return version["__version__"] def get_description() -> str: - return (Path(__file__).parent / "README.md").read_text() + return "" ver = get_version() @@ -40,8 +42,8 @@ def get_description() -> str: packages=find_packages(exclude=["dagster_cloud_tests*"]), include_package_data=True, install_requires=[ - "dagster==1.7.0", - "dagster-cloud-cli==1.7.0", + f"dagster{pin}", + f"dagster-cloud-cli{pin}", "pex>=2.1.132,<3", "questionary", "requests", @@ -66,13 +68,13 @@ def get_description() -> str: "dbt-snowflake", "dbt-postgres", "dbt-duckdb", - "dagster-dbt==0.23.0", - "dagster_k8s==0.23.0", + f"dagster-dbt{pin}", + f"dagster_k8s{pin}", ], "insights": ["pyarrow"], - "docker": ["docker", "dagster_docker==0.23.0"], - "kubernetes": ["kubernetes", "dagster_k8s==0.23.0"], - "ecs": ["dagster_aws==0.23.0", "boto3"], + "docker": ["docker", f"dagster_docker{pin}"], + "kubernetes": ["kubernetes", f"dagster_k8s{pin}"], + "ecs": [f"dagster_aws{pin}", "boto3"], "sandbox": ["supervisor"], "pex": ["boto3"], "serverless": ["boto3"],