diff --git a/examples/private_scan.py b/examples/private_scan.py new file mode 100644 index 0000000..1b350b1 --- /dev/null +++ b/examples/private_scan.py @@ -0,0 +1,89 @@ +""" +Tool for scanning files privately using VirusTotal API. +Supports waiting for scan completion. +""" + +import sys +import asyncio +import argparse +from pathlib import Path +import vt +from rich.console import Console +from rich.progress import Progress + +console = Console() + +async def scan_file_private( + api_key: str, + file_path: Path, + wait: bool = False +) -> None: + """ + Scan a file privately on VirusTotal. + + Args: + api_key: VirusTotal API key + file_path: Path to file to scan + wait: Wait for scan completion + """ + async with vt.Client(api_key) as client: + try: + with Progress() as progress: + task = progress.add_task( + "Scanning file...", + total=None if wait else 1 + ) + + analysis = await client.scan_file_private_async( + str(file_path), + wait_for_completion=wait + ) + + progress.update(task, advance=1) + + console.print("\n[green]Scan submitted successfully[/green]") + console.print(f"Analysis ID: {analysis.id}") + + if wait: + console.print(f"\nScan Status: {analysis.status}") + if hasattr(analysis, 'stats'): + console.print("Detection Stats:") + for k, v in analysis.stats.items(): + console.print(f" {k}: {v}") + + except vt.error.APIError as e: + console.print(f"[red]API Error: {e}[/red]") + except Exception as e: + console.print(f"[red]Error: {e}[/red]") + +def main(): + parser = argparse.ArgumentParser( + description="Scan file privately using VirusTotal API" + ) + parser.add_argument("--apikey", help="VirusTotal API key") + parser.add_argument("--file_path", help="Path to file to scan") + parser.add_argument( + "--wait", + action="store_true", + help="Wait for scan completion" + ) + + args = parser.parse_args() + file_path = Path(args.file_path) + + if not file_path.exists(): + console.print(f"[red]Error: File {file_path} not found[/red]") + sys.exit(1) + + if not file_path.is_file(): + console.print(f"[red]Error: {file_path} is not a file[/red]") + sys.exit(1) + + asyncio.run(scan_file_private( + args.apikey, + file_path, + args.wait + )) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tests/test_client.py b/tests/test_client.py index 5469fec..057c3f6 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -507,3 +507,80 @@ def test_wsgi_app(httpserver, monkeypatch): response = client.get("/") assert response.status_code == 200 assert response.json == expected_response + +@pytest.fixture +def private_scan_mocks(httpserver): + """Fixture for mocking private scan API calls.""" + upload_url = f"http://{httpserver.host}:{httpserver.port}/upload" + + # Mock private upload URL request + httpserver.expect_request( + "/api/v3/private/files/upload_url", + method="GET" + ).respond_with_json({ + "data": upload_url + }) + + # Mock file upload response + httpserver.expect_request( + "/upload", + method="POST" + ).respond_with_json({ + "data": { + "id": "dummy_scan_id", + "type": "private_analysis", + "links": { + "self": "dummy_link" + }, + "attributes": { + "status": "queued", + } + } + }) + + # Add mock for analysis status endpoint + httpserver.expect_request( + "/api/v3/analyses/dummy_scan_id", + method="GET" + ).respond_with_json({ + "data": { + "id": "dummy_scan_id", + "type": "private_analysis", + "links": { + "self": "dummy_link" + }, + "attributes": { + "status": "completed", + "stats": { + "malicious": 0, + "suspicious": 0 + } + } + } + }) + + return upload_url + +def verify_analysis(analysis, status="queued"): + """Helper to verify analysis response.""" + assert analysis.id == "dummy_scan_id" + assert analysis.type == "private_analysis" + assert getattr(analysis, "status") == status + +def test_scan_file_private(httpserver, private_scan_mocks): + """Test synchronous private file scanning.""" + with new_client(httpserver) as client: + with io.StringIO("test file content") as f: + analysis = client.scan_file_private(f) + verify_analysis(analysis) + +@pytest.mark.asyncio +async def test_scan_file_private_async(httpserver, private_scan_mocks): + """Test asynchronous private file scanning.""" + async with new_client(httpserver) as client: + with io.StringIO("test file content") as f: + analysis = await client.scan_file_private_async( + f, + wait_for_completion=True + ) + verify_analysis(analysis, status="completed") \ No newline at end of file diff --git a/vt/client.py b/vt/client.py index a7c342b..d43e9ef 100644 --- a/vt/client.py +++ b/vt/client.py @@ -19,6 +19,8 @@ import io import json import typing +import os +import aiofiles import aiohttp @@ -973,3 +975,53 @@ async def _wait_for_analysis_completion(self, analysis: Object) -> Object: async def wait_for_analysis_completion(self, analysis: Object) -> Object: return await self._wait_for_analysis_completion(analysis) + + def scan_file_private( + self, + file: typing.Union[typing.BinaryIO, str], + wait_for_completion: bool = False + ) -> Object: + """Scan file privately. + + Args: + file: File to scan (path string or file object) + wait_for_completion: Wait for completion + + Returns: + Object: Analysis object with scan results + """ + return make_sync( + self.scan_file_private_async(file, wait_for_completion) + ) + + async def scan_file_private_async( + self, + file: typing.Union[typing.BinaryIO, str], + wait_for_completion: bool = False + ) -> Object: + """Async version of scan_file_private""" + + # Handle string path + if isinstance(file, str): + async with aiofiles.open(file, 'rb') as f: + file_content = io.BytesIO(await f.read()) + file_content.name = os.path.basename(file) + return await self.scan_file_private_async( + file_content, + wait_for_completion=wait_for_completion + ) + + # Create form data for private scan + form = aiohttp.FormData() + form.add_field('file', file) + + # Get private upload URL and submit + upload_url = await self.get_data_async("/private/files/upload_url") + response = await self.post_async(upload_url, data=form) + + analysis = await self._response_to_object(response) + + if wait_for_completion: + analysis = await self._wait_for_analysis_completion(analysis) + + return analysis \ No newline at end of file