Skip to content

Commit

Permalink
Pierre/json images (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
hexapode committed Mar 6, 2024
1 parent b19f852 commit 81843b9
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 54 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,3 +1,4 @@
.git
__pycache__/
*.pyc
*.pyc
.DS_Store
201 changes: 148 additions & 53 deletions llama_parse/base.py
Expand Up @@ -13,12 +13,12 @@
from llama_index.core.readers.base import BasePydanticReader
from llama_index.core.schema import Document


nest_asyncio_err = "cannot be called from a running event loop"
nest_asyncio_msg = "The event loop is already running. Add `import nest_asyncio; nest_asyncio.apply()` to your code to fix this issue."

class ResultType(str, Enum):
    """The result type for the parser."""

    # Plain-text rendering of the parsed document.
    TXT = "text"
    # Markdown rendering of the parsed document.
    MD = "markdown"

Expand Down Expand Up @@ -139,6 +139,10 @@ class LlamaParse(BasePydanticReader):
language: Language = Field(
default=Language.ENGLISH, description="The language of the text to parse."
)
parsing_instruction: Optional[str] = Field(
default="",
description="The parsing instruction for the parser."
)

@validator("api_key", pre=True, always=True)
def validate_api_key(cls, v: str) -> str:
Expand All @@ -158,70 +162,84 @@ def validate_base_url(cls, v: str) -> str:
url = os.getenv("LLAMA_CLOUD_BASE_URL", None)
return url or v or DEFAULT_BASE_URL

# upload a document and get back a job_id
async def _create_job(self, file_path: str, extra_info: Optional[dict] = None) -> str:
    """Upload the file at *file_path* to the parsing API and return the job id.

    Args:
        file_path: path to a PDF file (only PDFs are supported for now).
        extra_info: optional metadata dict; mutated in place to record ``file_path``.

    Returns:
        The job id assigned by the service.

    Raises:
        Exception: if the file is not a PDF or the upload request fails.
    """
    file_path = str(file_path)
    if not file_path.endswith(".pdf"):
        raise Exception("Currently, only PDF files are supported.")

    extra_info = extra_info or {}
    extra_info["file_path"] = file_path

    headers = {"Authorization": f"Bearer {self.api_key}"}

    # load data, set the mime type
    with open(file_path, "rb") as f:
        mime_type = mimetypes.guess_type(file_path)[0]
        files = {"file": (f.name, f, mime_type)}

        # send the request, start job
        url = f"{self.base_url}/api/parsing/upload"
        async with httpx.AsyncClient(timeout=self.max_timeout) as client:
            response = await client.post(
                url,
                files=files,
                headers=headers,
                data={
                    "language": self.language.value,
                    "parsing_instruction": self.parsing_instruction,
                },
            )
            if not response.is_success:
                raise Exception(f"Failed to parse the PDF file: {response.text}")

    job_id = response.json()["id"]
    return job_id

# send the request, start job
url = f"{self.base_url}/api/parsing/upload"
async with httpx.AsyncClient(timeout=self.max_timeout) as client:
response = await client.post(url, files=files, headers=headers, data={"language": self.language.value})
if not response.is_success:
raise Exception(f"Failed to parse the PDF file: {response.text}")
async def _get_job_result(self, job_id: str, result_type: str) -> dict:
    """Poll the parsing API until job *job_id* completes and return its result.

    Args:
        job_id: identifier returned by ``_create_job``.
        result_type: which result flavor to fetch (e.g. ``"text"``, ``"markdown"``, ``"json"``).

    Returns:
        The decoded JSON payload of the finished job.

    Raises:
        Exception: on timeout (``self.max_timeout`` seconds) or a 400 response.
    """
    result_url = f"{self.base_url}/api/parsing/job/{job_id}/result/{result_type}"
    headers = {"Authorization": f"Bearer {self.api_key}"}

    start = time.time()
    tries = 0
    while True:
        await asyncio.sleep(self.check_interval)
        async with httpx.AsyncClient(timeout=self.max_timeout) as client:
            tries += 1

            result = await client.get(result_url, headers=headers)

            # 404 means the job is still running; keep polling until the deadline.
            if result.status_code == 404:
                end = time.time()
                if end - start > self.max_timeout:
                    raise Exception(
                        f"Timeout while parsing the PDF file: {job_id}"
                    )
                # lightweight progress indicator in verbose mode
                if self.verbose and tries % 10 == 0:
                    print(".", end="", flush=True)
                continue

            if result.status_code == 400:
                detail = result.json().get("detail", "Unknown error")
                raise Exception(f"Failed to parse the PDF file: {detail}")

            return result.json()

async def _aload_data(self, file_path: str, extra_info: Optional[dict] = None) -> List[Document]:
    """Parse a single file and return its content as a list of Documents.

    Args:
        file_path: path to a PDF file.
        extra_info: metadata attached to the resulting Document.

    Raises:
        Exception: re-raised from job creation or result polling after logging.
    """
    try:
        job_id = await self._create_job(file_path, extra_info=extra_info)
        if self.verbose:
            print("Started parsing the file under job_id %s" % job_id)

        # _get_job_result already returns the decoded JSON payload (a dict),
        # so index it directly — calling .json() on it would raise.
        result = await self._get_job_result(job_id, self.result_type.value)

        return [
            Document(
                text=result[self.result_type.value],
                metadata=extra_info,
            )
        ]

    except Exception as e:
        print(f"Error while parsing the PDF file '{file_path}':", e)
        raise


async def aload_data(self, file_path: Union[List[str], str], extra_info: Optional[dict] = None) -> List[Document]:
"""Load data from the input path."""
if isinstance(file_path, (str, Path)):
Expand Down Expand Up @@ -250,3 +268,80 @@ def load_data(self, file_path: Union[List[str], str], extra_info: Optional[dict]
raise RuntimeError(nest_asyncio_msg)
else:
raise e


async def _aget_json(self, file_path: str, extra_info: Optional[dict] = None) -> List[dict]:
    """Parse a single file and return its raw JSON result.

    The returned dict is annotated in place with ``job_id`` and ``file_path``
    so callers can correlate results (e.g. when downloading images later).

    Raises:
        Exception: re-raised from job creation or result polling after logging.
    """
    try:
        job_id = await self._create_job(file_path, extra_info=extra_info)
        if self.verbose:
            print("Started parsing the file under job_id %s" % job_id)

        result = await self._get_job_result(job_id, "json")
        result["job_id"] = job_id
        result["file_path"] = file_path
        return [result]

    except Exception as e:
        print(f"Error while parsing the PDF file '{file_path}':", e)
        raise



async def aget_json(self, file_path: Union[List[str], str], extra_info: Optional[dict] = None) -> List[dict]:
    """Parse one file or a list of files and return their raw JSON results.

    Args:
        file_path: a single path (str/Path) or a list of paths.
        extra_info: metadata forwarded to each parse job.

    Raises:
        RuntimeError: with a nest_asyncio hint when called from a running loop.
        ValueError: if *file_path* is neither a string/Path nor a list.
    """
    if isinstance(file_path, (str, Path)):
        return await self._aget_json(file_path, extra_info=extra_info)
    elif isinstance(file_path, list):
        jobs = [self._aget_json(f, extra_info=extra_info) for f in file_path]
        try:
            results = await run_jobs(jobs, workers=self.num_workers)

            # return flattened results
            return [item for sublist in results for item in sublist]
        except RuntimeError as e:
            if nest_asyncio_err in str(e):
                # chain the original error so the real traceback is preserved
                raise RuntimeError(nest_asyncio_msg) from e
            raise
    else:
        raise ValueError("The input file_path must be a string or a list of strings.")


def get_json_result(self, file_path: Union[List[str], str], extra_info: Optional[dict] = None) -> List[dict]:
    """Synchronous wrapper around :meth:`aget_json`.

    Raises:
        RuntimeError: with a nest_asyncio hint when called from a running loop.
    """
    try:
        return asyncio.run(self.aget_json(file_path, extra_info))
    except RuntimeError as e:
        if nest_asyncio_err in str(e):
            # chain the original error so the real traceback is preserved
            raise RuntimeError(nest_asyncio_msg) from e
        raise

def get_images(self, json_result: list[dict], download_path: str) -> List[dict]:
    """Download images referenced in *json_result* into *download_path*.

    Each image dict is annotated in place with ``path``, ``job_id``,
    ``original_pdf_path`` and ``page_number``, and the annotated dicts are
    returned. Best-effort: on any failure the error is printed and an empty
    list is returned.
    """
    headers = {"Authorization": f"Bearer {self.api_key}"}
    try:
        # make sure the target directory exists before writing files into it
        os.makedirs(download_path, exist_ok=True)
        images = []
        for result in json_result:
            job_id = result["job_id"]
            for page in result["pages"]:
                for image in page["images"]:
                    image_name = image["name"]
                    image_path = os.path.join(download_path, f"{job_id}-{image_name}")
                    image["path"] = image_path
                    image["job_id"] = job_id
                    image["original_pdf_path"] = result["file_path"]
                    image["page_number"] = page["page"]
                    image_url = f"{self.base_url}/api/parsing/job/{job_id}/result/image/{image_name}"
                    response = httpx.get(image_url, headers=headers)
                    # don't write an HTTP error body to disk as if it were an image
                    response.raise_for_status()
                    with open(image_path, "wb") as f:
                        f.write(response.content)
                    images.append(image)
        return images
    except Exception as e:
        print("Error while downloading images from the parsed result:", e)
        return []

0 comments on commit 81843b9

Please sign in to comment.