|
import boto3 |
|
import time |
|
import os |
|
import pandas as pd |
|
import json |
|
import logging |
|
import datetime |
|
from typing import List |
|
from io import StringIO |
|
from urllib.parse import urlparse |
|
from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError, TokenRetrievalError |
|
|
|
from tools.config import TEXTRACT_WHOLE_DOCUMENT_ANALYSIS_BUCKET, OUTPUT_FOLDER, AWS_REGION, DOCUMENT_REDACTION_BUCKET, LOAD_PREVIOUS_TEXTRACT_JOBS_S3, TEXTRACT_JOBS_S3_LOC, TEXTRACT_JOBS_LOCAL_LOC |
|
|
|
|
|
def analyse_document_with_textract_api(
    local_pdf_path: str,
    s3_input_prefix: str,
    s3_output_prefix: str,
    job_df: pd.DataFrame,
    s3_bucket_name: str = TEXTRACT_WHOLE_DOCUMENT_ANALYSIS_BUCKET,
    local_output_dir: str = OUTPUT_FOLDER,
    analyse_signatures: List[str] = None,
    successful_job_number: int = 0,
    total_document_page_count: int = 1,
    general_s3_bucket_name: str = DOCUMENT_REDACTION_BUCKET,
    aws_region: str = AWS_REGION
):
    """
    Upload a local document to S3, submit an asynchronous Textract job for it,
    and record the job in a CSV log (locally and in S3).

    The function returns as soon as the job has been submitted; it does not
    wait for the job to finish or download any results.

    Args:
        local_pdf_path (str): Path to the local document. If a list is passed, its last element is used.
        s3_input_prefix (str): S3 prefix (folder) to upload the input document to.
        s3_output_prefix (str): S3 prefix (folder) where Textract should write output. The job log CSV is also uploaded under this prefix.
        job_df (pd.DataFrame): Dataframe containing information from previous Textract API calls.
        s3_bucket_name (str, optional): S3 bucket used for the uploaded document and the Textract output.
        local_output_dir (str, optional): Local directory in which the job log CSV is written. Created if missing.
        analyse_signatures (List[str], optional): Extraction options. A signature analysis job is started when it contains "Extract signatures"; otherwise a text detection job is started. Defaults to an empty list.
        successful_job_number (int): The number of successful jobs that have been submitted in this session.
        total_document_page_count (int): The number of pages in the document.
        general_s3_bucket_name (str, optional): S3 bucket the job log CSV is uploaded to.
        aws_region (str, optional): AWS region name used for the boto3 session.

    Returns:
        tuple: (status message, job_id, job_type, successful_job_number + 1,
        is_a_textract_api_call (always True), total_document_page_count).

    Raises:
        FileNotFoundError: If local_pdf_path does not exist.
        Exception: If job_df already holds a job for the same file name and signature options.
            Errors from the S3 upload, the Textract call or the log write are printed and re-raised.
    """
    # None stands in for the empty list so no list object is shared between calls.
    if analyse_signatures is None:
        analyse_signatures = []

    # Returned to the caller to flag that a Textract API call was made.
    is_a_textract_api_call = True

    # A list of paths may be passed in; only the last entry is analysed.
    if isinstance(local_pdf_path, list):
        local_pdf_path = local_pdf_path[-1]

    if not os.path.exists(local_pdf_path):
        raise FileNotFoundError(f"Input document not found {local_pdf_path}")

    if not os.path.exists(local_output_dir):
        os.makedirs(local_output_dir, exist_ok=True)
        log_message = f"Created local output directory: {local_output_dir}"
        print(log_message)

    session = boto3.Session(region_name=aws_region)
    s3_client = session.client('s3')
    textract_client = session.client('textract')

    # S3 keys always use forward slashes, even when os.path.join produced backslashes.
    pdf_filename = os.path.basename(local_pdf_path)
    s3_input_key = os.path.join(s3_input_prefix, pdf_filename).replace("\\", "/")

    log_message = f"Uploading '{local_pdf_path}' to 's3://{s3_bucket_name}/{s3_input_key}'..."
    print(log_message)

    try:
        s3_client.upload_file(local_pdf_path, s3_bucket_name, s3_input_key)
        log_message = "Upload successful."
        print(log_message)
    except Exception as e:
        log_message = f"Failed to upload PDF to S3: {e}"
        print(log_message)
        raise

    # Refuse to resubmit a document already analysed with the same signature options.
    # The check is skipped when the log lacks either column it needs.
    if not job_df.empty and {"file_name", "signature_extraction"}.issubset(job_df.columns):
        same_file = job_df["file_name"] == pdf_filename
        same_options = job_df["signature_extraction"].astype(str) == str(analyse_signatures)
        if (same_file & same_options).any():
            raise Exception("Existing Textract outputs found. No need to re-analyse. Please download existing results from the list")

    message = "Starting Textract document analysis job..."
    print(message)

    # NOTE: this handler also covers writing and uploading the job log below.
    try:
        document_location = {
            'S3Object': {
                'Bucket': s3_bucket_name,
                'Name': s3_input_key
            }
        }
        output_config = {
            'S3Bucket': s3_bucket_name,
            'S3Prefix': s3_output_prefix
        }

        if "Extract signatures" in analyse_signatures:
            response = textract_client.start_document_analysis(
                DocumentLocation=document_location,
                FeatureTypes=['SIGNATURES'],
                OutputConfig=output_config
            )
            job_type = "document_analysis"
        else:
            response = textract_client.start_document_text_detection(
                DocumentLocation=document_location,
                OutputConfig=output_config
            )
            job_type = "document_text_detection"

        job_id = response['JobId']
        print(f"Textract job started with JobId: {job_id}")

        log_csv_key_location = f"{s3_output_prefix}/textract_document_jobs.csv"
        job_location_full = f"s3://{s3_bucket_name}/{s3_output_prefix}/{job_id}/"

        log_df = pd.DataFrame([{
            'job_id': job_id,
            'file_name': pdf_filename,
            'job_type': job_type,
            'signature_extraction': analyse_signatures,
            's3_location': job_location_full,
            'job_date_time': datetime.datetime.now()
        }])

        # Append to the local log, writing the header only when the file is new.
        log_file_path = os.path.join(local_output_dir, "textract_document_jobs.csv")
        file_exists = os.path.exists(log_file_path)
        log_df.to_csv(log_file_path, mode='a', index=False, header=not file_exists)

        # The whole local log replaces the copy held in S3.
        s3_client.upload_file(log_file_path, general_s3_bucket_name, log_csv_key_location)

        print(f"Job ID written to {log_csv_key_location}")

    except Exception as e:
        error = f"Failed to start Textract job: {e}"
        print(error)
        raise

    successful_job_number += 1
    total_number_of_textract_page_calls = total_document_page_count

    return f"Textract analysis job submitted, job ID:{job_id}", job_id, job_type, successful_job_number, is_a_textract_api_call, total_number_of_textract_page_calls
|
|
|
def return_job_status(job_id:str,
                      response:dict,
                      attempts:int,
                      poll_interval_seconds: int = 0,
                      max_polling_attempts: int = 1
                      ):
    '''
    Interpret the 'JobStatus' of a Textract job response that has already been fetched.

    Logs the polling attempt, then:
    - 'SUCCEEDED': logs success and returns the status.
    - 'IN_PROGRESS': sleeps for poll_interval_seconds and returns the status.
    - 'FAILED' / 'PARTIAL_SUCCESS': logs the status message and any warnings, then raises.
    - anything else: raises.

    Returns the job status string; raises Exception for failed, partial or unexpected statuses.
    '''
    current_status = response['JobStatus']
    logging.info(f"Polling attempt {attempts}/{max_polling_attempts}. Job status: {current_status}")

    if current_status == 'SUCCEEDED':
        logging.info("Textract job succeeded.")
        return current_status

    if current_status == 'IN_PROGRESS':
        # Pause here so the caller's polling loop is paced by this function.
        time.sleep(poll_interval_seconds)
        return current_status

    if current_status not in ('FAILED', 'PARTIAL_SUCCESS'):
        raise Exception(f"Unexpected Textract job status: {current_status}")

    # Only the failure statuses reach this point.
    failure_reason = response.get('StatusMessage', 'No status message provided.')
    job_warnings = response.get('Warnings', [])
    logging.error(f"Textract job ended with status: {current_status}. Message: {failure_reason}")
    if job_warnings:
        logging.warning(f"Warnings: {job_warnings}")

    raise Exception(f"Textract job {job_id} failed or partially failed. Status: {current_status}. Message: {failure_reason}")
|
|
|
def _list_textract_output_objects(s3_client, s3_bucket_name: str, s3_output_key_prefix: str) -> list:
    '''
    Return every object entry listed under the prefix, following continuation tokens
    so that listings spanning more than one response page are not truncated.
    '''
    found = []
    request = {'Bucket': s3_bucket_name, 'Prefix': s3_output_key_prefix}
    while True:
        page = s3_client.list_objects_v2(**request)
        found.extend(page.get('Contents', []))
        next_token = page.get('NextContinuationToken')
        if not page.get('IsTruncated') or not next_token:
            return found
        request['ContinuationToken'] = next_token


def _textract_output_sort_key(s3_key: str):
    '''
    Sort key that orders numerically named parts (1, 2, ..., 10) by number;
    any other key sorts after them, alphabetically.
    '''
    part_name = s3_key.rsplit('/', 1)[-1]
    if part_name.isascii() and part_name.isdigit():
        return (0, int(part_name), s3_key)
    return (1, 0, s3_key)


def download_textract_job_files(s3_client,
                                s3_bucket_name:str,
                                s3_output_key_prefix:str,
                                pdf_filename:str,
                                job_id:str,
                                local_output_dir:str):
    '''
    Download the output parts of a Textract job from S3 and combine them into one local JSON file.

    Args:
        s3_client: S3 client object exposing list_objects_v2 and get_object.
        s3_bucket_name (str): Bucket holding the Textract output.
        s3_output_key_prefix (str): Key prefix under which the job's output parts are stored.
        pdf_filename (str): Name of the analysed document; its base name (without extension)
            is used to build the output file name '<name>_textract.json'.
        job_id (str): Textract job ID (not used by this function).
        local_output_dir (str): Existing local directory to write the combined JSON into.

    Returns:
        str: Path to the combined JSON file.

    Raises:
        FileNotFoundError: If nothing is listed under the prefix, or only the prefix marker,
            'directory' keys or access-check objects are found.
    '''
    output_files = _list_textract_output_objects(s3_client, s3_bucket_name, s3_output_key_prefix)
    if not output_files:
        # One immediate second attempt before giving up.
        output_files = _list_textract_output_objects(s3_client, s3_bucket_name, s3_output_key_prefix)

    if not output_files:
        logging.error(f"No output files found in s3://{s3_bucket_name}/{s3_output_key_prefix}")
        raise FileNotFoundError(f"Textract output files not found in S3 path: s3://{s3_bucket_name}/{s3_output_key_prefix}")

    # Skip the prefix marker itself, 'directory' keys, and the access-check object.
    json_files_to_download = [
        f for f in output_files
        if f['Key'] != s3_output_key_prefix and not f['Key'].endswith('/') and 'access_check' not in f['Key']
    ]

    if not json_files_to_download:
        error = f"No JSON files found (only prefix marker?) in s3://{s3_bucket_name}/{s3_output_key_prefix}"
        print(error)
        raise FileNotFoundError(error)

    combined_blocks = []

    # Numeric ordering keeps part 2 ahead of part 10; a plain string sort would not.
    for f in sorted(json_files_to_download, key=lambda x: _textract_output_sort_key(x['Key'])):
        obj = s3_client.get_object(Bucket=s3_bucket_name, Key=f['Key'])
        data = json.loads(obj['Body'].read())

        if "Blocks" in data:
            combined_blocks.extend(data["Blocks"])
        else:
            logging.warning(f"No 'Blocks' key in file: {f['Key']}")

    combined_output = {
        "DocumentMetadata": {
            # Page count is the number of distinct 'Page' values; blocks without one count as page 1.
            "Pages": len(set(block.get('Page', 1) for block in combined_blocks))
        },
        "Blocks": combined_blocks,
        "JobStatus": "SUCCEEDED"
    }

    output_filename_base = os.path.basename(pdf_filename)
    output_filename_base_no_ext = os.path.splitext(output_filename_base)[0]
    local_output_filename = f"{output_filename_base_no_ext}_textract.json"
    local_output_path = os.path.join(local_output_dir, local_output_filename)

    with open(local_output_path, 'w', encoding='utf-8') as out_file:
        json.dump(combined_output, out_file)

    print(f"Combined Textract output written to {local_output_path}")

    return local_output_path
|
|
|
def check_for_provided_job_id(job_id:str):
    '''
    Validate that a job ID was supplied.

    Returns None when job_id is truthy; raises Exception when it is empty or None.
    '''
    if job_id:
        return
    raise Exception("Please provide a job ID.")
|
|
|
def poll_bulk_textract_analysis_progress_and_download(
    job_id:str,
    job_type_dropdown:str,
    s3_output_prefix: str,
    pdf_filename:str,
    job_df:pd.DataFrame,
    s3_bucket_name: str = TEXTRACT_WHOLE_DOCUMENT_ANALYSIS_BUCKET,
    local_output_dir: str = OUTPUT_FOLDER,
    load_s3_jobs_loc:str=TEXTRACT_JOBS_S3_LOC,
    load_local_jobs_loc:str=TEXTRACT_JOBS_LOCAL_LOC,
    aws_region: str = AWS_REGION,
    load_jobs_from_s3:str = LOAD_PREVIOUS_TEXTRACT_JOBS_S3,
    poll_interval_seconds: int = 1,
    max_polling_attempts: int = 1
    ):
    '''
    Poll AWS for the status of a Textract API job. Return status, and if finished, combine and download results into a locally-stored json file for further processing by the app.

    Args:
        job_id (str): Textract job ID to poll.
        job_type_dropdown (str): "document_analysis" or "document_text_detection"; selects which Textract 'get' call is used.
        s3_output_prefix (str): S3 prefix under which the job output folder (named after the job ID) is stored.
        pdf_filename (str): Name of the document currently loaded by the caller. Replaced by the file name logged for the job when one is found.
        job_df (pd.DataFrame): Previously loaded job log; used only if reloading the log fails.
        s3_bucket_name (str, optional): Bucket holding the Textract output.
        local_output_dir (str, optional): Local directory for the combined JSON output.
        load_s3_jobs_loc (str, optional): S3 location of the job log.
        load_local_jobs_loc (str, optional): Local location of the job log.
        aws_region (str, optional): AWS region for the boto3 session.
        load_jobs_from_s3 (str, optional): Whether to refresh the job log from S3.
        poll_interval_seconds (int, optional): Sleep applied after each 'IN_PROGRESS' response.
        max_polling_attempts (int, optional): Maximum number of status requests.

    Returns:
        tuple: (downloaded_file_path or None if the job has not succeeded yet, job_status, job_df).

    Raises:
        Exception: If no job ID is given, if the job failed, or if pdf_filename already matches the file logged for this job.
        ValueError: If job_type_dropdown is not a recognised job type.
    '''
    if not job_id:
        raise Exception("No Job ID provided.")

    session = boto3.Session(region_name=aws_region)
    s3_client = session.client('s3')
    textract_client = session.client('textract')

    job_status = 'IN_PROGRESS'
    attempts = 0

    message = "Polling Textract for job completion status..."
    print(message)

    # Refresh the job log; fall back to the dataframe passed in if that fails.
    try:
        job_df = load_in_textract_job_details(load_s3_jobs=load_jobs_from_s3,
                                              load_s3_jobs_loc=load_s3_jobs_loc,
                                              load_local_jobs_loc=load_local_jobs_loc)
    except Exception as e:
        print(f"Failed to update job details dataframe: {e}")

    while job_status == 'IN_PROGRESS' and attempts < max_polling_attempts:
        attempts += 1
        try:
            if job_type_dropdown == "document_analysis":
                response = textract_client.get_document_analysis(JobId=job_id)
            elif job_type_dropdown == "document_text_detection":
                response = textract_client.get_document_text_detection(JobId=job_id)
            else:
                error = "Unknown job type, cannot poll job"
                print(error)
                # Raise a real exception: a bare `raise` here has nothing to re-raise
                # and would surface as "No active exception to reraise".
                raise ValueError(error)

            job_status = return_job_status(job_id, response, attempts, poll_interval_seconds, max_polling_attempts)

        except textract_client.exceptions.InvalidJobIdException:
            error_message = f"Invalid JobId: {job_id}. This might happen if the job expired (older than 7 days) or never existed."
            print(error_message)
            logging.error(error_message)
            raise
        except Exception as e:
            error_message = f"Error while polling Textract status for job {job_id}: {e}"
            print(error_message)
            logging.error(error_message)
            raise

    downloaded_file_path = None
    if job_status == 'SUCCEEDED':

        # Look up the file name that was logged for this job, when the log has the needed columns.
        if not job_df.empty and {"job_id", "file_name"}.issubset(job_df.columns):
            matching_job_id_file_names = job_df.loc[job_df["job_id"] == job_id, "file_name"]

            if pdf_filename and not matching_job_id_file_names.empty:
                if pdf_filename == matching_job_id_file_names.iloc[0]:
                    raise Exception("Existing Textract outputs found. No need to re-download.")

            if not matching_job_id_file_names.empty:
                pdf_filename = matching_job_id_file_names.iloc[0]
            else:
                pdf_filename = "unknown_file"

        # Output for the job is looked up under <s3_output_prefix>/<job_id>/.
        s3_output_key_prefix = os.path.join(s3_output_prefix, job_id).replace("\\", "/") + "/"
        logging.info(f"Searching for output files in s3://{s3_bucket_name}/{s3_output_key_prefix}")

        try:
            downloaded_file_path = download_textract_job_files(s3_client,
                                                               s3_bucket_name,
                                                               s3_output_key_prefix,
                                                               pdf_filename,
                                                               job_id,
                                                               local_output_dir)
        except Exception as e:
            print(f"Failed to download or process Textract output from S3: {e}")
            raise

    return downloaded_file_path, job_status, job_df
|
|
|
def load_in_textract_job_details(load_s3_jobs:str=LOAD_PREVIOUS_TEXTRACT_JOBS_S3,
                                 load_s3_jobs_loc:str=TEXTRACT_JOBS_S3_LOC,
                                 load_local_jobs_loc:str=TEXTRACT_JOBS_LOCAL_LOC,
                                 document_redaction_bucket:str=DOCUMENT_REDACTION_BUCKET,
                                 aws_region:str=AWS_REGION):
    '''
    Load in a dataframe of jobs previously submitted to the Textract API service.

    When load_s3_jobs is 'True' (or boolean True), the job log CSV is first downloaded from
    S3 into load_local_jobs_loc. The local CSV, if present, is then read and limited to jobs
    from the last 7 days.

    Args:
        load_s3_jobs (str): 'True' to download the job log from S3 before reading it.
        load_s3_jobs_loc (str): S3 prefix holding 'textract_document_jobs.csv'.
        load_local_jobs_loc (str): Local folder holding 'textract_document_jobs.csv'.
        document_redaction_bucket (str): S3 bucket holding the job log.
        aws_region (str): AWS region for the boto3 session.

    Returns:
        pd.DataFrame: The job log, or an empty dataframe with the expected columns when no
        local log file exists.
    '''
    job_df = pd.DataFrame(columns=['job_id','file_name','job_type','signature_extraction','s3_location','job_date_time'])

    local_output_path = f'{load_local_jobs_loc}/textract_document_jobs.csv'

    # str() lets a real boolean True behave like the configured string 'True'.
    if str(load_s3_jobs) == 'True':
        s3_output_key = f'{load_s3_jobs_loc}/textract_document_jobs.csv'

        # The AWS client is only built when an S3 download is actually requested.
        session = boto3.Session(region_name=aws_region)
        s3_client = session.client('s3')

        try:
            # head_object raises a ClientError when the log does not exist.
            s3_client.head_object(Bucket=document_redaction_bucket, Key=s3_output_key)

            # The download needs the target folder to exist.
            if load_local_jobs_loc:
                os.makedirs(load_local_jobs_loc, exist_ok=True)

            s3_client.download_file(document_redaction_bucket, s3_output_key, local_output_path)

        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                print("Log file does not exist in S3.")
            else:
                print(f"Unexpected error occurred: {e}")
        except (NoCredentialsError, PartialCredentialsError, TokenRetrievalError) as e:
            print(f"AWS credential issue encountered: {e}")
            print("Skipping S3 log file download.")

    if os.path.exists(local_output_path):
        print("Found log file in local path")
        job_df = pd.read_csv(local_output_path)

        if "job_date_time" in job_df.columns:
            # Unparseable dates become NaT and are dropped by the comparison below.
            job_df["job_date_time"] = pd.to_datetime(job_df["job_date_time"], errors='coerce')

            # Keep only jobs logged within the last 7 days.
            cutoff_time = pd.Timestamp.now() - pd.Timedelta(days=7)
            job_df = job_df.loc[job_df["job_date_time"] >= cutoff_time,:]

    return job_df
|
|
|
def download_textract_output(job_id:str,
                             output_bucket:str,
                             output_prefix:str,
                             local_folder:str,
                             poll_interval_seconds: float = 5):
    """
    Waits for a Textract document analysis job to finish and, if it succeeded,
    downloads '<output_prefix>/<job_id>.zip' from S3 into local_folder.

    Nothing is returned. Failures are reported with print rather than raised:
    a failed job or a failed download leaves no local file.

    :param job_id: The Textract job ID.
    :param output_bucket: The S3 bucket where the output is stored.
    :param output_prefix: The prefix (folder path) in S3 where the output file is stored.
    :param local_folder: The local directory where the ZIP file should be saved.
    :param poll_interval_seconds: Seconds to wait between status checks while the job is in progress.
    """
    # NOTE(review): this expects a '<job_id>.zip' object to exist under the prefix - confirm
    # something produces it, as the other functions in this file read per-job JSON parts instead.
    textract_client = boto3.client('textract')
    s3_client = boto3.client('s3')

    while True:
        response = textract_client.get_document_analysis(JobId=job_id)
        status = response['JobStatus']

        if status == 'SUCCEEDED':
            print("Job completed successfully.")
            break

        if status == 'FAILED':
            print("Job failed:", response.get("StatusMessage", "No error message provided."))
            return

        if status != 'IN_PROGRESS':
            # Any other status will not become SUCCEEDED by waiting, so stop polling.
            print(f"Job ended with status {status}:", response.get("StatusMessage", "No error message provided."))
            return

        print(f"Job is still {status}.")
        # Wait before asking again instead of calling the API in a tight loop.
        time.sleep(poll_interval_seconds)

    output_file_key = f"{output_prefix}/{job_id}.zip"
    local_file_path = os.path.join(local_folder, f"{job_id}.zip")

    # Best effort: a download problem is reported, not raised.
    try:
        s3_client.download_file(output_bucket, output_file_key, local_file_path)
        print(f"Output file downloaded to: {local_file_path}")
    except Exception as e:
        print(f"Error downloading file: {e}")
|
|
|
def check_textract_outputs_exist(textract_output_found_checkbox):
    '''
    Confirm that the 'Textract outputs found' checkbox value equals True.

    Prints a confirmation and returns None when it does; raises Exception otherwise.
    '''
    # Equality (not truthiness) is intentional: only values equal to True pass.
    outputs_found = textract_output_found_checkbox == True  # noqa: E712
    if not outputs_found:
        raise Exception("Relevant Textract outputs not found. Please ensure you have selected to correct results output and you have uploaded the relevant document file in 'Choose document or image file...' above")

    print("Textract outputs found")
    return