chunknorris / app.py
mciancone's picture
Upload app.py
82c01c8 verified
raw
history blame
10.3 kB
from pathlib import Path
import time
import json
from typing import Literal
from io import BytesIO
import requests
import streamlit as st
from streamlit.runtime.uploaded_file_manager import UploadedFile, UploadedFileRec
from streamlit import session_state as ss
from chunknorris.parsers import (AbstractParser, MarkdownParser, HTMLParser, PdfParser, DocxParser, ExcelParser, CSVParser)
from chunknorris.chunkers import MarkdownChunker
from chunknorris.pipelines import PdfPipeline
st.set_page_config(
layout="wide",
page_icon="🔪",
page_title="ChunkNorris demo",
menu_items={
'Report a bug': "https://github.com/wikit-ai/chunknorris/issues",
'About': "https://wikit-ai.github.io/chunknorris/"
}
)
LOGGER = st.empty()
SAMPLE_FILE = {
"sample PDF - 264 pages" : "https://raw.githubusercontent.com/wikit-ai/chunknorris/refs/heads/main/docs/examples/example_data/sample.pdf",
"sample PDF - 16 pages" : "https://raw.githubusercontent.com/wikit-ai/chunknorris/refs/heads/main/docs/examples/example_data/sample2.pdf",
"sample MD": "https://raw.githubusercontent.com/wikit-ai/chunknorris/refs/heads/main/README.md",
"sample XLSX": "https://raw.githubusercontent.com/wikit-ai/chunknorris/refs/heads/main/docs/examples/example_data/sample.xlsx",
}
if "parsing_time" not in ss:
ss.parsing_time = 0
if "parsed_md" not in ss:
ss.parsed_md = ""
if "chunks" not in ss:
ss.chunks = []
def get_parser(fileext : str) -> AbstractParser:
"""Get the pipeline for the given filename."""
match fileext:
case ".md":
parser = MarkdownParser()
case ".html":
parser = HTMLParser()
case ".pdf":
parser = PdfParser(
use_ocr="never",
)
case ".docx":
parser = DocxParser()
case ".xls" | ".xlsx" | ".xlsm" | ".xlsb" | ".odf" | ".ods" | ".odt":
parser = ExcelParser()
case ".csv":
parser = CSVParser()
case _:
raise ValueError(
"File format not supported by ChunkNorris"
)
return parser
def get_md_chunker() -> MarkdownChunker:
"""Considering arguments set, returns the md chunker."""
return MarkdownChunker(
max_headers_to_use=ss.max_headers_to_use,
max_chunk_word_count=ss.max_chunk_word_count,
hard_max_chunk_word_count=ss.hard_max_chunk_word_count,
min_chunk_word_count=ss.min_chunk_word_count,
)
def parse_and_chunk(uploaded_file : UploadedFile):
"""Parse and chunk the file."""
if uploaded_file is None:
log("Please upload a file.", "warning")
return
log("Parsing and chunking...", "info")
try:
fileext = Path(uploaded_file.name).suffix.lower()
parser = get_parser(fileext)
start_time = time.perf_counter()
match fileext:
case ".pdf":
md_doc = parser.parse_string(uploaded_file.getvalue())
chunker = PdfPipeline(parser, get_md_chunker())
chunks = chunker._get_chunks_using_strategy()
case ".xlsx":
md_doc = parser.parse_string(uploaded_file.getvalue())
chunker = get_md_chunker()
chunks = chunker.chunk(md_doc)
case _:
md_doc = parser.parse_string(uploaded_file.getvalue().decode("utf-8"))
chunker = get_md_chunker()
chunks = chunker.chunk(md_doc)
ss.parsing_time = time.perf_counter() - start_time
ss.parsed_md = md_doc.to_string()
ss.chunks = chunks
log(f"Parsing and chunking took {round(ss.parsing_time, 4)} seconds.", "success")
except Exception as e:
log(f"Error when parsing file.", "warning")
print(e)
return
def save_parsed_md():
"""Save the parsed markdown string to a md file."""
return ss.parsed_md.encode("utf-8")
def save_chunks():
"""Save the parsed chunks to a json file."""
return json.dumps([
{
k:v
for k,v in chunk.model_dump().items()
if k not in ["headers","content"]
} | {
"text": chunk.get_text(prepend_headers=ss.prepend_headers_to_chunks)
}
for chunk in ss.chunks
], indent=4, ensure_ascii=False).encode("utf-8")
def log(message: str, log_type: Literal["success", "warning", "info"] = "info"):
"""Display a warning message."""
match log_type:
case "warning":
LOGGER.warning(message, icon="⚠️")
case "success":
LOGGER.success(message, icon="✅")
case "info":
LOGGER.info(message, icon="ℹ️")
def load_sample_file(url: str):
"""Get the file from url"""
response = requests.get(url)
if response.status_code == 200:
return UploadedFile(
record=UploadedFileRec(
file_id="sample_file",
name=url.split("/")[-1],
data=response.content,
type="application/octet-stream",
),
file_urls=[url]
)
else:
print(response.status_code, response.content)
st.error("Failed to get data.")
return None
st.title("ChunkNorris.")
st.subheader("*Fast, smart, lightweight document chunking.*")
st.sidebar.header("Chunking settings")
st.sidebar.markdown("| [Documentation](https://wikit-ai.github.io/chunknorris/) | [Tutorials](https://wikit-ai.github.io/chunknorris/examples/) | [Repo](https://github.com/wikit-ai/chunknorris) |")
st.sidebar.select_slider(
label="Max header level to consider for chunking",
options=["h1", "h2", "h3", "h4", "h5", "h6"],
value="h4",
key="max_headers_to_use",
help="Max section header level to consider for chunking. Lower level headers won't be used to split a chunk into smaller chunks.",
label_visibility="visible",
)
st.sidebar.slider(
label="Maximum words per chunk",
value=250,
min_value=0,
max_value=3000,
step=50,
key="max_chunk_word_count",
help="Maximum number of words per chunk. If a chunk is bigger than this, chunk is split using subsection headers if any are available.",
label_visibility="visible",
)
st.sidebar.slider(
label="Hard maximum words per chunk",
value=400,
min_value=100,
max_value=3000,
step=50,
key="hard_max_chunk_word_count",
help="The hard maximum number of words per chunk. If a chunk is bigger than this, chunk is split using newlines, still trying to preverse code blocks or tables integrity.",
label_visibility="visible",
)
st.sidebar.slider(
label="Minumum words per chunk",
value=10,
min_value=0,
max_value=50,
step=1,
key="min_chunk_word_count",
help="The minimum words a chunk must have to avoid being discarded.",
label_visibility="visible",
)
st.sidebar.checkbox(
"Prepend headers to chunk's text",
value=True,
key="prepend_headers_to_chunks",
label_visibility="visible",
help="Whether or not all the parent headers should be prepended to the chunk's text content. Might improve retrieval performance of the chunk as it preserves context.",
)
_, col1, col2, _ = st.columns([0.1, .5, .3, 0.1])
with col1:
uploaded_file = st.file_uploader(
"Upload your own file...",
type=["md", "html", "pdf", "docx", "xls", "xlsx", "xlsm", "xlsb", "odf", "ods", "odt", "csv"],
)
with col2:
sample_file = st.selectbox(
"... Or choose a sample file from the list.",
options=list(SAMPLE_FILE.keys()),
index=None,
)
if sample_file is not None:
st.markdown(f"[View file]({SAMPLE_FILE[sample_file]})")
uploaded_file = load_sample_file(SAMPLE_FILE[sample_file])
if uploaded_file is not None:
parse_and_chunk(uploaded_file)
st.sidebar.button(
"Parse & Chunk",
on_click=parse_and_chunk,
args=(uploaded_file,),
type="primary",
use_container_width=True
)
else:
st.sidebar.button(
"Parse & Chunk",
on_click=log,
args=("You must upload a file first.", "warning",),
type="secondary",
use_container_width=True
)
ss.parsed_md = ""
ss.chunks = []
col1, col2 = st.columns(2)
with col1:
if ss.parsed_md:
file_parsed_md = save_parsed_md()
cola, colb = st.columns([0.25, 0.75])
with colb:
st.subheader("⚙️ Parsed Document", divider="blue")
with cola:
st.write("\n")
st.download_button(
label="⬇️ Download",
data=file_parsed_md,
file_name="chunknorris_parsed_document.md",
mime="text/markdown",
use_container_width=True,
)
if Path(uploaded_file.name).suffix.lower() == ".pdf":
st.info("For the purpose of this demo, OCR on pdf documents is deactivated.", icon="ℹ️")
with st.expander("Parsed document", expanded=True):
with st.container(height=600, border=False):
st.markdown(ss.parsed_md)
with col2:
if ss.chunks:
file_chunks = save_chunks()
cola, colb = st.columns([0.25, 0.75])
with colb:
st.subheader("📦 Chunks", divider="blue")
with cola:
st.write("\n")
st.download_button(
label="⬇️ Download",
data=file_chunks,
file_name="chunknorris_chunks.json",
mime="application/json",
use_container_width=True,
)
with st.container(border=False):
for i, chunk in enumerate(ss.chunks):
with st.expander(f"Chunk {i+1}", expanded=False):
with st.container(height=300, border=False):
st.markdown(chunk.get_text(prepend_headers=ss.prepend_headers_to_chunks))