bash / pyUltroid /startup /_database.py
azils3's picture
Upload 216 files
618430a verified
# Ultroid - UserBot
# Copyright (C) 2021-2025 TeamUltroid
#
# This file is a part of < https://github.com/TeamUltroid/Ultroid/ >
# PLease read the GNU Affero General Public License in
# <https://github.com/TeamUltroid/pyUltroid/blob/main/LICENSE>.
import ast
import os
import sys
from .. import run_as_module
from . import *
if run_as_module:
from ..configs import Var
Redis = MongoClient = psycopg2 = Database = None
if Var.REDIS_URI or Var.REDISHOST:
try:
from redis import Redis
except ImportError:
LOGS.info("Installing 'redis' for database.")
os.system(f"{sys.executable} -m pip install -q redis hiredis")
from redis import Redis
elif Var.MONGO_URI:
try:
from pymongo import MongoClient
except ImportError:
LOGS.info("Installing 'pymongo' for database.")
os.system(f"{sys.executable} -m pip install -q pymongo[srv]")
from pymongo import MongoClient
elif Var.DATABASE_URL:
try:
import psycopg2
except ImportError:
LOGS.info("Installing 'pyscopg2' for database.")
os.system(f"{sys.executable} -m pip install -q psycopg2-binary")
import psycopg2
else:
try:
from localdb import Database
except ImportError:
LOGS.info("Using local file as database.")
os.system(f"{sys.executable} -m pip install -q localdb.json")
from localdb import Database
# --------------------------------------------------------------------------------------------- #
class _BaseDatabase:
def __init__(self, *args, **kwargs):
self._cache = {}
def get_key(self, key):
if key in self._cache:
return self._cache[key]
value = self._get_data(key)
self._cache.update({key: value})
return value
def re_cache(self):
self._cache.clear()
for key in self.keys():
self._cache.update({key: self.get_key(key)})
def ping(self):
return 1
@property
def usage(self):
return 0
def keys(self):
return []
def del_key(self, key):
if key in self._cache:
del self._cache[key]
self.delete(key)
return True
def _get_data(self, key=None, data=None):
if key:
data = self.get(str(key))
if data and isinstance(data, str):
try:
data = ast.literal_eval(data)
except BaseException:
pass
return data
def set_key(self, key, value, cache_only=False):
value = self._get_data(data=value)
self._cache[key] = value
if cache_only:
return
return self.set(str(key), str(value))
def rename(self, key1, key2):
_ = self.get_key(key1)
if _:
self.del_key(key1)
self.set_key(key2, _)
return 0
return 1
class MongoDB(_BaseDatabase):
def __init__(self, key, dbname="UltroidDB"):
self.dB = MongoClient(key, serverSelectionTimeoutMS=5000)
self.db = self.dB[dbname]
super().__init__()
def __repr__(self):
return f"<Ultroid.MonGoDB\n -total_keys: {len(self.keys())}\n>"
@property
def name(self):
return "Mongo"
@property
def usage(self):
return self.db.command("dbstats")["dataSize"]
def ping(self):
if self.dB.server_info():
return True
def keys(self):
return self.db.list_collection_names()
def set(self, key, value):
if key in self.keys():
self.db[key].replace_one({"_id": key}, {"value": str(value)})
else:
self.db[key].insert_one({"_id": key, "value": str(value)})
return True
def delete(self, key):
self.db.drop_collection(key)
def get(self, key):
if x := self.db[key].find_one({"_id": key}):
return x["value"]
def flushall(self):
self.dB.drop_database("UltroidDB")
self._cache.clear()
return True
# --------------------------------------------------------------------------------------------- #
# Thanks to "Akash Pattnaik" / @BLUE-DEVIL1134
# for SQL Implementation in Ultroid.
#
# Please use https://elephantsql.com/ !
class SqlDB(_BaseDatabase):
def __init__(self, url):
self._url = url
self._connection = None
self._cursor = None
try:
self._connection = psycopg2.connect(dsn=url)
self._connection.autocommit = True
self._cursor = self._connection.cursor()
self._cursor.execute(
"CREATE TABLE IF NOT EXISTS Ultroid (ultroidCli varchar(70))"
)
except Exception as error:
LOGS.exception(error)
LOGS.info("Invaid SQL Database")
if self._connection:
self._connection.close()
sys.exit()
super().__init__()
@property
def name(self):
return "SQL"
@property
def usage(self):
self._cursor.execute(
"SELECT pg_size_pretty(pg_relation_size('Ultroid')) AS size"
)
data = self._cursor.fetchall()
return int(data[0][0].split()[0])
def keys(self):
self._cursor.execute(
"SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = 'ultroid'"
) # case sensitive
data = self._cursor.fetchall()
return [_[0] for _ in data]
def get(self, variable):
try:
self._cursor.execute(f"SELECT {variable} FROM Ultroid")
except psycopg2.errors.UndefinedColumn:
return None
data = self._cursor.fetchall()
if not data:
return None
if len(data) >= 1:
for i in data:
if i[0]:
return i[0]
def set(self, key, value):
try:
self._cursor.execute(f"ALTER TABLE Ultroid DROP COLUMN IF EXISTS {key}")
except (psycopg2.errors.UndefinedColumn, psycopg2.errors.SyntaxError):
pass
except BaseException as er:
LOGS.exception(er)
self._cache.update({key: value})
self._cursor.execute(f"ALTER TABLE Ultroid ADD {key} TEXT")
self._cursor.execute(f"INSERT INTO Ultroid ({key}) values (%s)", (str(value),))
return True
def delete(self, key):
try:
self._cursor.execute(f"ALTER TABLE Ultroid DROP COLUMN {key}")
except psycopg2.errors.UndefinedColumn:
return False
return True
def flushall(self):
self._cache.clear()
self._cursor.execute("DROP TABLE Ultroid")
self._cursor.execute(
"CREATE TABLE IF NOT EXISTS Ultroid (ultroidCli varchar(70))"
)
return True
# --------------------------------------------------------------------------------------------- #
class RedisDB(_BaseDatabase):
def __init__(
self,
host,
port,
password,
platform="",
logger=LOGS,
*args,
**kwargs,
):
if host and ":" in host:
spli_ = host.split(":")
host = spli_[0]
port = int(spli_[-1])
if host.startswith("http"):
logger.error("Your REDIS_URI should not start with http !")
import sys
sys.exit()
elif not host or not port:
logger.error("Port Number not found")
import sys
sys.exit()
kwargs["host"] = host
kwargs["password"] = password
kwargs["port"] = port
if platform.lower() == "qovery" and not host:
var, hash_, host, password = "", "", "", ""
for vars_ in os.environ:
if vars_.startswith("QOVERY_REDIS_") and vars.endswith("_HOST"):
var = vars_
if var:
hash_ = var.split("_", maxsplit=2)[1].split("_")[0]
if hash:
kwargs["host"] = os.environ.get(f"QOVERY_REDIS_{hash_}_HOST")
kwargs["port"] = os.environ.get(f"QOVERY_REDIS_{hash_}_PORT")
kwargs["password"] = os.environ.get(f"QOVERY_REDIS_{hash_}_PASSWORD")
self.db = Redis(**kwargs)
self.set = self.db.set
self.get = self.db.get
self.keys = self.db.keys
self.delete = self.db.delete
super().__init__()
@property
def name(self):
return "Redis"
@property
def usage(self):
return sum(self.db.memory_usage(x) for x in self.keys())
# --------------------------------------------------------------------------------------------- #
class LocalDB(_BaseDatabase):
def __init__(self):
self.db = Database("ultroid")
self.get = self.db.get
self.set = self.db.set
self.delete = self.db.delete
super().__init__()
@property
def name(self):
return "LocalDB"
def keys(self):
return self._cache.keys()
def __repr__(self):
return f"<Ultroid.LocalDB\n -total_keys: {len(self.keys())}\n>"
def UltroidDB():
_er = False
from .. import HOSTED_ON
try:
if Redis:
return RedisDB(
host=Var.REDIS_URI or Var.REDISHOST,
password=Var.REDIS_PASSWORD or Var.REDISPASSWORD,
port=Var.REDISPORT,
platform=HOSTED_ON,
decode_responses=True,
socket_timeout=5,
retry_on_timeout=True,
)
elif MongoClient:
return MongoDB(Var.MONGO_URI)
elif psycopg2:
return SqlDB(Var.DATABASE_URL)
else:
LOGS.critical(
"No DB requirement fullfilled!\nPlease install redis, mongo or sql dependencies...\nTill then using local file as database."
)
return LocalDB()
except BaseException as err:
LOGS.exception(err)
exit()
# --------------------------------------------------------------------------------------------- #