Commit ci-test
alangenfeld committed Apr 10, 2024
1 parent 73633a8 commit 0b64a31
Showing 9 changed files with 170 additions and 44 deletions.
98 changes: 92 additions & 6 deletions dagster-cloud-cli/dagster_cloud_cli/commands/ci/__init__.py
@@ -8,7 +8,7 @@
import sys
from collections import Counter
from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, cast

import typer
from typer import Typer
@@ -28,11 +28,10 @@
get_org_url,
)
from dagster_cloud_cli.core import pex_builder, pydantic_yaml

from .. import metrics
from . import checks, report, state

app = Typer(hidden=True, help="CI/CD agnostic commands")
from dagster_cloud_cli.core.artifacts import (
download_organization_artifact,
upload_organization_artifact,
)
from dagster_cloud_cli.core.pex_builder import (
code_location,
deps,
@@ -42,6 +41,11 @@
)
from dagster_cloud_cli.types import CliEventTags, CliEventType

from .. import metrics
from . import checks, report, state

app = Typer(hidden=True, help="CI/CD agnostic commands")


@app.command(help="Print json information about current CI/CD environment")
def inspect(project_dir: str):
@@ -204,6 +208,10 @@ def init(
status_url: Optional[str] = None,
):
yaml_path = pathlib.Path(project_dir) / dagster_cloud_yaml_path
if not yaml_path.exists():
raise ui.error(
f"Dagster Cloud yaml file not found at specified path {yaml_path.resolve()}."
)
locations_def = pydantic_yaml.load_dagster_cloud_yaml(yaml_path.read_text())
locations = locations_def.locations
if location_name:
@@ -217,6 +225,7 @@
url = get_org_url(organization, dagster_env)
# Deploy to the branch deployment for the current context. If there is no branch deployment
# available (e.g. if not in a PR) then we fall back to the --deployment flag.

try:
branch_deployment = get_deployment_from_context(url, project_dir)
if deployment:
@@ -225,9 +234,11 @@
f" --deployment={deployment}"
)
deployment = branch_deployment
is_branch_deployment = True
except ValueError as err:
if deployment:
ui.print(f"Deploying to {deployment}. No branch deployment ({err}).")
is_branch_deployment = False
else:
raise ui.error(
f"Cannot determine deployment name in current context ({err}). Please specify"
@@ -245,6 +256,7 @@
deployment_name=deployment,
location_file=str(yaml_path.absolute()),
location_name=location.location_name,
is_branch_deployment=is_branch_deployment,
build=state.BuildMetadata(
git_url=git_url, commit_hash=commit_hash, build_config=location.build
),
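Taken together, the init changes above resolve the target deployment in a fixed order: a branch deployment inferred from the CI context wins and marks the state as a branch deploy; otherwise the --deployment flag is used; otherwise init fails. A condensed restatement as a hypothetical helper (not part of the commit):

def resolve_deployment(branch_deployment, deployment_flag):
    # Branch context wins; record that this is a branch deployment.
    if branch_deployment is not None:
        return branch_deployment, True  # (deployment_name, is_branch_deployment)
    # Outside a branch context, fall back to the explicit --deployment flag.
    if deployment_flag:
        return deployment_flag, False
    raise ValueError("cannot determine deployment; pass --deployment")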
@@ -703,3 +715,77 @@ def _deploy(
agent_heartbeat_timeout=agent_heartbeat_timeout,
url=deployment_url,
)


dagster_dbt_app = typer.Typer(
hidden=True,
help="Dagster Cloud commands for managing the `dagster-dbt` integration.",
add_completion=False,
)
app.add_typer(dagster_dbt_app, name="dagster-dbt", no_args_is_help=True)

project_app = typer.Typer(
name="project",
no_args_is_help=True,
help="Commands for using a dbt project in Dagster.",
add_completion=False,
)
dagster_dbt_app.add_typer(project_app, name="project", no_args_is_help=True)


@project_app.command(
name="manage-state",
help="""
Handles uploading and downloading dbt state artifacts (such as manifest.json) when `state_path`
is set on `DbtProject`.
""",
)
def manage_state_command(
statedir: str = STATEDIR_OPTION,
file: str = typer.Option(),
source_deployment: str = typer.Option(
default="prod",
help="Which deployment should upload its manifest.json.",
),
key_prefix: str = typer.Option(
default="",
help="A key prefix for the key the manifest.json is saved with.",
),
):
try:
from dagster_dbt import DbtProject
except ImportError:
ui.print(
"Unable to import dagster_dbt. The manage-state command requires dagster_dbt to be installed."
)
return
from dagster._core.code_pointer import load_python_file
from dagster._core.definitions.load_assets_from_modules import find_objects_in_module_of_types

state_store = state.FileStore(statedir=statedir)
locations = state_store.list_locations()
if not locations:
raise ui.error("Unable to determine deployment state.")

location = locations[0]
deployment_name = location.deployment_name
is_branch = location.is_branch_deployment

contents = load_python_file(file, None)
for project in find_objects_in_module_of_types(contents, DbtProject):
project = cast(DbtProject, project)
if project.state_path:
download_path = project.state_path.joinpath("manifest.json")
key = f"{key_prefix}{os.fspath(download_path)}"
if is_branch:
ui.print(f"Downloading {source_deployment} manifest for branch deployment.")
os.makedirs(project.state_path, exist_ok=True)
download_organization_artifact(key, download_path)
ui.print("Download complete.")

elif deployment_name == source_deployment:
ui.print(f"Uploading {source_deployment} manifest.")
upload_organization_artifact(key, project.manifest_path)
ui.print("Upload complete")

ui.print("Project ready")
1 change: 1 addition & 0 deletions dagster-cloud-cli/dagster_cloud_cli/commands/ci/state.py
@@ -47,6 +47,7 @@ class LocationState(BaseModel, extra=Extra.forbid):
deployment_name: str
location_file: str
location_name: str
is_branch_deployment: bool
selected: bool = True
build: BuildMetadata
build_output: Optional[Union[DockerBuildOutput, PexBuildOutput]] = Field(
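The new field records, at `ci init` time, whether the target deployment was resolved from a branch context; later steps such as manage-state above read it back from the state directory. A rough sketch of that round trip (the statedir value is illustrative):

from dagster_cloud_cli.commands.ci import state

store = state.FileStore(statedir="/tmp/ci-state")
locations = store.list_locations()  # LocationState entries written by `ci init`
if locations and locations[0].is_branch_deployment:
    print("deploying to a branch deployment")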
2 changes: 1 addition & 1 deletion dagster-cloud-cli/dagster_cloud_cli/version.py
@@ -1 +1 @@
__version__ = "1.7.0"
__version__ = "1!0+dev"
5 changes: 4 additions & 1 deletion dagster-cloud/dagster_cloud/anomaly_detection/__init__.py
@@ -1,4 +1,7 @@
from .defs import (
build_anomaly_detection_freshness_checks as build_anomaly_detection_freshness_checks,
)
from .types import AnomalyDetectionModelParams as AnomalyDetectionModelParams
from .types import (
AnomalyDetectionModelParams as AnomalyDetectionModelParams,
BetaFreshnessAnomalyDetectionParams as BetaFreshnessAnomalyDetectionParams,
)
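The redundant-looking `X as X` aliases are the conventional way to mark names as deliberate re-exports, so strict type checkers (e.g. mypy with implicit re-export disabled) keep them visible to importers; a minimal illustration:

# package/__init__.py
# Without the alias, a strict checker treats the import as private to this module.
from .types import AnomalyDetectionModelParams as AnomalyDetectionModelParams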
26 changes: 13 additions & 13 deletions dagster-cloud/dagster_cloud/anomaly_detection/defs.py
@@ -47,7 +47,7 @@ def _build_check_for_assets(
@multi_asset_check(
specs=[
AssetCheckSpec(
name="freshness_anomaly_detection_check",
name="anomaly_detection_freshness_check",
description=f"Detects anomalies in the freshness of the asset using model {params.model_version.value.lower()}.",
asset=asset_key,
)
@@ -65,7 +65,9 @@ def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]
)
instance = cast(DagsterCloudAgentInstance, context.instance)
with create_cloud_webserver_client(
instance.dagit_url,
instance.dagit_url[:-1]
if instance.dagit_url.endswith("/")
else instance.dagit_url, # Remove trailing slash
check.str_param(instance.dagster_cloud_agent_token, "dagster_cloud_agent_token"),
) as client:
for check_key in context.selected_asset_check_keys:
@@ -82,16 +84,17 @@ def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]
},
},
)
data = result["data"]["anomalyDetectionInference"]
metadata = {
"model_params": {**params.as_metadata},
"model_version": params.model_version.value,
}
if result["anomalyDetectionInference"]["__typename"] != "AnomalyDetectionSuccess":
if data["__typename"] != "AnomalyDetectionSuccess":
yield handle_anomaly_detection_inference_failure(
result, metadata, params, asset_key
data, metadata, params, asset_key
)
continue
response = result["anomalyDetectionInference"]["response"]
response = result["data"]["anomalyDetectionInference"]["response"]
overdue_seconds = check.float_param(response["overdue_seconds"], "overdue_seconds")
overdue_deadline_timestamp = response["overdue_deadline_timestamp"]
metadata["overdue_deadline_timestamp"] = MetadataValue.timestamp(
@@ -148,25 +151,22 @@ def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]


def handle_anomaly_detection_inference_failure(
result: dict, metadata: dict, params: AnomalyDetectionModelParams, asset_key: AssetKey
data: dict, metadata: dict, params: AnomalyDetectionModelParams, asset_key: AssetKey
) -> AssetCheckResult:
if (
result["anomalyDetectionInference"]["__typename"] == "AnomalyDetectionFailure"
and result["anomalyDetectionInference"]["message"]
== params.model_version.minimum_required_records_msg
data["__typename"] == "AnomalyDetectionFailure"
and data["message"] == params.model_version.minimum_required_records_msg
):
# Intercept failure in the case of not enough records, and return a pass to avoid
# being too noisy with failures.
return AssetCheckResult(
passed=True,
severity=AssetCheckSeverity.WARN,
metadata=metadata,
description=result["anomalyDetectionInference"]["message"],
description=data["message"],
asset_key=asset_key,
)
raise DagsterCloudAnomalyDetectionFailed(
f"Anomaly detection failed: {result['anomalyDetectionInference']['message']}"
)
raise DagsterCloudAnomalyDetectionFailed(f"Anomaly detection failed: {data['message']}")


def build_anomaly_detection_freshness_checks(
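Most of the defs.py changes re-root the response parsing: the GraphQL client returns the standard envelope, so the payload lives under result["data"]. A schematic of the shape the updated check handles (field values invented):

result = {
    "data": {
        "anomalyDetectionInference": {
            "__typename": "AnomalyDetectionSuccess",
            "response": {
                "overdue_seconds": 0.0,
                "overdue_deadline_timestamp": 1712700000.0,
            },
        }
    }
}

data = result["data"]["anomalyDetectionInference"]
if data["__typename"] != "AnomalyDetectionSuccess":
    raise RuntimeError(data.get("message", "anomaly detection failed"))
overdue_seconds = float(data["response"]["overdue_seconds"])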
@@ -174,7 +174,7 @@ def jaffle_shop_dbt_assets(
)
if row.bytes_billed or row.slots_ms:
cost_info = BigQueryCostInfo(
asset_key, partition, row.job_id, row.bytes_billed, row.slots_ms
asset_key, partition, row.job_id, row.slots_ms, row.bytes_billed
)
cost_by_asset[cost_info.asset_partition_key].append(cost_info)
except:
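The one-line change in the hunk above swaps two positional arguments (slots_ms now precedes bytes_billed). Calls like this are less fragile with keyword arguments; a sketch, since BigQueryCostInfo's actual signature is not shown in this diff:

cost_info = BigQueryCostInfo(
    asset_key,
    partition,
    row.job_id,
    slots_ms=row.slots_ms,          # keywords make the pairing explicit
    bytes_billed=row.bytes_billed,  # and immune to parameter reordering
)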
2 changes: 1 addition & 1 deletion dagster-cloud/dagster_cloud/version.py
@@ -1 +1 @@
__version__ = "1.7.0"
__version__ = "1!0+dev"
@@ -84,7 +84,9 @@

DEFAULT_SERVER_PROCESS_STARTUP_TIMEOUT = 180
DEFAULT_MAX_TTL_SERVERS = 25
ACTIVE_AGENT_HEARTBEAT_INTERVAL = 600
ACTIVE_AGENT_HEARTBEAT_INTERVAL = int(
os.getenv("DAGSTER_CLOUD_ACTIVE_AGENT_HEARTBEAT_INVERAL", "600")
)


USER_CODE_LAUNCHER_RECONCILE_SLEEP_SECONDS = 1
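The heartbeat constant is now overridable via the environment, so operators can tune how recent an agent's heartbeat must be for that agent to count as active when deciding whether its code servers may be cleaned up. A standalone sketch of the pattern, with a hypothetical helper for the consumption side (not shown in this hunk):

import os
import time

ACTIVE_AGENT_HEARTBEAT_INTERVAL = int(
    os.getenv("DAGSTER_CLOUD_ACTIVE_AGENT_HEARTBEAT_INTERVAL", "600")
)

def is_agent_active(last_heartbeat_timestamp: float) -> bool:
    # Hypothetical: an agent counts as active if it heartbeated recently enough.
    return time.time() - last_heartbeat_timestamp <= ACTIVE_AGENT_HEARTBEAT_INTERVAL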
@@ -340,6 +342,7 @@ def __init__(
self._desired_entries: Dict[DeploymentAndLocation, UserCodeLauncherEntry] = {}
self._actual_entries: Dict[DeploymentAndLocation, UserCodeLauncherEntry] = {}
self._last_refreshed_actual_entries = 0
self._last_cleaned_up_dangling_code_servers = 0
self._metadata_lock = threading.Lock()

self._upload_locations: Set[DeploymentAndLocation] = set()
@@ -457,7 +460,9 @@ def start(self, run_reconcile_thread=True, run_metrics_thread=True):
"Not starting run worker monitoring, because it's not supported on this agent."
)

self._graceful_cleanup_servers()
self._graceful_cleanup_servers(
include_own_servers=True # there shouldn't be any of our own servers at this point, but it won't hurt
)

if run_reconcile_thread:
self._reconcile_grpc_metadata_thread = threading.Thread(
@@ -893,13 +898,15 @@ def _graceful_remove_server_handle(self, server_handle: ServerHandle):
with self._grpc_servers_lock:
self._pending_delete_grpc_server_handles.discard(server_handle)

def _cleanup_servers(self, active_agent_ids: Set[str]) -> None:
def _cleanup_servers(self, active_agent_ids: Set[str], include_own_servers: bool) -> None:
"""Remove all servers, across all deployments and locations."""
with ThreadPoolExecutor() as executor:
futures = []
for handle in self._list_server_handles():
self._logger.info(f"Attempting to cleanup server {handle}")
if self._can_cleanup_server(handle, active_agent_ids):
if self._can_cleanup_server(
handle, active_agent_ids, include_own_servers=include_own_servers
):
self._logger.info(f"Can remove server {handle}. Cleaning up.")
futures.append(executor.submit(self._remove_server_handle, handle))
else:
@@ -924,7 +931,9 @@ def get_agent_id_for_server(self, handle: ServerHandle) -> Optional[str]:
def get_server_create_timestamp(self, handle: ServerHandle) -> Optional[float]:
"""Returns the update_timestamp value from the given code server."""

def _can_cleanup_server(self, handle: ServerHandle, active_agent_ids: Set[str]) -> bool:
def _can_cleanup_server(
self, handle: ServerHandle, active_agent_ids: Set[str], include_own_servers: bool
) -> bool:
"""Returns true if we can clean up the server identified by the handle without issues (server was started by this agent, or agent is no longer active)."""
agent_id_for_server = self.get_agent_id_for_server(handle)
self._logger.info(
@@ -933,11 +942,13 @@ def _can_cleanup_server(self, handle: ServerHandle, active_agent_ids: Set[str])
)
self._logger.info(f"All active agent ids: {active_agent_ids}")

# If this server was created by the current agent, it can always be cleaned up
# (or if its a legacy server that never set an agent ID)
if not agent_id_for_server or self._instance.instance_uuid == agent_id_for_server:
# if it's a legacy server that never set an agent ID:
if not agent_id_for_server:
return True

if self._instance.instance_uuid == agent_id_for_server:
return include_own_servers

try:
update_timestamp_for_server = self.get_server_create_timestamp(handle)
except:
@@ -958,16 +969,18 @@ def _can_cleanup_server(self, handle: ServerHandle, active_agent_ids: Set[str])

return agent_id_for_server not in cast(Set[str], active_agent_ids)
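Omitting the server-age guard in the elided middle of the hunk, the new decision order in _can_cleanup_server reduces to a small pure function (simplified signature, for illustration only):

def can_cleanup(server_agent_id, own_agent_id, active_agent_ids, include_own_servers):
    if not server_agent_id:               # legacy server that never set an agent ID
        return True
    if server_agent_id == own_agent_id:   # own servers: only on startup/shutdown passes
        return include_own_servers
    return server_agent_id not in active_agent_ids  # orphaned by an inactive agent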

def _graceful_cleanup_servers(self): # ServerHandles
def _graceful_cleanup_servers(self, include_own_servers: bool): # ServerHandles
active_agent_ids = self.get_active_agent_ids()
if not self.supports_get_current_runs_for_server_handle:
return self._cleanup_servers(active_agent_ids)
return self._cleanup_servers(active_agent_ids, include_own_servers=include_own_servers)

handles = self._list_server_handles()
servers_to_remove: List[ServerHandle] = []
with self._grpc_servers_lock:
for handle in handles:
if self._can_cleanup_server(handle, active_agent_ids):
if self._can_cleanup_server(
handle, active_agent_ids, include_own_servers=include_own_servers
):
servers_to_remove.append(handle)
self._pending_delete_grpc_server_handles.update(servers_to_remove)
for server_handle in servers_to_remove:
@@ -994,7 +1007,7 @@ def __exit__(self, exception_type, exception_value, traceback):
self._reconcile_location_utilization_metrics_thread.join()

if self._started:
self._graceful_cleanup_servers()
self._graceful_cleanup_servers(include_own_servers=True)

super().__exit__(exception_type, exception_value, traceback)

@@ -1081,6 +1094,9 @@ def _reconcile_thread(self, shutdown_event):
f"Failure updating user code servers: {serializable_error_info_from_exc_info(sys.exc_info())}"
)

def _cleanup_server_check_interval(self):
return int(os.getenv("DAGSTER_CLOUD_CLEANUP_SERVER_CHECK_INTERVAL", "1800"))

def reconcile(self) -> None:
with self._metadata_lock:
desired_entries = (
@@ -1095,6 +1111,24 @@

now = pendulum.now("UTC").timestamp()

if not self._last_refreshed_actual_entries:
self._last_refreshed_actual_entries = now

if not self._last_cleaned_up_dangling_code_servers:
self._last_cleaned_up_dangling_code_servers = now

cleanup_server_check_interval = self._cleanup_server_check_interval()

if (
cleanup_server_check_interval
and now - self._last_cleaned_up_dangling_code_servers > cleanup_server_check_interval
):
try:
self._graceful_cleanup_servers(include_own_servers=False)
except:
self._logger.exception("Failed to clean up dangling code servers.")
self._last_cleaned_up_dangling_code_servers = now

if now - self._last_refreshed_actual_entries > ACTUAL_ENTRIES_REFRESH_INTERVAL:
try:
self._refresh_actual_entries()
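The reconcile() additions are the usual timestamp-gated pattern for running occasional work inside a hot loop: arm the timers on the first pass, then sweep dangling servers (excluding this agent's own) once per configurable interval. Distilled into a reusable sketch (names simplified, not part of the commit):

import time

class PeriodicTask:
    def __init__(self, interval_seconds: float):
        self.interval = interval_seconds
        self.last_run = 0.0

    def maybe_run(self, fn) -> None:
        now = time.time()
        if not self.last_run:
            self.last_run = now  # first pass only arms the timer, as in reconcile()
            return
        if self.interval and now - self.last_run > self.interval:
            try:
                fn()
            except Exception:
                pass  # the real code logs the failure and moves on
            self.last_run = now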