diff --git a/py/llama_cloud_services/parse/base.py b/py/llama_cloud_services/parse/base.py
index be02ffac..fe70a79c 100644
--- a/py/llama_cloud_services/parse/base.py
+++ b/py/llama_cloud_services/parse/base.py
@@ -1073,7 +1073,11 @@ def _calculate_backoff(self, current_interval: float) -> float:
         return current_interval  # Default fallback
 
     async def _get_job_result(
-        self, job_id: str, result_type: str, verbose: bool = False
+        self,
+        job_id: str,
+        result_type: str,
+        verbose: bool = False,
+        raise_job_error: bool = True,
     ) -> Dict[str, Any]:
         start = time.time()
         tries = 0
@@ -1106,7 +1110,25 @@ async def _get_job_result(
                         print(".", end="", flush=True)
                     current_interval = self._calculate_backoff(current_interval)
                 else:
-                    raise JobFailedException.from_result(result_json)
+                    if raise_job_error:
+                        raise JobFailedException.from_result(result_json)
+                    else:
+                        error_code = result_json.get("error_code")
+                        error_message = result_json.get("error_message")
+
+                        error_parts = [f"Job ID: {job_id} failed with status: {status}"]
+                        if error_code:
+                            error_parts.append(f"Error code: {error_code}")
+                        if error_message:
+                            error_parts.append(f"Error message: {error_message}")
+                        error_str = ", ".join(error_parts)
+
+                        return {
+                            "pages": [],
+                            "job_metadata": {"job_pages": 0},
+                            "error": error_str,
+                            "status": status,
+                        }
             except (
                 httpx.ConnectError,
                 httpx.ReadError,
@@ -1137,6 +1159,7 @@ async def _parse_one(
         fs: Optional[AbstractFileSystem] = None,
         result_type: Optional[str] = None,
         num_workers: Optional[int] = None,
+        raise_job_error: bool = True,
     ) -> List[Tuple[str, Dict[str, Any]]]:
         if self.partition_pages is None:
             job_results = [
@@ -1145,6 +1168,7 @@ async def _parse_one(
                     extra_info=extra_info,
                     fs=fs,
                     result_type=result_type,
+                    raise_job_error=raise_job_error,
                 )
             ]
         else:
@@ -1154,6 +1178,7 @@ async def _parse_one(
                 fs=fs,
                 result_type=result_type,
                 num_workers=num_workers,
+                raise_job_error=raise_job_error,
             )
         return job_results
 
@@ -1163,6 +1188,7 @@ async def _parse_one_unpartitioned(
         extra_info: Optional[dict] = None,
         fs: Optional[AbstractFileSystem] = None,
         result_type: Optional[str] = None,
+        raise_job_error: bool = True,
         **create_kwargs: Any,
     ) -> Tuple[str, Dict[str, Any]]:
         """Create one parse job and wait for the result."""
@@ -1172,7 +1198,10 @@ async def _parse_one_unpartitioned(
         if self.verbose:
             print("Started parsing the file under job_id %s" % job_id)
         result = await self._get_job_result(
-            job_id, result_type or self.result_type.value, verbose=self.verbose
+            job_id,
+            result_type or self.result_type.value,
+            verbose=self.verbose,
+            raise_job_error=raise_job_error,
         )
         return job_id, result
 
@@ -1183,6 +1212,7 @@ async def _parse_one_partitioned(
         fs: Optional[AbstractFileSystem] = None,
         result_type: Optional[str] = None,
         num_workers: Optional[int] = None,
+        raise_job_error: bool = True,
     ) -> List[Tuple[str, Dict[str, Any]]]:
         """Partition a file and run separate parse jobs per partition segment."""
         assert self.partition_pages is not None
@@ -1197,6 +1227,7 @@ async def _parse_one_partitioned(
                 extra_info=extra_info,
                 fs=fs,
                 result_type=result_type,
+                raise_job_error=raise_job_error,
                 partition_target_pages=target_pages,
             )
             for target_pages in partition_pages(
@@ -1224,28 +1255,33 @@ async def _parse_one_partitioned(
             size = self.partition_pages
             if not size:
                 break
-            try:
-                # Fetch JSON result type first to get accurate pagination data
-                # and then fetch the user's desired result type if needed
-                job_id, json_result = await self._parse_one_unpartitioned(
-                    file_path,
-                    extra_info=extra_info,
-                    fs=fs,
-                    result_type=ResultType.JSON.value,
-                    partition_target_pages=f"{total}-{total + size - 1}",
-                )
-                result_type = result_type or self.result_type.value
-                if result_type == ResultType.JSON.value:
-                    job_result = json_result
-                else:
-                    job_result = await self._get_job_result(
-                        job_id, result_type, verbose=self.verbose
-                    )
-            except JobFailedException as e:
-                if results and e.error_code == "NO_DATA_FOUND_IN_FILE":
-                    # Expected when we try to read past the end of the file
+            # Fetch JSON result type first to get accurate pagination data
+            # and then fetch the user's desired result type if needed
+            job_id, json_result = await self._parse_one_unpartitioned(
+                file_path,
+                extra_info=extra_info,
+                fs=fs,
+                result_type=ResultType.JSON.value,
+                raise_job_error=raise_job_error,
+                partition_target_pages=f"{total}-{total + size - 1}",
+            )
+
+            if json_result.get("error"):
+                if results and "NO_DATA_FOUND_IN_FILE" in json_result.get("error", ""):
                     return results
-                raise
+                results.append((job_id, json_result))
+                return results
+
+            result_type = result_type or self.result_type.value
+            if result_type == ResultType.JSON.value:
+                job_result = json_result
+            else:
+                job_result = await self._get_job_result(
+                    job_id,
+                    result_type,
+                    verbose=self.verbose,
+                    raise_job_error=raise_job_error,
+                )
             results.append((job_id, job_result))
             if len(json_result["pages"]) < size:
                 break
@@ -1359,6 +1395,7 @@ async def _aparse_one(
         extra_info: Optional[dict] = None,
         fs: Optional[AbstractFileSystem] = None,
         num_workers: Optional[int] = None,
+        raise_job_error: bool = True,
     ) -> List[JobResult]:
         job_results = await self._parse_one(
             file_path,
@@ -1366,6 +1403,7 @@ async def _aparse_one(
             extra_info=extra_info,
             fs=fs,
             result_type=ResultType.JSON.value,
             num_workers=num_workers,
+            raise_job_error=raise_job_error,
         )
         return [
             JobResult(
@@ -1385,6 +1423,7 @@ async def aparse(
         file_path: Union[List[FileInput], FileInput],
         extra_info: Optional[dict] = None,
         fs: Optional[AbstractFileSystem] = None,
+        raise_job_error: bool = True,
     ) -> Union[List["JobResult"], "JobResult"]:
         """
         Parse the file and return a JobResult object instead of Document objects.
@@ -1396,6 +1435,7 @@ async def aparse(
             file_path: Path to the file to parse. Can be a string, path, bytes, file-like object, or a list of these.
             extra_info: Additional metadata to include in the result.
             fs: Optional filesystem to use for reading files.
+            raise_job_error: If True (default), raises JobFailedException when jobs fail. If False, returns JobResult objects with error information.
 
         Returns:
             JobResult object or list of JobResult objects if either multiple files were provided or file(s) were partitioned before parsing.
@@ -1411,7 +1451,11 @@ async def aparse(
             else:
                 file_name = str(file_path)
             result = await self._aparse_one(
-                file_path, file_name, extra_info=extra_info, fs=fs
+                file_path,
+                file_name,
+                extra_info=extra_info,
+                fs=fs,
+                raise_job_error=raise_job_error,
             )
             return result[0] if len(result) == 1 else result
 
@@ -1437,6 +1481,7 @@ async def aparse(
                     extra_info=extra_info,
                     fs=fs,
                     num_workers=1,
+                    raise_job_error=raise_job_error,
                 )
                 for i, f in enumerate(file_path)
             ],
@@ -1462,6 +1507,7 @@ def parse(
         file_path: Union[List[FileInput], FileInput],
         extra_info: Optional[dict] = None,
         fs: Optional[AbstractFileSystem] = None,
+        raise_job_error: bool = True,
     ) -> Union[List["JobResult"], "JobResult"]:
         """
         Parse the file and return a JobResult object instead of Document objects.
@@ -1473,12 +1519,17 @@ def parse(
             file_path: Path to the file to parse. Can be a string, path, bytes, file-like object, or a list of these.
             extra_info: Additional metadata to include in the result.
             fs: Optional filesystem to use for reading files.
+            raise_job_error: If True (default), raises JobFailedException when jobs fail. If False, returns JobResult objects with error information.
 
         Returns:
             JobResult object or list of JobResult objects if multiple files were provided
         """
         try:
-            return asyncio_run(self.aparse(file_path, extra_info, fs=fs))
+            return asyncio_run(
+                self.aparse(
+                    file_path, extra_info, fs=fs, raise_job_error=raise_job_error
+                )
+            )
         except RuntimeError as e:
             if nest_asyncio_err in str(e):
                 raise RuntimeError(nest_asyncio_msg)
@@ -1745,7 +1796,7 @@ def _get_sub_docs(self, docs: List[Document]) -> List[Document]:
         return sub_docs
 
     async def aget_result(
-        self, job_id: Union[str, List[str]]
+        self, job_id: Union[str, List[str]], raise_job_error: bool = True
     ) -> Union[JobResult, List[JobResult]]:
         """
         Return JobResult object for previously parsed job(s).
@@ -1754,13 +1805,17 @@ async def aget_result(
         Args:
             job_id: Job ID or list of multiple Job IDs to be retrieved.
+            raise_job_error: If True (default), raises JobFailedException when jobs fail. If False, returns JobResult objects with error information.
 
         Returns:
             JobResult object or list of JobResult objects if multiple job IDs were provided.
         """
         if isinstance(job_id, str):
             result = await self._get_job_result(
-                job_id, ResultType.JSON.value, verbose=self.verbose
+                job_id,
+                ResultType.JSON.value,
+                verbose=self.verbose,
+                raise_job_error=raise_job_error,
             )
             return JobResult(
                 job_id=job_id,
@@ -1774,7 +1829,12 @@ async def aget_result(
         elif isinstance(job_id, list):
             results = []
             jobs = [
-                self._get_job_result(id_, ResultType.JSON.value, verbose=self.verbose)
+                self._get_job_result(
+                    id_,
+                    ResultType.JSON.value,
+                    verbose=self.verbose,
+                    raise_job_error=raise_job_error,
+                )
                 for id_ in job_id
             ]
             results = await run_jobs(
@@ -1799,7 +1859,7 @@ async def aget_result(
         raise ValueError("The input job_id must be a string or a list of strings.")
 
     def get_result(
-        self, job_id: Union[str, List[str]]
+        self, job_id: Union[str, List[str]], raise_job_error: bool = True
     ) -> Union[JobResult, List[JobResult]]:
         """
         Return JobResult object for previously parsed job(s).
@@ -1808,12 +1868,15 @@ def get_result(
         Args:
             job_id: Job ID or list of multiple Job IDs to be retrieved.
+            raise_job_error: If True (default), raises JobFailedException when jobs fail. If False, returns JobResult objects with error information.
 
         Returns:
             JobResult object or list of JobResult objects if multiple job IDs were provided.
""" try: - return asyncio_run(self.aget_result(job_id)) + return asyncio_run( + self.aget_result(job_id, raise_job_error=raise_job_error) + ) except RuntimeError as e: if nest_asyncio_err in str(e): raise RuntimeError(nest_asyncio_msg) diff --git a/py/tests/parse/test_llama_parse.py b/py/tests/parse/test_llama_parse.py index 105af9e2..b6f42ca2 100644 --- a/py/tests/parse/test_llama_parse.py +++ b/py/tests/parse/test_llama_parse.py @@ -2,6 +2,7 @@ import pytest import shutil from typing import Optional, cast +from unittest.mock import patch from fsspec.implementations.local import LocalFileSystem from httpx import AsyncClient @@ -204,6 +205,10 @@ async def test_get_result(markdown_parser: LlamaParse) -> None: assert len(result.pages) == len(expected.pages) +@pytest.mark.skipif( + os.environ.get("LLAMA_CLOUD_API_KEY", "") == "", + reason="LLAMA_CLOUD_API_KEY not set", +) @pytest.mark.asyncio async def test_parse_audio() -> None: parser = LlamaParse() @@ -211,3 +216,194 @@ async def test_parse_audio() -> None: result = await parser.aparse(filepath) assert result.job_id is not None + + +@pytest.mark.asyncio +async def test_error_handling_with_raise_job_error_false() -> None: + """Test that failed jobs return JobResult objects with error information when raise_job_error=False.""" + + parser = LlamaParse(api_key="test_key") + + # Mock error result with full error information + mock_error_result = { + "pages": [], + "job_metadata": {"job_pages": 0}, + "error": "Job ID: test_job_123 failed with status: ERROR, Error code: INVALID_FILE, Error message: File format not supported", + "status": "ERROR", + } + + with patch.object(parser, "_create_job", return_value="test_job_123"), patch.object( + parser, "_get_job_result", return_value=mock_error_result + ): + # Test aparse with raise_job_error=False + result = await parser.aparse("test_file.txt", raise_job_error=False) + + assert isinstance(result, type(result)) # Check it's a JobResult + assert result.job_id == "test_job_123" + assert result.error is not None + assert "ERROR" in result.error + assert "INVALID_FILE" in result.error + assert "File format not supported" in result.error + assert len(result.pages) == 0 + + # Test parse (synchronous version) with raise_job_error=False + result_sync = parser.parse("test_file.txt", raise_job_error=False) + + assert isinstance(result_sync, type(result_sync)) + assert result_sync.job_id == "test_job_123" + assert result_sync.error is not None + assert "INVALID_FILE" in result_sync.error + assert "File format not supported" in result_sync.error + + +@pytest.mark.asyncio +async def test_error_handling_with_raise_job_error_true() -> None: + """Test that failed jobs raise JobFailedException when raise_job_error=True (default behavior).""" + + parser = LlamaParse(api_key="test_key") + + # Mock that _get_job_result will raise JobFailedException when raise_job_error=True + from llama_cloud_services.parse.base import JobFailedException + + def mock_get_job_result(job_id, result_type, verbose=False, raise_job_error=True): + if raise_job_error: + raise JobFailedException( + "test_job_123", + "ERROR", + error_code="INVALID_FILE", + error_message="File format not supported", + ) + else: + return { + "pages": [], + "job_metadata": {"job_pages": 0}, + "error": "Job ID: test_job_123 failed with status: ERROR, Error code: INVALID_FILE, Error message: File format not supported", + "status": "ERROR", + } + + with patch.object(parser, "_create_job", return_value="test_job_123"), patch.object( + parser, "_get_job_result", 
diff --git a/py/tests/parse/test_llama_parse.py b/py/tests/parse/test_llama_parse.py
index 105af9e2..b6f42ca2 100644
--- a/py/tests/parse/test_llama_parse.py
+++ b/py/tests/parse/test_llama_parse.py
@@ -2,6 +2,8 @@
 import pytest
 import shutil
 from typing import Optional, cast
+from unittest.mock import patch
+from llama_cloud_services.parse.base import JobResult
 from fsspec.implementations.local import LocalFileSystem
 from httpx import AsyncClient
@@ -204,6 +206,10 @@ async def test_get_result(markdown_parser: LlamaParse) -> None:
     assert len(result.pages) == len(expected.pages)
 
 
+@pytest.mark.skipif(
+    os.environ.get("LLAMA_CLOUD_API_KEY", "") == "",
+    reason="LLAMA_CLOUD_API_KEY not set",
+)
 @pytest.mark.asyncio
 async def test_parse_audio() -> None:
     parser = LlamaParse()
@@ -211,3 +217,194 @@ async def test_parse_audio() -> None:
     result = await parser.aparse(filepath)
 
     assert result.job_id is not None
+
+
+@pytest.mark.asyncio
+async def test_error_handling_with_raise_job_error_false() -> None:
+    """Test that failed jobs return JobResult objects with error information when raise_job_error=False."""
+
+    parser = LlamaParse(api_key="test_key")
+
+    # Mock error result with full error information
+    mock_error_result = {
+        "pages": [],
+        "job_metadata": {"job_pages": 0},
+        "error": "Job ID: test_job_123 failed with status: ERROR, Error code: INVALID_FILE, Error message: File format not supported",
+        "status": "ERROR",
+    }
+
+    with patch.object(parser, "_create_job", return_value="test_job_123"), patch.object(
+        parser, "_get_job_result", return_value=mock_error_result
+    ):
+        # Test aparse with raise_job_error=False
+        result = await parser.aparse("test_file.txt", raise_job_error=False)
+
+        assert isinstance(result, JobResult)
+        assert result.job_id == "test_job_123"
+        assert result.error is not None
+        assert "ERROR" in result.error
+        assert "INVALID_FILE" in result.error
+        assert "File format not supported" in result.error
+        assert len(result.pages) == 0
+
+        # Test parse (synchronous version) with raise_job_error=False
+        result_sync = parser.parse("test_file.txt", raise_job_error=False)
+
+        assert isinstance(result_sync, JobResult)
+        assert result_sync.job_id == "test_job_123"
+        assert result_sync.error is not None
+        assert "INVALID_FILE" in result_sync.error
+        assert "File format not supported" in result_sync.error
+
+
+@pytest.mark.asyncio
+async def test_error_handling_with_raise_job_error_true() -> None:
+    """Test that failed jobs raise JobFailedException when raise_job_error=True (default behavior)."""
+
+    parser = LlamaParse(api_key="test_key")
+
+    # Mock that _get_job_result will raise JobFailedException when raise_job_error=True
+    from llama_cloud_services.parse.base import JobFailedException
+
+    def mock_get_job_result(job_id, result_type, verbose=False, raise_job_error=True):
+        if raise_job_error:
+            raise JobFailedException(
+                "test_job_123",
+                "ERROR",
+                error_code="INVALID_FILE",
+                error_message="File format not supported",
+            )
+        else:
+            return {
+                "pages": [],
+                "job_metadata": {"job_pages": 0},
+                "error": "Job ID: test_job_123 failed with status: ERROR, Error code: INVALID_FILE, Error message: File format not supported",
+                "status": "ERROR",
+            }
+
+    with patch.object(parser, "_create_job", return_value="test_job_123"), patch.object(
+        parser, "_get_job_result", side_effect=mock_get_job_result
+    ):
+        # Test aparse with raise_job_error=True (default) - should raise exception
+        with pytest.raises(JobFailedException) as exc_info:
+            await parser.aparse("test_file.txt")
+
+        assert exc_info.value.job_id == "test_job_123"
+        assert exc_info.value.status == "ERROR"
+        assert exc_info.value.error_code == "INVALID_FILE"
+
+        # Test parse (synchronous version) with raise_job_error=True (default) - should raise exception
+        with pytest.raises(JobFailedException) as exc_info:
+            parser.parse("test_file.txt")
+
+        assert exc_info.value.job_id == "test_job_123"
+        assert exc_info.value.status == "ERROR"
+
+
+@pytest.mark.asyncio
+async def test_error_handling_with_minimal_fields() -> None:
+    """Test error handling when only status is available (no error_code/error_message) with raise_job_error=False."""
+
+    parser = LlamaParse(api_key="test_key")
+
+    # Mock error result with minimal fields (only what's guaranteed)
+    mock_minimal_error_result = {
+        "pages": [],
+        "job_metadata": {"job_pages": 0},
+        "error": "Job ID: test_job_456 failed with status: CANCELED",
+        "status": "CANCELED",
+    }
+
+    with patch.object(parser, "_create_job", return_value="test_job_456"), patch.object(
+        parser, "_get_job_result", return_value=mock_minimal_error_result
+    ):
+        # Test aparse with a minimal error response and raise_job_error=False
+        result = await parser.aparse("test_file.txt", raise_job_error=False)
+
+        assert isinstance(result, JobResult)
+        assert result.job_id == "test_job_456"
+        assert result.error is not None
+        assert "CANCELED" in result.error
+        assert len(result.pages) == 0
+
+
+@pytest.mark.asyncio
+async def test_successful_job_still_works() -> None:
+    """Test that successful jobs still work as before after error handling changes."""
+
+    parser = LlamaParse(api_key="test_key")
+
+    # Mock successful result
+    mock_success_result = {
+        "pages": [
+            {
+                "page": 0,
+                "text": "Sample text content",
+                "md": "# Sample markdown content",
+                "images": [],
+                "charts": [],
+                "tables": [],
+                "layout": [],
+                "items": [],
+                "status": "SUCCESS",
+                "links": [],
+                "width": 612.0,
+                "height": 792.0,
+            }
+        ],
+        "job_metadata": {"job_pages": 1},
+    }
+
+    with patch.object(
+        parser, "_create_job", return_value="success_job_456"
+    ), patch.object(parser, "_get_job_result", return_value=mock_success_result):
+        # Test aparse with a successful job (both with and without raise_job_error parameter)
+        result = await parser.aparse("test_file.txt")
+
+        assert isinstance(result, JobResult)
+        assert result.job_id == "success_job_456"
+        assert result.error is None  # No error for successful jobs
+        assert len(result.pages) == 1
+        assert result.pages[0].text == "Sample text content"
+
+        # Test with explicit raise_job_error=False (should work the same for successful jobs)
+        result2 = await parser.aparse("test_file.txt", raise_job_error=False)
+
+        assert isinstance(result2, JobResult)
+        assert result2.job_id == "success_job_456"
+        assert result2.error is None
+        assert len(result2.pages) == 1
+        assert result2.pages[0].text == "Sample text content"
+
+
+@pytest.mark.asyncio
+async def test_get_result_with_raise_job_error_parameter() -> None:
+    """Test that get_result method respects the raise_job_error parameter."""
+
+    parser = LlamaParse(api_key="test_key")
+
+    # Mock error result
+    mock_error_result = {
+        "pages": [],
+        "job_metadata": {"job_pages": 0},
+        "error": "Job ID: test_job_789 failed with status: ERROR, Error code: TIMEOUT, Error message: Job timed out",
+        "status": "ERROR",
+    }
+
patch.object(parser, "_get_job_result", return_value=mock_error_result): + # Test aget_result with raise_job_error=False + result = await parser.aget_result("test_job_789", raise_job_error=False) + + assert isinstance(result, type(result)) + assert result.job_id == "test_job_789" + assert result.error is not None + assert "TIMEOUT" in result.error + assert len(result.pages) == 0 + + # Test get_result (synchronous version) with raise_job_error=False + result_sync = parser.get_result("test_job_789", raise_job_error=False) + + assert isinstance(result_sync, type(result_sync)) + assert result_sync.job_id == "test_job_789" + assert result_sync.error is not None + assert "TIMEOUT" in result_sync.error