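"""Extract product line items (name, batch number, expiry date, MRP, quantity)
from an Excel invoice using the Gemini API, processing rows in parallel chunks
and saving the results as JSON."""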
import pandas as pd
import os
import json
import re
import concurrent.futures
from dotenv import load_dotenv
from google import genai
from typing import List, Dict, Any, Optional, Tuple
import logging
from pathlib import Path
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def setup_environment() -> None:
"""
    Load environment variables from a .env file, if present.
Returns:
None
"""
load_dotenv()
def get_gemini_client() -> genai.Client:
"""
Initialize and return a Gemini API client.
Returns:
genai.Client: Configured Gemini client
"""
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
raise ValueError("GEMINI_API_KEY environment variable not set")
return genai.Client(api_key=api_key)
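
# Example usage (a minimal sketch; assumes GEMINI_API_KEY is set in the
# environment or in a local .env file):
#
#   setup_environment()
#   client = get_gemini_client()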
def process_chunk(chunk_info: Tuple[int, pd.DataFrame, int, int], client: genai.Client) -> List[Dict[str, Any]]:
"""
    Process a single chunk of data using the Gemini API.
Args:
chunk_info: Tuple containing (chunk_index, dataframe_chunk, start_index, end_index)
client: Gemini API client
Returns:
List of extracted items from the chunk
"""
i, chunk_df, start_idx, end_idx = chunk_info
# Create a structured extraction prompt for the specific chunk
extraction_prompt = f"""
Extract product information from rows {start_idx} to {end_idx-1} in this Excel data.
For each product row, extract:
1. Product name
2. Batch number
3. Expiry date (MM/YY format)
4. MRP (Maximum Retail Price)
5. Quantity (as integer)
Return ONLY a JSON array of objects, one for each product, with these properties:
[
{{
"product_name": "...",
"batch_number": "...",
"expiry_date": "...",
"mrp": "...",
"quantity": ...
}},
...
]
Use null for any value you cannot extract. Return ONLY the JSON array.
"""
chunk_items = []
# Process chunk
try:
chunk_response = client.models.generate_content(
model="gemini-2.0-flash",
contents=[extraction_prompt, chunk_df.to_string()],
config={
'response_mime_type': 'application/json',
'temperature': 0.1,
'max_output_tokens': 8192,
}
)
        # Extract the response text (.text can be None if the response was blocked or empty)
        chunk_text = chunk_response.text or ""
        # Fix common JSON issues: strip embedded control characters, then remove
        # any trailing comma before the closing bracket (invalid in strict JSON)
        chunk_text = re.sub(r'[\n\r\t]', '', chunk_text)
        chunk_text = re.sub(r',\s*]', ']', chunk_text)
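        # For illustration: a response ending '..."quantity": 2}, ]' becomes
        # '..."quantity": 2}]' after the trailing-comma fix and parses cleanly.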
        # Extract the JSON array from the response
        match = re.search(r'\[.*\]', chunk_text, re.DOTALL)
        if match:
            try:
                chunk_items = json.loads(match.group(0))
logger.info(f"Successfully processed chunk {i+1} with {len(chunk_items)} items")
except json.JSONDecodeError:
logger.error(f"Error parsing JSON in chunk {i+1}")
except Exception as e:
logger.error(f"Error processing chunk {i+1}: {str(e)}")
return chunk_items
def prepare_chunks(df: pd.DataFrame, chunk_size: int) -> List[Tuple[int, pd.DataFrame, int, int]]:
"""
Prepare dataframe chunks for processing.
Args:
df: Input dataframe
chunk_size: Size of each chunk
Returns:
List of chunk information tuples
"""
num_chunks = (len(df) + chunk_size - 1) // chunk_size
chunks_to_process = []
for i in range(num_chunks):
start_idx = i * chunk_size
end_idx = min((i + 1) * chunk_size, len(df))
chunk_df = df.iloc[start_idx:end_idx]
chunks_to_process.append((i, chunk_df, start_idx, end_idx))
return chunks_to_process
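
# For example, a 45-row dataframe with chunk_size=20 yields three chunks:
#   (0, df.iloc[0:20], 0, 20), (1, df.iloc[20:40], 20, 40), (2, df.iloc[40:45], 40, 45)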
def process_excel_file(file_path: str, output_path: str, chunk_size: int = 20, max_workers: int = 2) -> Dict[str, Any]:
"""
    Process an Excel file to extract product information using the Gemini API.
Args:
file_path: Path to the Excel file
output_path: Path to save the extracted data
chunk_size: Size of each chunk for processing
max_workers: Maximum number of parallel workers
Returns:
Dict containing the extraction results
"""
# Setup environment
setup_environment()
client = get_gemini_client()
    # Read the Excel file (pandas picks the engine by extension: xlrd for .xls, openpyxl for .xlsx)
logger.info(f"Reading Excel file: {file_path}")
df = pd.read_excel(file_path)
# Prepare chunks for processing
chunks_to_process = prepare_chunks(df, chunk_size)
num_chunks = len(chunks_to_process)
# Process chunks in parallel
logger.info(f"Processing {num_chunks} chunks with {max_workers} workers")
all_items = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Pass client to each process_chunk call
results = list(executor.map(
lambda chunk: process_chunk(chunk, client),
chunks_to_process
))
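        # executor.map preserves input order, so results[i] matches chunks_to_process[i]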
# Combine results
for chunk_items in results:
all_items.extend(chunk_items)
# Create final result
final_result = {
"items": all_items,
"extraction_status": "COMPLETE" if all_items else "INCOMPLETE",
"total_items": len(all_items)
}
# Save the final result
with open(output_path, "w") as f:
json.dump(final_result, f, indent=2)
logger.info(f"Extraction complete. Total items extracted: {len(all_items)}")
return final_result
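
# Example usage (a minimal sketch; "invoices/sample.xls" is a hypothetical path):
#
#   result = process_excel_file(
#       file_path="invoices/sample.xls",
#       output_path="extracted.json",
#       chunk_size=20,
#       max_workers=2,
#   )
#   print(result["total_items"])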
def main() -> None:
"""
Main function to run the Excel processing script.
"""
input_file = 'expiry_invoice/SAC01000975.xls'
output_file = "extracted_invoice_data.json"
# Ensure the output directory exists
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
# Process the Excel file
result = process_excel_file(
file_path=input_file,
output_path=output_file,
chunk_size=20,
max_workers=2
)
print(f"Extraction complete. Total items extracted: {result['total_items']}")
if __name__ == "__main__":
main()