py/llama_cloud_services/parse/base.py (125 changes: 94 additions, 31 deletions)
@@ -1073,7 +1073,11 @@ def _calculate_backoff(self, current_interval: float) -> float:
         return current_interval  # Default fallback

     async def _get_job_result(
-        self, job_id: str, result_type: str, verbose: bool = False
+        self,
+        job_id: str,
+        result_type: str,
+        verbose: bool = False,
+        raise_job_error: bool = True,
     ) -> Dict[str, Any]:
         start = time.time()
         tries = 0
@@ -1106,7 +1110,25 @@ async def _get_job_result(
                         print(".", end="", flush=True)
                     current_interval = self._calculate_backoff(current_interval)
                 else:
-                    raise JobFailedException.from_result(result_json)
+                    if raise_job_error:
+                        raise JobFailedException.from_result(result_json)
+                    else:
+                        error_code = result_json.get("error_code")
+                        error_message = result_json.get("error_message")
+
+                        error_parts = [f"Job ID: {job_id} failed with status: {status}"]
+                        if error_code:
+                            error_parts.append(f"Error code: {error_code}")
+                        if error_message:
+                            error_parts.append(f"Error message: {error_message}")
+                        error_str = ", ".join(error_parts)
+
+                        return {
+                            "pages": [],
+                            "job_metadata": {"job_pages": 0},
+                            "error": error_str,
+                            "status": status,
+                        }
             except (
                 httpx.ConnectError,
                 httpx.ReadError,
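
With `raise_job_error=False`, a failed job now resolves to a sentinel result dict instead of an exception. A minimal sketch of consuming that shape for JSON-type results; the dict literal mirrors the branch above, and the job ID and error text are illustrative:

```python
from typing import Any, Dict


def summarize_result(result: Dict[str, Any]) -> str:
    """Return a one-line summary for a job result dict."""
    if result.get("error"):
        # Failed job: empty pages, error string assembled from
        # error_code / error_message, plus the terminal status.
        return f"failed ({result['status']}): {result['error']}"
    return f"ok: {len(result['pages'])} pages"


failed = {
    "pages": [],
    "job_metadata": {"job_pages": 0},
    "error": "Job ID: 1234 failed with status: ERROR, Error code: PARSE_ERROR",
    "status": "ERROR",
}
print(summarize_result(failed))
```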
@@ -1137,6 +1159,7 @@ async def _parse_one(
         fs: Optional[AbstractFileSystem] = None,
         result_type: Optional[str] = None,
         num_workers: Optional[int] = None,
+        raise_job_error: bool = True,
     ) -> List[Tuple[str, Dict[str, Any]]]:
         if self.partition_pages is None:
             job_results = [
@@ -1145,6 +1168,7 @@
                     extra_info=extra_info,
                     fs=fs,
                     result_type=result_type,
+                    raise_job_error=raise_job_error,
                 )
             ]
         else:
@@ -1154,6 +1178,7 @@
                 fs=fs,
                 result_type=result_type,
                 num_workers=num_workers,
+                raise_job_error=raise_job_error,
             )
         return job_results

@@ -1163,6 +1188,7 @@ async def _parse_one_unpartitioned(
         extra_info: Optional[dict] = None,
         fs: Optional[AbstractFileSystem] = None,
         result_type: Optional[str] = None,
+        raise_job_error: bool = True,
         **create_kwargs: Any,
     ) -> Tuple[str, Dict[str, Any]]:
         """Create one parse job and wait for the result."""
@@ -1172,7 +1198,10 @@ async def _parse_one_unpartitioned(
         if self.verbose:
             print("Started parsing the file under job_id %s" % job_id)
         result = await self._get_job_result(
-            job_id, result_type or self.result_type.value, verbose=self.verbose
+            job_id,
+            result_type or self.result_type.value,
+            verbose=self.verbose,
+            raise_job_error=raise_job_error,
         )
         return job_id, result

@@ -1183,6 +1212,7 @@ async def _parse_one_partitioned(
         fs: Optional[AbstractFileSystem] = None,
         result_type: Optional[str] = None,
         num_workers: Optional[int] = None,
+        raise_job_error: bool = True,
     ) -> List[Tuple[str, Dict[str, Any]]]:
         """Partition a file and run separate parse jobs per partition segment."""
         assert self.partition_pages is not None
@@ -1197,6 +1227,7 @@ async def _parse_one_partitioned(
                     extra_info=extra_info,
                     fs=fs,
                     result_type=result_type,
+                    raise_job_error=raise_job_error,
                     partition_target_pages=target_pages,
                 )
                 for target_pages in partition_pages(
@@ -1224,28 +1255,33 @@ async def _parse_one_partitioned(
             size = self.partition_pages
             if not size:
                 break
-            try:
-                # Fetch JSON result type first to get accurate pagination data
-                # and then fetch the user's desired result type if needed
-                job_id, json_result = await self._parse_one_unpartitioned(
-                    file_path,
-                    extra_info=extra_info,
-                    fs=fs,
-                    result_type=ResultType.JSON.value,
-                    partition_target_pages=f"{total}-{total + size - 1}",
-                )
-                result_type = result_type or self.result_type.value
-                if result_type == ResultType.JSON.value:
-                    job_result = json_result
-                else:
-                    job_result = await self._get_job_result(
-                        job_id, result_type, verbose=self.verbose
-                    )
-            except JobFailedException as e:
-                if results and e.error_code == "NO_DATA_FOUND_IN_FILE":
+            # Fetch JSON result type first to get accurate pagination data
+            # and then fetch the user's desired result type if needed
+            job_id, json_result = await self._parse_one_unpartitioned(
+                file_path,
+                extra_info=extra_info,
+                fs=fs,
+                result_type=ResultType.JSON.value,
+                raise_job_error=raise_job_error,
+                partition_target_pages=f"{total}-{total + size - 1}",
+            )
+
+            if json_result.get("error"):
+                if results and "NO_DATA_FOUND_IN_FILE" in json_result.get("error", ""):
                     # Expected when we try to read past the end of the file
                     return results
-                raise
+                results.append((job_id, json_result))
+                return results
+
+            result_type = result_type or self.result_type.value
+            if result_type == ResultType.JSON.value:
+                job_result = json_result
+            else:
+                job_result = await self._get_job_result(
+                    job_id,
+                    result_type,
+                    verbose=self.verbose,
+                    raise_job_error=raise_job_error,
+                )
             results.append((job_id, job_result))
             if len(json_result["pages"]) < size:
                 break
@@ -1359,13 +1395,15 @@ async def _aparse_one(
         extra_info: Optional[dict] = None,
         fs: Optional[AbstractFileSystem] = None,
         num_workers: Optional[int] = None,
+        raise_job_error: bool = True,
     ) -> List[JobResult]:
         job_results = await self._parse_one(
             file_path,
             extra_info,
             fs=fs,
             result_type=ResultType.JSON.value,
             num_workers=num_workers,
+            raise_job_error=raise_job_error,
         )
         return [
             JobResult(
@@ -1385,6 +1423,7 @@ async def aparse(
         file_path: Union[List[FileInput], FileInput],
         extra_info: Optional[dict] = None,
         fs: Optional[AbstractFileSystem] = None,
+        raise_job_error: bool = True,
     ) -> Union[List["JobResult"], "JobResult"]:
         """
         Parse the file and return a JobResult object instead of Document objects.
@@ -1396,6 +1435,7 @@ async def aparse(
             file_path: Path to the file to parse. Can be a string, path, bytes, file-like object, or a list of these.
             extra_info: Additional metadata to include in the result.
             fs: Optional filesystem to use for reading files.
+            raise_job_error: If True (default), raises JobFailedException when jobs fail. If False, returns JobResult objects with error information.

         Returns:
             JobResult object or list of JobResult objects if either multiple files were provided or file(s) were partitioned before parsing.
@@ -1411,7 +1451,11 @@ async def aparse(
             else:
                 file_name = str(file_path)
             result = await self._aparse_one(
-                file_path, file_name, extra_info=extra_info, fs=fs
+                file_path,
+                file_name,
+                extra_info=extra_info,
+                fs=fs,
+                raise_job_error=raise_job_error,
             )
             return result[0] if len(result) == 1 else result

@@ -1437,6 +1481,7 @@ async def aparse(
                     extra_info=extra_info,
                     fs=fs,
                     num_workers=1,
+                    raise_job_error=raise_job_error,
                 )
                 for i, f in enumerate(file_path)
             ],
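
The async entry point threads the flag through end to end. A minimal usage sketch, assuming the class is exported as `LlamaParse` from `llama_cloud_services`; the API key and file path are placeholders:

```python
import asyncio

from llama_cloud_services import LlamaParse  # assumed public export


async def main() -> None:
    parser = LlamaParse(api_key="llx-...")  # placeholder key
    # With raise_job_error=False, a failed job resolves to a JobResult
    # carrying the error details instead of raising JobFailedException.
    result = await parser.aparse("./my_file.pdf", raise_job_error=False)
    print(result)


asyncio.run(main())
```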
@@ -1462,6 +1507,7 @@ def parse(
         file_path: Union[List[FileInput], FileInput],
         extra_info: Optional[dict] = None,
         fs: Optional[AbstractFileSystem] = None,
+        raise_job_error: bool = True,
     ) -> Union[List["JobResult"], "JobResult"]:
         """
         Parse the file and return a JobResult object instead of Document objects.
@@ -1473,12 +1519,17 @@ def parse(
             file_path: Path to the file to parse. Can be a string, path, bytes, file-like object, or a list of these.
             extra_info: Additional metadata to include in the result.
             fs: Optional filesystem to use for reading files.
+            raise_job_error: If True (default), raises JobFailedException when jobs fail. If False, returns JobResult objects with error information.

         Returns:
             JobResult object or list of JobResult objects if multiple files were provided
         """
         try:
-            return asyncio_run(self.aparse(file_path, extra_info, fs=fs))
+            return asyncio_run(
+                self.aparse(
+                    file_path, extra_info, fs=fs, raise_job_error=raise_job_error
+                )
+            )
         except RuntimeError as e:
             if nest_asyncio_err in str(e):
                 raise RuntimeError(nest_asyncio_msg)
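
The sync wrapper simply forwards the flag into `aparse`. A usage sketch under the same assumptions as above:

```python
from llama_cloud_services import LlamaParse  # assumed public export

parser = LlamaParse(api_key="llx-...")  # placeholder key

# Default (strict): a failed job raises JobFailedException.
# Lenient: opt out per call and inspect the returned JobResult(s) instead.
results = parser.parse(["a.pdf", "b.pdf"], raise_job_error=False)
```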
@@ -1745,7 +1796,7 @@ def _get_sub_docs(self, docs: List[Document]) -> List[Document]:
         return sub_docs

     async def aget_result(
-        self, job_id: Union[str, List[str]]
+        self, job_id: Union[str, List[str]], raise_job_error: bool = True
     ) -> Union[JobResult, List[JobResult]]:
         """
         Return JobResult object for previously parsed job(s).
@@ -1754,13 +1805,17 @@ async def aget_result(

         Args:
             job_id: Job ID or list of multiple Job IDs to be retrieved.
+            raise_job_error: If True (default), raises JobFailedException when jobs fail. If False, returns JobResult objects with error information.

         Returns:
             JobResult object or list of JobResult objects if multiple job IDs were provided.
         """
         if isinstance(job_id, str):
             result = await self._get_job_result(
-                job_id, ResultType.JSON.value, verbose=self.verbose
+                job_id,
+                ResultType.JSON.value,
+                verbose=self.verbose,
+                raise_job_error=raise_job_error,
             )
             return JobResult(
                 job_id=job_id,
@@ -1774,7 +1829,12 @@ async def aget_result(
         elif isinstance(job_id, list):
             results = []
             jobs = [
-                self._get_job_result(id_, ResultType.JSON.value, verbose=self.verbose)
+                self._get_job_result(
+                    id_,
+                    ResultType.JSON.value,
+                    verbose=self.verbose,
+                    raise_job_error=raise_job_error,
+                )
                 for id_ in job_id
             ]
             results = await run_jobs(
@@ -1799,7 +1859,7 @@ async def aget_result(
             raise ValueError("The input job_id must be a string or a list of strings.")

     def get_result(
-        self, job_id: Union[str, List[str]]
+        self, job_id: Union[str, List[str]], raise_job_error: bool = True
     ) -> Union[JobResult, List[JobResult]]:
         """
         Return JobResult object for previously parsed job(s).
@@ -1808,12 +1868,15 @@ def get_result(

         Args:
             job_id: Job ID or list of multiple Job IDs to be retrieved.
+            raise_job_error: If True (default), raises JobFailedException when jobs fail. If False, returns JobResult objects with error information.

         Returns:
             JobResult object or list of JobResult objects if multiple job IDs were provided.
         """
         try:
-            return asyncio_run(self.aget_result(job_id))
+            return asyncio_run(
+                self.aget_result(job_id, raise_job_error=raise_job_error)
+            )
         except RuntimeError as e:
             if nest_asyncio_err in str(e):
                 raise RuntimeError(nest_asyncio_msg)
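
`get_result` gains the same escape hatch for retrieving previously submitted jobs. A sketch, with hypothetical job IDs:

```python
from llama_cloud_services import LlamaParse  # assumed public export

parser = LlamaParse(api_key="llx-...")  # placeholder key

# With raise_job_error=False, a failed job yields a JobResult describing
# the failure rather than aborting the whole batch with an exception.
results = parser.get_result(["<job-id-1>", "<job-id-2>"], raise_job_error=False)
```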