import pandas as pd
import os
import json
import re
import concurrent.futures
from dotenv import load_dotenv
from google import genai
from typing import List, Dict, Any, Optional, Tuple
import logging
from pathlib import Path

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def setup_environment() -> None:
    """
    Load environment variables from a .env file.

    Returns:
        None
    """
    load_dotenv()


def get_gemini_client() -> genai.Client:
    """
    Initialize and return a Gemini API client.

    Returns:
        genai.Client: Configured Gemini client
    """
    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        raise ValueError("GEMINI_API_KEY environment variable not set")
    return genai.Client(api_key=api_key)


def process_chunk(chunk_info: Tuple[int, pd.DataFrame, int, int], client: genai.Client) -> List[Dict[str, Any]]:
    """
    Process a single chunk of data using the Gemini API.

    Args:
        chunk_info: Tuple containing (chunk_index, dataframe_chunk, start_index, end_index)
        client: Gemini API client

    Returns:
        List of extracted items from the chunk
    """
    i, chunk_df, start_idx, end_idx = chunk_info

    # Create a structured extraction prompt for the specific chunk
    extraction_prompt = f"""
    Extract product information from rows {start_idx} to {end_idx-1} in this Excel data.

    For each product row, extract:
    1. Product name
    2. Batch number
    3. Expiry date (MM/YY format)
    4. MRP (Maximum Retail Price)
    5. Quantity (as integer)

    Return ONLY a JSON array of objects, one for each product, with these properties:
    [
      {{
        "product_name": "...",
        "batch_number": "...",
        "expiry_date": "...",
        "mrp": "...",
        "quantity": ...
      }},
      ...
    ]

    Use null for any value you cannot extract. Return ONLY the JSON array.
    """

    chunk_items = []

    # Process chunk
    try:
        chunk_response = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=[extraction_prompt, chunk_df.to_string()],
            config={
                'response_mime_type': 'application/json',
                'temperature': 0.1,
                'max_output_tokens': 8192,
            }
        )

        # Extract items
        chunk_text = chunk_response.text

        # Fix common JSON issues: strip embedded whitespace control
        # characters and trailing commas before closing brackets
        chunk_text = re.sub(r'[\n\r\t]', '', chunk_text)
        chunk_text = re.sub(r',\s*]', ']', chunk_text)

        # Extract the JSON array from the response text
        match = re.search(r'\[(.*)\]', chunk_text, re.DOTALL)
        if match:
            try:
                chunk_items = json.loads('[' + match.group(1) + ']')
                logger.info(f"Successfully processed chunk {i+1} with {len(chunk_items)} items")
            except json.JSONDecodeError:
                logger.error(f"Error parsing JSON in chunk {i+1}")
    except Exception as e:
        logger.error(f"Error processing chunk {i+1}: {str(e)}")

    return chunk_items


def prepare_chunks(df: pd.DataFrame, chunk_size: int) -> List[Tuple[int, pd.DataFrame, int, int]]:
    """
    Prepare dataframe chunks for processing.

    Args:
        df: Input dataframe
        chunk_size: Size of each chunk

    Returns:
        List of chunk information tuples
    """
    # Ceiling division so a partial final chunk still gets processed
    num_chunks = (len(df) + chunk_size - 1) // chunk_size
    chunks_to_process = []
    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, len(df))
        chunk_df = df.iloc[start_idx:end_idx]
        chunks_to_process.append((i, chunk_df, start_idx, end_idx))
    return chunks_to_process


def process_excel_file(file_path: str, output_path: str, chunk_size: int = 20, max_workers: int = 2) -> Dict[str, Any]:
    """
    Process an Excel file to extract product information using the Gemini API.
    Args:
        file_path: Path to the Excel file
        output_path: Path to save the extracted data
        chunk_size: Size of each chunk for processing
        max_workers: Maximum number of parallel workers

    Returns:
        Dict containing the extraction results
    """
    # Setup environment
    setup_environment()
    client = get_gemini_client()

    # Read Excel file
    logger.info(f"Reading Excel file: {file_path}")
    df = pd.read_excel(file_path)

    # Prepare chunks for processing
    chunks_to_process = prepare_chunks(df, chunk_size)
    num_chunks = len(chunks_to_process)

    # Process chunks in parallel
    logger.info(f"Processing {num_chunks} chunks with {max_workers} workers")
    all_items = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Pass client to each process_chunk call
        results = list(executor.map(
            lambda chunk: process_chunk(chunk, client),
            chunks_to_process
        ))

    # Combine results
    for chunk_items in results:
        all_items.extend(chunk_items)

    # Create final result
    final_result = {
        "items": all_items,
        "extraction_status": "COMPLETE" if all_items else "INCOMPLETE",
        "total_items": len(all_items)
    }

    # Save the final result
    with open(output_path, "w") as f:
        json.dump(final_result, f, indent=2)

    logger.info(f"Extraction complete. Total items extracted: {len(all_items)}")
    return final_result


def main() -> None:
    """
    Main function to run the Excel processing script.
    """
    input_file = 'expiry_invoice/SAC01000975.xls'
    output_file = "extracted_invoice_data.json"

    # Ensure the output directory exists
    output_path = Path(output_file)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Process the Excel file
    result = process_excel_file(
        file_path=input_file,
        output_path=output_file,
        chunk_size=20,
        max_workers=2
    )

    print(f"Extraction complete. Total items extracted: {result['total_items']}")


if __name__ == "__main__":
    main()
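
# ---------------------------------------------------------------------------
# Sanity-check sketch for prepare_chunks (illustrative only, not part of the
# pipeline; the 45-row frame below is a synthetic stand-in, not real invoice
# data). The ceiling division (len(df) + chunk_size - 1) // chunk_size yields
# 3 chunks for 45 rows at chunk_size=20: rows 0-19, 20-39, and 40-44.
#
#     df = pd.DataFrame({"product": range(45)})
#     chunks = prepare_chunks(df, chunk_size=20)
#     assert [len(chunk_df) for _, chunk_df, _, _ in chunks] == [20, 20, 5]
#     i, chunk_df, start, end = chunks[-1]
#     assert (i, start, end) == (2, 40, 45)
# ---------------------------------------------------------------------------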