Lucas ARRIESSE committed on
Commit
6607a5c
·
1 Parent(s): 46800f4

Make doc extraction async

Browse files
Files changed (2) hide show
  1. api/docs.py +57 -26
  2. app.py +1 -1
api/docs.py CHANGED
@@ -34,10 +34,13 @@ NSMAP = {
34
 
35
  # ================================== Converting of files to .txt ====================================
36
 
 
 
37
 
38
- def convert_file(contents: io.BytesIO, filename: str, input_ext: str, output_ext: str, filter: str = None) -> io.BytesIO:
39
  """
40
  Converts the given file bytes using Libreoffice headless to the specified file type.
 
41
 
42
  Args:
43
  contents: File contents
@@ -46,6 +49,9 @@ def convert_file(contents: io.BytesIO, filename: str, input_ext: str, output_ext
46
  output_ext: Output extension (WITHOUT THE DOT)
47
  filter: The conversion filter to use.
48
  """
 
 
 
49
  with tempfile.TemporaryDirectory() as tmpdir:
50
  dir_path = Path(tmpdir)
51
  input_file_path = dir_path / f"{filename}.{input_ext}"
@@ -57,14 +63,35 @@ def convert_file(contents: io.BytesIO, filename: str, input_ext: str, output_ext
57
 
58
  out_bytes = io.BytesIO()
59
 
60
- # convert using libreoffice
61
- subprocess.run([
62
  "libreoffice",
63
  "--headless",
64
  "--convert-to", f"{output_ext}:{filter}" if filter else output_ext,
65
  "--outdir", tmpdir,
66
- input_file_path
67
- ], check=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
 
69
  with open(output_file_path, mode="rb") as out:
70
  out_bytes.write(out.read())
@@ -73,27 +100,31 @@ def convert_file(contents: io.BytesIO, filename: str, input_ext: str, output_ext
73
  return out_bytes
74
 
75
 
76
- def get_docx_archive(url: str) -> zipfile.ZipFile:
77
  """Récupère le docx depuis l'URL et le retourne comme objet ZipFile"""
 
78
  if not url.endswith("zip"):
79
  raise ValueError("URL doit pointer vers un fichier ZIP")
80
 
81
  doc_id = os.path.splitext(os.path.basename(url))[0]
82
- resp = requests.get(url, verify=False, headers={
83
  "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
84
  })
85
 
86
  resp.raise_for_status()
87
 
88
  with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
 
89
  for file_name in zf.namelist():
90
  if file_name.endswith(".docx"):
91
  docx_bytes = zf.read(file_name)
92
  return zipfile.ZipFile(io.BytesIO(docx_bytes))
93
  elif file_name.endswith(".doc"):
94
  in_bytes = io.BytesIO(zf.read(file_name))
95
- docx_bytes = convert_file(in_bytes, doc_id, "doc", "docx")
96
  return zipfile.ZipFile(docx_bytes)
 
 
97
 
98
  raise ValueError("Aucun fichier docx/doc trouvé dans l'archive")
99
 
@@ -157,11 +188,14 @@ def apply_docx_revisions(docx_zip: zipfile.ZipFile) -> io.BytesIO:
157
  return output
158
 
159
 
160
- def docx_to_txt(doc_id: str, url: str) -> str:
161
- docx_zip = get_docx_archive(url)
 
 
 
162
  modified_bytes = apply_docx_revisions(docx_zip)
163
 
164
- final_bytes = convert_file(
165
  modified_bytes, f"{doc_id}", "docx", "txt")
166
 
167
  final_bytes_text = str(final_bytes.read(), encoding="utf-8")
@@ -262,7 +296,7 @@ async def get_docs_df(req: DataRequest, http_client: AsyncClient = Depends(get_h
262
 
263
 
264
  @router.post("/download_tdocs")
265
- def download_tdocs(req: DocDownloadRequest):
266
  """Download the specified TDocs and zips them in a single archive"""
267
 
268
  # Document IDs to download
@@ -270,31 +304,27 @@ def download_tdocs(req: DocDownloadRequest):
270
 
271
  logging.info(f"Downloading TDocs: {document_ids}")
272
 
273
- documents_content: Dict[str, bytes] = {}
274
- failed_documents: List[str] = []
275
-
276
- def _process_single_document(doc_id: str, doc_url: str) -> Tuple[bool, bytes]:
277
  """Attempts to convert a document to text and returns success status and content."""
278
  try:
279
- text_lines = docx_to_txt(doc_id, doc_url)
280
  content_bytes = "\n".join(text_lines).encode("utf-8")
281
- return content_bytes
282
  except Exception as e:
283
  logging.warning(
284
  f"Failed to process document '{doc_id}' from URL '{doc_url}': {e}")
285
  error_message = f"Document '{doc_id}' text extraction failed: {e}".encode(
286
  "utf-8")
287
- return error_message
288
 
289
- for doc in req.documents:
290
- content = _process_single_document(doc.document, doc.url)
291
- documents_content[doc.document] = content
292
 
293
  zip_buffer = io.BytesIO()
294
  with zipfile.ZipFile(zip_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_file:
295
- for doc_id, content_bytes in documents_content.items():
 
296
  safe_filename = f"{doc_id}.txt"
297
- zip_file.writestr(safe_filename, content_bytes)
298
 
299
  zip_buffer.seek(0)
300
 
@@ -316,7 +346,7 @@ class ProgressUpdate(BaseModel):
316
 
317
 
318
  @router.post("/generate_requirements/sse")
319
- async def gen_reqs(req: ExtractRequirementsRequest, llm_router: Router = Depends(get_llm_router)):
320
  """Extract requirements from the specified xxxxCR docs using a LLM and returns SSE events about the progress of ongoing operations"""
321
 
322
  documents = req.documents
@@ -337,7 +367,8 @@ async def gen_reqs(req: ExtractRequirementsRequest, llm_router: Router = Depends
337
 
338
  # convert the docx to txt for use
339
  try:
340
- full = "\n".join(docx_to_txt(doc_id, url))
 
341
  except Exception as e:
342
  fmt = "".join(traceback.format_exception(e))
343
  logging.error(f"Failed to process doc {doc_id} : {fmt}")
 
34
 
35
  # ================================== Converting of files to .txt ====================================
36
 
37
+ # Unfortunately needs to be kept to 1, as libreoffice isn't built to support parallel instances
38
+ CONVERSION_MUTEX = asyncio.Semaphore(1)
39
 
40
+ async def convert_file(contents: io.BytesIO, filename: str, input_ext: str, output_ext: str, filter: str = None) -> io.BytesIO:
41
  """
42
  Converts the given file bytes using Libreoffice headless to the specified file type.
43
+ This is an asynchronous version.
44
 
45
  Args:
46
  contents: File contents
 
49
  output_ext: Output extension (WITHOUT THE DOT)
50
  filter: The conversion filter to use.
51
  """
52
+
53
+ await CONVERSION_MUTEX.acquire()
54
+
55
  with tempfile.TemporaryDirectory() as tmpdir:
56
  dir_path = Path(tmpdir)
57
  input_file_path = dir_path / f"{filename}.{input_ext}"
 
63
 
64
  out_bytes = io.BytesIO()
65
 
66
+ # construct the command
67
+ command = [
68
  "libreoffice",
69
  "--headless",
70
  "--convert-to", f"{output_ext}:{filter}" if filter else output_ext,
71
  "--outdir", tmpdir,
72
+ str(input_file_path) # Ensure path is a string for subprocess
73
+ ]
74
+
75
+ # convert using libreoffice asynchronously
76
+ process = await asyncio.create_subprocess_exec(
77
+ *command,
78
+ stdout=asyncio.subprocess.PIPE,
79
+ stderr=asyncio.subprocess.PIPE
80
+ )
81
+
82
+ stdout, stderr = await process.communicate()
83
+
84
+ exit_code = await process.wait()
85
+
86
+ if exit_code != 0 and not output_file_path.exists():
87
+ raise subprocess.CalledProcessError(
88
+ exit_code,
89
+ command,
90
+ output=stdout,
91
+ stderr=stderr
92
+ )
93
+
94
+ CONVERSION_MUTEX.release()
95
 
96
  with open(output_file_path, mode="rb") as out:
97
  out_bytes.write(out.read())
 
100
  return out_bytes
101
 
102
 
103
+ async def get_doc_archive(url: str, client: AsyncClient) -> zipfile.ZipFile:
104
  """Récupère le docx depuis l'URL et le retourne comme objet ZipFile"""
105
+
106
  if not url.endswith("zip"):
107
  raise ValueError("URL doit pointer vers un fichier ZIP")
108
 
109
  doc_id = os.path.splitext(os.path.basename(url))[0]
110
+ resp = await client.get(url, headers={
111
  "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
112
  })
113
 
114
  resp.raise_for_status()
115
 
116
  with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
117
+ # there should be a single document per zip archive
118
  for file_name in zf.namelist():
119
  if file_name.endswith(".docx"):
120
  docx_bytes = zf.read(file_name)
121
  return zipfile.ZipFile(io.BytesIO(docx_bytes))
122
  elif file_name.endswith(".doc"):
123
  in_bytes = io.BytesIO(zf.read(file_name))
124
+ docx_bytes = await convert_file(in_bytes, doc_id, "doc", "docx")
125
  return zipfile.ZipFile(docx_bytes)
126
+ elif file_name.endswith(".pptx"):
127
+ in_bytes = io.BytesIO(zf.read())
128
 
129
  raise ValueError("Aucun fichier docx/doc trouvé dans l'archive")
130
 
 
188
  return output
189
 
190
 
191
+ async def doc_to_txt(doc_id: str, url: str, client: AsyncClient) -> str:
192
+ # Grab the document archive
193
+ docx_zip = await get_doc_archive(url, client)
194
+
195
+ # Apply the docx revisions to the downloaded archive
196
  modified_bytes = apply_docx_revisions(docx_zip)
197
 
198
+ final_bytes = await convert_file(
199
  modified_bytes, f"{doc_id}", "docx", "txt")
200
 
201
  final_bytes_text = str(final_bytes.read(), encoding="utf-8")
 
296
 
297
 
298
  @router.post("/download_tdocs")
299
+ async def download_tdocs(req: DocDownloadRequest, http_client: AsyncClient = Depends(get_http_client)):
300
  """Download the specified TDocs and zips them in a single archive"""
301
 
302
  # Document IDs to download
 
304
 
305
  logging.info(f"Downloading TDocs: {document_ids}")
306
 
307
+ async def _process_single_document(doc_id: str, doc_url: str) -> Tuple[bool, bytes]:
 
 
 
308
  """Attempts to convert a document to text and returns success status and content."""
309
  try:
310
+ text_lines = await doc_to_txt(doc_id, doc_url, http_client)
311
  content_bytes = "\n".join(text_lines).encode("utf-8")
312
+ return {"doc_id": doc_id, "content": content_bytes}
313
  except Exception as e:
314
  logging.warning(
315
  f"Failed to process document '{doc_id}' from URL '{doc_url}': {e}")
316
  error_message = f"Document '{doc_id}' text extraction failed: {e}".encode(
317
  "utf-8")
318
+ return {"doc_id": doc_id, "content": error_message}
319
 
320
+ convert_tasks = await asyncio.gather(*[_process_single_document(doc.document, doc.url) for doc in req.documents], return_exceptions=False)
 
 
321
 
322
  zip_buffer = io.BytesIO()
323
  with zipfile.ZipFile(zip_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_file:
324
+ for task in convert_tasks:
325
+ doc_id = task["doc_id"]
326
  safe_filename = f"{doc_id}.txt"
327
+ zip_file.writestr(safe_filename, task["content"])
328
 
329
  zip_buffer.seek(0)
330
 
 
346
 
347
 
348
  @router.post("/generate_requirements/sse")
349
+ async def gen_reqs(req: ExtractRequirementsRequest, llm_router: Router = Depends(get_llm_router), http_client: AsyncClient = Depends(get_http_client)):
350
  """Extract requirements from the specified xxxxCR docs using a LLM and returns SSE events about the progress of ongoing operations"""
351
 
352
  documents = req.documents
 
367
 
368
  # convert the docx to txt for use
369
  try:
370
+ doc = await doc_to_txt(doc_id, url, http_client)
371
+ full = "\n".join(doc)
372
  except Exception as e:
373
  fmt = "".join(traceback.format_exception(e))
374
  logging.error(f"Failed to process doc {doc_id} : {fmt}")
app.py CHANGED
@@ -11,7 +11,7 @@ import api.solutions
11
  from dependencies import get_llm_router, get_prompt_templates, init_dependencies
12
  import api.docs
13
  import api.requirements
14
- from api.docs import docx_to_txt
15
  from schemas import *
16
  from fastapi.middleware.cors import CORSMiddleware
17
  from fastapi.responses import FileResponse, StreamingResponse
 
11
  from dependencies import get_llm_router, get_prompt_templates, init_dependencies
12
  import api.docs
13
  import api.requirements
14
+ from api.docs import doc_to_txt
15
  from schemas import *
16
  from fastapi.middleware.cors import CORSMiddleware
17
  from fastapi.responses import FileResponse, StreamingResponse