import pandas as pd
import os
import json
import re
import concurrent.futures
from dotenv import load_dotenv
from google import genai
from typing import List, Dict, Any, Optional, Tuple
import logging
from pathlib import Path
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def setup_environment() -> None:
    """
    Load environment variables from a .env file so the Gemini API key is available.

    Returns:
        None
    """
    load_dotenv()


def get_gemini_client() -> genai.Client:
    """
    Initialize and return a Gemini API client.

    Returns:
        genai.Client: Configured Gemini client
    """
    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        raise ValueError("GEMINI_API_KEY environment variable not set")
    return genai.Client(api_key=api_key)
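
# Minimal usage sketch (assumes GEMINI_API_KEY is defined in the environment
# or in a local .env file; the calls mirror the two functions above):
#   setup_environment()
#   client = get_gemini_client()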


def process_chunk(chunk_info: Tuple[int, pd.DataFrame, int, int], client: genai.Client) -> List[Dict[str, Any]]:
    """
    Process a single chunk of data using Gemini API.

    Args:
        chunk_info: Tuple containing (chunk_index, dataframe_chunk, start_index, end_index)
        client: Gemini API client

    Returns:
        List of extracted items from the chunk
    """
    i, chunk_df, start_idx, end_idx = chunk_info

    # Create a structured extraction prompt for the specific chunk
    extraction_prompt = f"""
    Extract product information from rows {start_idx} to {end_idx-1} in this Excel data.

    For each product row, extract:
    1. Product name
    2. Batch number
    3. Expiry date (MM/YY format)
    4. MRP (Maximum Retail Price)
    5. Quantity (as integer)

    Return ONLY a JSON array of objects, one for each product, with these properties:
    [
        {{
            "product_name": "...",
            "batch_number": "...",
            "expiry_date": "...",
            "mrp": "...",
            "quantity": ...
        }},
        ...
    ]

    Use null for any value you cannot extract. Return ONLY the JSON array.
    """

    chunk_items = []

    # Process chunk
    try:
        chunk_response = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=[extraction_prompt, chunk_df.to_string()],
            config={
                'response_mime_type': 'application/json',
                'temperature': 0.1,
                'max_output_tokens': 8192,
            }
        )

        # Extract items
        chunk_text = chunk_response.text

        # Fix common JSON issues
        chunk_text = re.sub(r'[\n\r\t]', '', chunk_text)
        chunk_text = re.sub(r',\s*]', ']', chunk_text)

        # Extract JSON array
        match = re.search(r'\[(.*)\]', chunk_text, re.DOTALL)
        if match:
            try:
                chunk_items = json.loads('[' + match.group(1) + ']')
                logger.info(f"Successfully processed chunk {i+1} with {len(chunk_items)} items")
            except json.JSONDecodeError:
                logger.error(f"Error parsing JSON in chunk {i+1}")
    except Exception as e:
        logger.error(f"Error processing chunk {i+1}: {str(e)}")

    return chunk_items
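
# Illustrative shape of a successful chunk result (values are invented for the
# example; actual content depends on the spreadsheet and the model's output):
#   [{"product_name": "Paracetamol 500mg", "batch_number": "B1234",
#     "expiry_date": "06/26", "mrp": "35.00", "quantity": 10}]
# On any parsing or API failure the function returns an empty list instead.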


def prepare_chunks(df: pd.DataFrame, chunk_size: int) -> List[Tuple[int, pd.DataFrame, int, int]]:
    """
    Prepare dataframe chunks for processing.

    Args:
        df: Input dataframe
        chunk_size: Size of each chunk

    Returns:
        List of chunk information tuples
    """
    num_chunks = (len(df) + chunk_size - 1) // chunk_size
    chunks_to_process = []

    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, len(df))
        chunk_df = df.iloc[start_idx:end_idx]
        chunks_to_process.append((i, chunk_df, start_idx, end_idx))

    return chunks_to_process
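
# Worked example (hypothetical sizes): a 45-row dataframe with chunk_size=20
# yields ceil(45 / 20) = 3 chunks covering rows 0-19, 20-39 and 40-44, i.e.
#   (0, df.iloc[0:20], 0, 20), (1, df.iloc[20:40], 20, 40), (2, df.iloc[40:45], 40, 45)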


def process_excel_file(file_path: str, output_path: str, chunk_size: int = 20, max_workers: int = 2) -> Dict[str, Any]:
    """
    Process an Excel file to extract product information using Gemini API.

    Args:
        file_path: Path to the Excel file
        output_path: Path to save the extracted data
        chunk_size: Size of each chunk for processing
        max_workers: Maximum number of parallel workers

    Returns:
        Dict containing the extraction results
    """
    # Setup environment
    setup_environment()
    client = get_gemini_client()

    # Read Excel file
    logger.info(f"Reading Excel file: {file_path}")
    df = pd.read_excel(file_path)

    # Prepare chunks for processing
    chunks_to_process = prepare_chunks(df, chunk_size)
    num_chunks = len(chunks_to_process)

    # Process chunks in parallel
    logger.info(f"Processing {num_chunks} chunks with {max_workers} workers")
    all_items = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Pass client to each process_chunk call
        results = list(executor.map(
            lambda chunk: process_chunk(chunk, client),
            chunks_to_process
        ))

    # Combine results
    for chunk_items in results:
        all_items.extend(chunk_items)

    # Create final result
    final_result = {
        "items": all_items,
        "extraction_status": "COMPLETE" if all_items else "INCOMPLETE",
        "total_items": len(all_items)
    }

    # Save the final result
    with open(output_path, "w") as f:
        json.dump(final_result, f, indent=2)

    logger.info(f"Extraction complete. Total items extracted: {len(all_items)}")
    return final_result
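
# Illustrative JSON written to output_path (keys match final_result above;
# the single item shown is invented for the example):
#   {
#     "items": [{"product_name": "...", "batch_number": "...", "expiry_date": "...",
#                "mrp": "...", "quantity": 0}],
#     "extraction_status": "COMPLETE",
#     "total_items": 1
#   }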


def main() -> None:
    """
    Main function to run the Excel processing script.
    """
    input_file = 'expiry_invoice/SAC01000975.xls'
    output_file = "extracted_invoice_data.json"

    # Ensure the output directory exists
    output_path = Path(output_file)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Process the Excel file
    result = process_excel_file(
        file_path=input_file,
        output_path=output_file,
        chunk_size=20,
        max_workers=2
    )

    print(f"Extraction complete. Total items extracted: {result['total_items']}")


if __name__ == "__main__":
    main()