Spaces:
Running
Running
import csv
import os
import sqlite3
from collections import Counter
from sqlite3 import Error

import pandas as pd
def create_connection(db_file):
    """Open a connection to the SQLite database at *db_file*.

    :param db_file: database location handed straight to ``sqlite3.connect``
    :return: the open connection, or ``None`` if connecting raised a
        ``sqlite3.Error`` (the error is printed, not re-raised)
    """
    try:
        connection = sqlite3.connect(db_file)
    except Error as exc:
        # Best-effort contract: report the problem and let the caller
        # test the result against None.
        print(exc)
        return None
    return connection
def create_table(conn, create_table_sql, table_name):
    """Drop *table_name* if it exists, then run the CREATE TABLE statement.

    :param conn: Connection object
    :param create_table_sql: a CREATE TABLE statement
    :param table_name: raw (unquoted) name of the table to drop first
    :return: None; any ``sqlite3.Error`` is printed instead of raised
    """
    # Identifiers cannot be bound as "?" parameters, so quote the name
    # (doubling embedded quotes). This keeps names containing spaces or
    # punctuation from breaking the DROP statement.
    quoted_name = '"' + str(table_name).replace('"', '""') + '"'
    try:
        c = conn.cursor()
        try:
            c.execute(f"DROP TABLE IF EXISTS {quoted_name}")
            c.execute(create_table_sql)
        finally:
            c.close()
    except Error as e:
        print(e)
def insert_values(conn, task, sql):
    """Run one parameterised statement and commit it.

    :param conn: open SQLite connection
    :param task: sequence of values bound to the placeholders in *sql*
    :param sql: the statement to execute
    :return: ``lastrowid`` of the cursor that executed the statement
    """
    # Connection.execute creates a cursor, executes on it and returns it.
    cursor = conn.execute(sql, task)
    conn.commit()
    return cursor.lastrowid
def populate(csv_file, db_file, table_insert, encoding="utf-8"):
    """Insert every data row of *csv_file* into the database at *db_file*.

    The first CSV row is treated as a header and skipped. Each remaining
    row is passed to ``insert_values`` as the parameter tuple for
    *table_insert*.

    :param csv_file: path of the CSV file to read
    :param db_file: path of the SQLite database to write to
    :param table_insert: parameterised INSERT statement with one ``?``
        per CSV column
    :param encoding: text encoding of *csv_file*; defaults to UTF-8,
        which is what pandas ``to_csv`` writes by default
    :raises sqlite3.Error: if the database connection cannot be opened
    """
    conn = create_connection(db_file)
    if conn is None:
        # create_connection already printed the underlying error; fail
        # with a clear message instead of an opaque error on ``with None``.
        raise sqlite3.Error(f"cannot open database {db_file!r}")
    try:
        # newline="" lets the csv module handle line endings itself, which
        # it needs in order to read quoted fields containing newlines.
        with conn, open(csv_file, mode="r", newline="", encoding=encoding) as file:
            reader = csv.reader(file)
            next(reader, None)  # skip the header row (no-op on an empty file)
            for row in reader:
                insert_values(conn, tuple(row), table_insert)
    finally:
        # ``with conn`` only commits or rolls back; it does not close.
        conn.close()
def _quote_identifier(name):
    """Return *name* as a double-quoted SQLite identifier."""
    return '"' + str(name).replace('"', '""') + '"'


def _infer_sqlite_type(series):
    """Return the SQLite type for the most common value type in *series*."""
    type_map = {str: "TEXT", int: "INTEGER", float: "REAL"}
    # tolist() yields plain Python scalars, so numpy-backed numeric columns
    # are recognised too. Missing cells say nothing about the column type.
    counts = Counter(type(value) for value in series.dropna().tolist())
    if not counts:
        return "TEXT"
    most_common_type = counts.most_common(1)[0][0]
    return type_map.get(most_common_type, "TEXT")


def main(name="sql_pdf.xlsx"):
    """Convert an Excel workbook into a CSV file and a SQLite table.

    Reads *name* with pandas, writes it as ``<base>.csv``, derives a column
    type for every column, recreates the table named after the workbook in
    ``<base>.db``, and loads the CSV rows into it. The column-type mapping
    and the generated SQL statements are printed along the way.

    :param name: path of the Excel workbook to convert
    """
    excel_file = pd.read_excel(name)

    base_path = os.path.splitext(name)[0]
    table_name = os.path.basename(base_path)
    csv_file = f"{base_path}.csv"
    database = f"{base_path}.db"

    excel_file.to_csv(csv_file, index=False, header=True)

    column_type = {
        column: _infer_sqlite_type(values)
        for column, values in excel_file.items()
    }
    print(column_type)
    if not column_type:
        # A table needs at least one column; nothing sensible to create.
        print("Error! the spreadsheet has no columns.")
        return

    # Quote every identifier so headers with spaces, punctuation, or
    # non-string labels still produce valid SQL.
    quoted_table = _quote_identifier(table_name)
    quoted_columns = [_quote_identifier(column) for column in column_type]
    column_defs = ", ".join(
        f"{quoted} {sql_type}"
        for quoted, sql_type in zip(quoted_columns, column_type.values())
    )
    column_list = ", ".join(quoted_columns)
    placeholders = ", ".join("?" for _ in quoted_columns)

    table_construct = f"CREATE TABLE IF NOT EXISTS {quoted_table}( {column_defs});"
    table_insert = f"INSERT INTO {quoted_table}({column_list})\nVALUES ({placeholders})"
    print(table_construct)
    print("\n\n", table_insert)

    conn = create_connection(database)
    if conn is None:
        print("Error! cannot create the database connection.")
        return  # without a database there is nothing to populate
    try:
        # create_table receives the raw name; quoting is its own concern.
        create_table(conn, table_construct, table_name)
        conn.commit()
    finally:
        conn.close()

    populate(csv_file, database, table_insert)
# Run the conversion only when this file is executed as a script.
if __name__ == "__main__":
    main()