feat: add file-based config and web api workflow #392

Emptytao wants to merge 1 commit into usestrix:main
Conversation
Greptile Summary

This PR introduces two major capabilities: a file-based structured config system (replacing env-var-only configuration with a Pydantic-validated JSON config file) and a web API workflow for running scans. Key concerns found are detailed in the review comments below.
Confidence Score: 2/5
Important Files Changed
Reviews (1): Last reviewed commit: "feat: add file-based config and web api ..."
---

Path: strix/api/task_manager.py
Line: 63-73

```python
instruction = request.instruction
if request.instruction_file:
    instruction_path = Path(request.instruction_file).expanduser().resolve()
    try:
        instruction = instruction_path.read_text(encoding="utf-8").strip()
    except OSError as exc:
        raise ValueError(
            f"Failed to read instruction file '{instruction_path}': {exc}"
        ) from exc
    if not instruction:
        raise ValueError(f"Instruction file '{instruction_path}' is empty")
```
**`instruction_file` enables arbitrary server-side file read**

When `instruction_file` is present in the request, the API server reads that path directly from its own filesystem and injects the content into the scan as the instruction. Because `api_auth_token` defaults to `null` (no authentication required out of the box), any client that can reach the API can exfiltrate arbitrary files:

```
POST /api/v1/tasks
{"targets": ["https://example.com"], "instruction_file": "/etc/passwd"}
```

The file content is then stored in the task record (and served back via the task result endpoint), making this an unauthenticated arbitrary file read.

Even when an auth token is set, a compromised token grants full read access to the server's filesystem — a significant privilege escalation beyond the intended "run a scan" capability.

Consider removing `instruction_file` from the API surface entirely, or restricting it to a configurable allow-list directory (e.g. only paths inside a configured `instructions_dir`).
---

Path: strix/api/server.py
Line: 92-95

```python
@app.get("/api/v1/tasks/{task_id}", dependencies=[Depends(require_auth)])
async def get_task(task_id: str) -> dict[str, object]:
    result = _get_result_or_404(task_id)
    return result.model_dump(mode="python")
```
**`GET /tasks/{id}` returns full `ScanTaskResult`, not just the task record**

`_get_result_or_404` returns a `ScanTaskResult` (containing `task`, `scan_state`, and `artifacts`), and the response is `result.model_dump(mode="python")`. That means this endpoint returns the same payload as `GET /api/v1/tasks/{task_id}/result`:

```python
# GET /api/v1/tasks/{task_id}        → {"task": {...}, "scan_state": {...}, "artifacts": [...]}
# GET /api/v1/tasks/{task_id}/result → {"task": {...}, "scan_state": {...}, "artifacts": [...]}
```

Callers expecting `GET /tasks/{id}` to return a lightweight status object (like the records returned by `GET /tasks`) will be surprised by the heavier payload. A typical REST convention here is to return just the `ScanTaskRecord`. Returning the full result can also cause unnecessary I/O (globbing all artifact paths) on every status poll.
Consider returning only the task record at this endpoint:
```suggestion
@app.get("/api/v1/tasks/{task_id}", dependencies=[Depends(require_auth)])
async def get_task(task_id: str) -> dict[str, object]:
    record = _get_task_or_404(task_id)
    return {"task": record.model_dump(mode="python")}
```
---

Path: strix/api/task_manager.py
Line: 47-57

```python
max_concurrent_tasks = Config.get_int("api_max_concurrent_tasks") or 1
active_tasks = [
    task
    for task in self.list_tasks()
    if task.status in {TaskStatus.QUEUED, TaskStatus.RUNNING, TaskStatus.CANCELLING}
]
if len(active_tasks) >= max_concurrent_tasks:
    raise ValueError(
        "Maximum concurrent task limit reached. "
        f"Current limit: {max_concurrent_tasks}"
    )
```
**Race condition in concurrent-task limit check**

`list_tasks()` + the `len(active_tasks) >= max_concurrent_tasks` check is not atomic. If two requests arrive simultaneously both can read `len(active_tasks) == 0` (with default `max_concurrent_tasks=1`) and both will pass the guard, resulting in two tasks being started when only one should be allowed.

Consider using a threading lock or an asyncio lock around the check-and-create step, or using an in-process counter instead of the disk-based list.
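For the asyncio route, the check-and-create step can be wrapped in a single `asyncio.Lock`. The sketch below is illustrative only: `ConcurrencyGuard` and its in-memory task list are hypothetical stand-ins for the real manager and its disk-based store:

```python
import asyncio


class ConcurrencyGuard:
    """Serialize the "count active tasks, then create" step on one event loop."""

    def __init__(self, max_concurrent_tasks: int = 1) -> None:
        self._lock = asyncio.Lock()
        self._max = max_concurrent_tasks
        self._active: list[str] = []  # stand-in for the disk-based task list

    async def create_task(self, task_id: str) -> str:
        # While the lock is held, no other coroutine can run the check
        # between our read and our append, so two simultaneous requests
        # can no longer both pass the guard.
        async with self._lock:
            count = len(self._active)
            await asyncio.sleep(0)  # simulates the disk read in list_tasks()
            if count >= self._max:
                raise ValueError(
                    f"Maximum concurrent task limit reached. Current limit: {self._max}"
                )
            self._active.append(task_id)
            return task_id
```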
---

Path: strix/api/task_store.py
Line: 97-101

```python
scan_state = self.load_scan_state(record.task_id)
if scan_state and (scan_state.get("run_metadata") or {}).get("status") == "completed":
    record.status = TaskStatus.COMPLETED
    record.finished_at = (scan_state.get("run_metadata") or {}).get("end_time")
return self.save(record)
```
**`finished_at` may be `None` for completed tasks**

When `refresh()` promotes a task to `COMPLETED` via the scan-state path, `finished_at` is read from `scan_state["run_metadata"]["end_time"]`, which can be absent:

```python
record.finished_at = (scan_state.get("run_metadata") or {}).get("end_time")  # → None if missing
```

A `ScanTaskRecord` with `status=COMPLETED` and `finished_at=None` will produce inconsistent API responses (e.g. the `task.finished` SSE event carries a record without a finish timestamp). The `ScanTaskRecord.normalize_record` validator also syncs `completed_at ↔ finished_at`, so both fields will end up `None`.
Consider falling back to the current time when end_time is unavailable:
```suggestion
record.finished_at = (scan_state.get("run_metadata") or {}).get("end_time") or utc_now_iso()
```
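The fallback assumes a `utc_now_iso()` helper is available. If the codebase doesn't already have one, a minimal version (name and format assumed here, not confirmed by the PR) could be:

```python
from datetime import datetime, timezone


def utc_now_iso() -> str:
    """Current UTC time as an ISO-8601 string with an explicit +00:00 offset."""
    return datetime.now(timezone.utc).isoformat()
```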
---

Path: strix/api/worker.py
Line: 57-60

```python
configure_runtime_context(
    sandbox_mode=False,
    caido_api_token=Config.get_str("caido_api_token"),
)
```
**`sandbox_mode` hardcoded to `False`, ignoring config**

`configure_runtime_context` is called with `sandbox_mode=False` unconditionally, which means the `runtime.sandbox_mode` setting from the loaded config file is silently ignored for all API-triggered scans. If an operator sets `sandbox_mode: true` in their config expecting isolated execution, this worker will override that intent without any warning.

Consider reading the value from `Config.get_bool("strix_sandbox_mode")` so the worker respects the operator's configuration, consistent with how other settings are read from the config.
No description provided.