Spaces:
Sleeping
Sleeping
Create fiber.py
Browse files
fiber.py
ADDED
@@ -0,0 +1,167 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import re
|
2 |
+
from typing import List, Dict
|
3 |
+
from datetime import datetime
|
4 |
+
from collections import Counter
|
5 |
+
|
6 |
+
class FiberDBMS:
|
7 |
+
def __init__(self):
|
8 |
+
self.database: List[Dict[str, str]] = []
|
9 |
+
self.content_index: Dict[str, List[int]] = {}
|
10 |
+
|
11 |
+
def add_entry(self, name: str, content: str, tags: str) -> None:
|
12 |
+
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
13 |
+
entry = {
|
14 |
+
"name": name,
|
15 |
+
"timestamp": timestamp,
|
16 |
+
"content": content,
|
17 |
+
"tags": tags
|
18 |
+
}
|
19 |
+
self.database.append(entry)
|
20 |
+
self._index_content(len(self.database) - 1, content)
|
21 |
+
|
22 |
+
def _index_content(self, entry_index: int, content: str) -> None:
|
23 |
+
words = self._tokenize(content)
|
24 |
+
for word in words:
|
25 |
+
if word not in self.content_index:
|
26 |
+
self.content_index[word] = []
|
27 |
+
self.content_index[word].append(entry_index)
|
28 |
+
|
29 |
+
def load_or_create(self, filename: str) -> None:
|
30 |
+
"""Load the database from a file or create a new one if the file does not exist."""
|
31 |
+
try:
|
32 |
+
self.load_from_file(filename)
|
33 |
+
print(f"Loaded {len(self.database)} entries from {filename}.")
|
34 |
+
except FileNotFoundError:
|
35 |
+
print(f"{filename} not found. Creating a new database.")
|
36 |
+
# Optionally, you can add default entries here if needed.
|
37 |
+
|
38 |
+
def query(self, query: str, top_n: int) -> List[Dict[str, str]]:
|
39 |
+
"""Query the database for entries matching the query."""
|
40 |
+
query_words = self._tokenize(query)
|
41 |
+
matching_indices = set()
|
42 |
+
for word in query_words:
|
43 |
+
if word in self.content_index:
|
44 |
+
matching_indices.update(self.content_index[word])
|
45 |
+
|
46 |
+
sorted_results = sorted(
|
47 |
+
matching_indices,
|
48 |
+
key=lambda idx: self._rate_result(self.database[idx], query_words),
|
49 |
+
reverse=True
|
50 |
+
)
|
51 |
+
|
52 |
+
results = []
|
53 |
+
for idx in sorted_results[:top_n]:
|
54 |
+
entry = self.database[idx]
|
55 |
+
snippet = self._get_snippet(entry['content'], query_words)
|
56 |
+
updated_tags = self._update_tags(entry['tags'], entry['content'], query_words)
|
57 |
+
results.append({
|
58 |
+
'name': entry['name'],
|
59 |
+
'content': snippet,
|
60 |
+
'tags': updated_tags,
|
61 |
+
'index': idx
|
62 |
+
})
|
63 |
+
|
64 |
+
return results
|
65 |
+
|
66 |
+
def save(self, filename: str) -> None:
|
67 |
+
"""Save the current database to a file."""
|
68 |
+
with open(filename, 'w', encoding='utf-8') as f:
|
69 |
+
for entry in self.database:
|
70 |
+
line = f"{entry['name']}\t{entry['timestamp']}\t{entry['content']}\t{entry['tags']}\n"
|
71 |
+
f.write(line)
|
72 |
+
print(f"Updated database saved to {filename}.")
|
73 |
+
|
74 |
+
def _rate_result(self, entry: Dict[str, str], query_words: List[str]) -> float:
|
75 |
+
content_lower = entry['content'].lower()
|
76 |
+
name_lower = entry['name'].lower()
|
77 |
+
tags = entry['tags'].split(',')
|
78 |
+
|
79 |
+
unique_matches = sum(1 for word in set(query_words) if word in content_lower)
|
80 |
+
content_score = sum(content_lower.count(word) for word in query_words)
|
81 |
+
name_score = sum(3 for word in query_words if word in name_lower)
|
82 |
+
phrase_score = 5 if ' '.join(query_words) in content_lower else 0
|
83 |
+
unique_match_score = unique_matches * 10
|
84 |
+
|
85 |
+
# Include all tags in weighting
|
86 |
+
tag_score = sum(2 for tag in tags if any(word in tag.lower() for word in query_words))
|
87 |
+
|
88 |
+
length_penalty = min(1, len(content_lower) / 100)
|
89 |
+
|
90 |
+
return (content_score + name_score + phrase_score + unique_match_score + tag_score) * length_penalty
|
91 |
+
|
92 |
+
def _tokenize(self, text: str) -> List[str]:
|
93 |
+
return re.findall(r'\w+', text.lower())
|
94 |
+
|
95 |
+
def _get_snippet(self, content: str, query_words: List[str], max_length: int = 200) -> str:
|
96 |
+
lower_content = content.lower()
|
97 |
+
best_start = 0
|
98 |
+
max_score = 0
|
99 |
+
|
100 |
+
for i in range(len(lower_content) - max_length):
|
101 |
+
snippet = lower_content[i:i+max_length]
|
102 |
+
score = sum(snippet.count(word) * (len(word) ** 0.5) for word in query_words)
|
103 |
+
if score > max_score:
|
104 |
+
max_score = score
|
105 |
+
best_start = i
|
106 |
+
|
107 |
+
snippet = content[best_start:best_start+max_length]
|
108 |
+
return snippet + "..." if len(content) > max_length else snippet
|
109 |
+
|
110 |
+
def _update_tags(self, original_tags: str, content: str, query_words: List[str]) -> str:
|
111 |
+
tags = original_tags.split(',')
|
112 |
+
original_tag = tags[0] # Keep the first tag unchanged
|
113 |
+
|
114 |
+
words = self._tokenize(content)
|
115 |
+
word_counts = Counter(words)
|
116 |
+
|
117 |
+
relevant_keywords = [word for word in query_words if word in word_counts and word not in tags]
|
118 |
+
relevant_keywords += [word for word, count in word_counts.most_common(5) if word not in tags and word not in query_words]
|
119 |
+
|
120 |
+
updated_tags = [original_tag] + tags[1:] + relevant_keywords
|
121 |
+
return ','.join(updated_tags)
|
122 |
+
|
123 |
+
def load_from_file(self, filename: str) -> None:
|
124 |
+
self.database.clear()
|
125 |
+
self.content_index.clear()
|
126 |
+
with open(filename, 'r', encoding='utf-8') as f:
|
127 |
+
for idx, line in enumerate(f):
|
128 |
+
name, timestamp, content, tags = line.strip().split('\t')
|
129 |
+
self.database.append({
|
130 |
+
"name": name,
|
131 |
+
"timestamp": timestamp,
|
132 |
+
"content": content,
|
133 |
+
"tags": tags
|
134 |
+
})
|
135 |
+
self._index_content(idx, content)
|
136 |
+
|
137 |
+
def main():
|
138 |
+
dbms = FiberDBMS()
|
139 |
+
|
140 |
+
# Load or create the database
|
141 |
+
dbms.load_or_create("Celsiaaa.txt")
|
142 |
+
|
143 |
+
while True:
|
144 |
+
query = input("\nEnter your search query (or 'quit' to exit): ")
|
145 |
+
if query.lower() == 'quit':
|
146 |
+
break
|
147 |
+
|
148 |
+
try:
|
149 |
+
top_n = int(input("Enter the number of top results to display: "))
|
150 |
+
except ValueError:
|
151 |
+
print("Invalid input. Using default value of 5.")
|
152 |
+
top_n = 5
|
153 |
+
|
154 |
+
results = dbms.query(query, top_n)
|
155 |
+
if results:
|
156 |
+
print(f"\nTop {len(results)} results for '{query}':")
|
157 |
+
for idx, result in enumerate(results, 1):
|
158 |
+
print(f"\nResult {idx}:")
|
159 |
+
print(f"Name: {result['name']}")
|
160 |
+
print(f"Content: {result['content']}")
|
161 |
+
print(f"Tags: {result['tags']}")
|
162 |
+
else:
|
163 |
+
print(f"No results found for '{query}'.")
|
164 |
+
|
165 |
+
# Save updated database with new tags
|
166 |
+
dbms.save("Celsiaaa.txt")
|
167 |
+
|