bilegentile's picture
Upload folder using huggingface_hub
c19ca42 verified
raw
history blame contribute delete
29.7 kB
import os
import sys
import time
import json
import platform
import subprocess
import datetime
import logging
from html.parser import HTMLParser
import torch
import gradio as gr
from modules import paths, script_callbacks, sd_models, sd_samplers, shared, extensions, devices
from benchmark import run_benchmark, submit_benchmark # pylint: disable=E0401,E0611,C0411
### system info globals
log = logging.getLogger('sd')
data = {
'date': '',
'timestamp': '',
'uptime': '',
'version': {},
'torch': '',
'gpu': {},
'state': {},
'memory': {},
'optimizations': [],
'libs': {},
'repos': {},
'device': {},
'schedulers': [],
'extensions': [],
'platform': '',
'crossattention': '',
'backend': getattr(devices, 'backend', ''),
'pipeline': shared.opts.data.get('sd_backend', ''),
'model': {},
}
networks = {
'models': [],
'hypernetworks': [],
'embeddings': [],
'skipped': [],
'loras': [],
'lycos': [],
}
### benchmark globals
bench_text = ''
bench_file = os.path.join(os.path.dirname(__file__), 'benchmark-data-local.json')
bench_headers = ['timestamp', 'performance', 'version', 'system', 'libraries', 'gpu', 'pipeline', 'model', 'username', 'note', 'hash']
bench_data = []
### system info module
def get_user():
user = ''
if user == '':
try:
user = os.getlogin()
except Exception:
pass
if user == '':
try:
import pwd
user = pwd.getpwuid(os.getuid())[0]
except Exception:
pass
return user
def get_gpu():
if not torch.cuda.is_available():
try:
if shared.cmd_opts.use_openvino:
from modules.intel.openvino import get_openvino_device
return {
'device': get_openvino_device(),
'openvino': get_package_version("openvino")
}
else:
return {}
except Exception:
return {}
else:
try:
if hasattr(torch, "xpu") and torch.xpu.is_available():
return {
'device': f'{torch.xpu.get_device_name(torch.xpu.current_device())} ({str(torch.xpu.device_count())})',
'ipex': get_package_version('intel-extension-for-pytorch'),
}
elif torch.version.cuda:
return {
'device': f'{torch.cuda.get_device_name(torch.cuda.current_device())} ({str(torch.cuda.device_count())}) ({torch.cuda.get_arch_list()[-1]}) {str(torch.cuda.get_device_capability(shared.device))}',
'cuda': torch.version.cuda,
'cudnn': torch.backends.cudnn.version(),
'driver': get_driver(),
}
elif torch.version.hip:
return {
'device': f'{torch.cuda.get_device_name(torch.cuda.current_device())} ({str(torch.cuda.device_count())})',
'hip': torch.version.hip,
}
else:
return {
'device': 'unknown'
}
except Exception as e:
return { 'error': e }
def get_driver():
if torch.cuda.is_available() and torch.version.cuda:
try:
result = subprocess.run('nvidia-smi --query-gpu=driver_version --format=csv,noheader', shell=True, check=False, env=os.environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
version = result.stdout.decode(encoding="utf8", errors="ignore").strip()
return version
except Exception:
return ''
else:
return ''
def get_uptime():
s = vars(shared.state)
return time.strftime('%c', time.localtime(s.get('server_start', time.time())))
class HTMLFilter(HTMLParser):
text = ""
def handle_data(self, data): # pylint: disable=redefined-outer-name
self.text += data
def get_state():
s = vars(shared.state)
flags = 'skipped ' if s.get('skipped', False) else ''
flags += 'interrupted ' if s.get('interrupted', False) else ''
flags += 'needs restart' if s.get('need_restart', False) else ''
text = s.get('textinfo', '')
if text is not None and len(text) > 0:
f = HTMLFilter()
f.feed(text)
text = os.linesep.join([s for s in f.text.splitlines() if s])
return {
'started': time.strftime('%c', time.localtime(s.get('time_start', time.time()))),
'step': f'{s.get("sampling_step", 0)} / {s.get("sampling_steps", 0)}',
'jobs': f'{s.get("job_no", 0)} / {s.get("job_count", 0)}', # pylint: disable=consider-using-f-string
'flags': flags,
'job': s.get('job', ''),
'text-info': text,
}
def get_memory():
def gb(val: float):
return round(val / 1024 / 1024 / 1024, 2)
mem = {}
try:
import psutil
process = psutil.Process(os.getpid())
res = process.memory_info()
ram_total = 100 * res.rss / process.memory_percent()
ram = { 'free': gb(ram_total - res.rss), 'used': gb(res.rss), 'total': gb(ram_total) }
mem.update({ 'ram': ram })
except Exception as e:
mem.update({ 'ram': e })
if torch.cuda.is_available():
try:
s = torch.cuda.mem_get_info()
gpu = { 'free': gb(s[0]), 'used': gb(s[1] - s[0]), 'total': gb(s[1]) }
s = dict(torch.cuda.memory_stats(shared.device))
allocated = { 'current': gb(s['allocated_bytes.all.current']), 'peak': gb(s['allocated_bytes.all.peak']) }
reserved = { 'current': gb(s['reserved_bytes.all.current']), 'peak': gb(s['reserved_bytes.all.peak']) }
active = { 'current': gb(s['active_bytes.all.current']), 'peak': gb(s['active_bytes.all.peak']) }
inactive = { 'current': gb(s['inactive_split_bytes.all.current']), 'peak': gb(s['inactive_split_bytes.all.peak']) }
warnings = { 'retries': s['num_alloc_retries'], 'oom': s['num_ooms'] }
mem.update({
'gpu': gpu,
'gpu-active': active,
'gpu-allocated': allocated,
'gpu-reserved': reserved,
'gpu-inactive': inactive,
'events': warnings,
'utilization': 0,
})
mem.update({ 'utilization': torch.cuda.utilization() }) # do this one separately as it may fail
except Exception:
pass
else:
try:
from openvino.runtime import Core as OpenVINO_Core
from modules.intel.openvino import get_device as get_raw_openvino_device
openvino_core = OpenVINO_Core()
mem.update({
'gpu': { 'total': gb(openvino_core.get_property(get_raw_openvino_device(), 'GPU_DEVICE_TOTAL_MEM_SIZE')) },
})
except Exception:
pass
return mem
def get_optimizations():
ram = []
if getattr(shared.cmd_opts, 'medvram', False):
ram.append('medvram')
if getattr(shared.cmd_opts, 'medvram_sdxl', False):
ram.append('medvram-sdxl')
if getattr(shared.cmd_opts, 'lowvram', False):
ram.append('lowvram')
if getattr(shared.cmd_opts, 'lowvam', False):
ram.append('lowram')
if len(ram) == 0:
ram.append('none')
return ram
def get_package_version(pkg: str):
import pkg_resources
spec = pkg_resources.working_set.by_key.get(pkg, None) # more reliable than importlib
version = pkg_resources.get_distribution(pkg).version if spec is not None else ''
return version
def get_libs():
return {
'xformers': get_package_version('xformers'),
'diffusers': get_package_version('diffusers'),
'transformers': get_package_version('transformers'),
}
def get_repos():
repos = {}
for key, val in paths.paths.items():
try:
cmd = f'git -C {val} log --pretty=format:"%h %ad" -1 --date=short'
res = subprocess.run(f'{cmd} {val}', stdout = subprocess.PIPE, stderr = subprocess.PIPE, shell=True, check=True)
stdout = res.stdout.decode(encoding = 'utf8', errors='ignore') if len(res.stdout) > 0 else ''
words = stdout.split(' ')
repos[key] = f'[{words[0]}] {words[1]}'
except Exception:
repos[key] = '(unknown)'
return repos
def get_platform():
try:
if platform.system() == 'Windows':
release = platform.platform(aliased = True, terse = False)
else:
release = platform.release()
return {
# 'host': platform.node(),
'arch': platform.machine(),
'cpu': platform.processor(),
'system': platform.system(),
'release': release,
# 'platform': platform.platform(aliased = True, terse = False),
# 'version': platform.version(),
'python': platform.python_version(),
}
except Exception as e:
return { 'error': e }
def get_torch():
try:
ver = torch.__long_version__
except Exception:
ver = torch.__version__
return f"{ver} {shared.cmd_opts.precision} {' nohalf' if shared.cmd_opts.no_half else ' half'}"
def get_version():
version = {}
try:
res = subprocess.run('git log --pretty=format:"%h %ad" -1 --date=short', stdout = subprocess.PIPE, stderr = subprocess.PIPE, shell=True, check=True)
ver = res.stdout.decode(encoding = 'utf8', errors='ignore') if len(res.stdout) > 0 else ''
githash, updated = ver.split(' ')
res = subprocess.run('git remote get-url origin', stdout = subprocess.PIPE, stderr = subprocess.PIPE, shell=True, check=True)
origin = res.stdout.decode(encoding = 'utf8', errors='ignore') if len(res.stdout) > 0 else ''
res = subprocess.run('git rev-parse --abbrev-ref HEAD', stdout = subprocess.PIPE, stderr = subprocess.PIPE, shell=True, check=True)
branch = res.stdout.decode(encoding = 'utf8', errors='ignore') if len(res.stdout) > 0 else ''
url = origin.replace('\n', '') + '/tree/' + branch.replace('\n', '')
app = origin.replace('\n', '').split('/')[-1]
if app == 'automatic':
app = 'SD.next'
version = {
'app': app,
'updated': updated,
'hash': githash,
'url': url
}
except Exception:
pass
return version
def get_crossattention():
try:
ca = getattr(shared.opts, 'cross_attention_optimization', None)
if ca is None:
from modules import sd_hijack
ca = sd_hijack.model_hijack.optimization_method
return ca
except Exception:
return 'unknown'
def get_model():
from modules.sd_models import model_data
import modules.sd_vae
obj = {
'configured': {
'base': shared.opts.data.get('sd_model_checkpoint', ''),
'refiner': shared.opts.data.get('sd_model_refiner', ''),
'vae': shared.opts.data.get('sd_vae', ''),
},
'loaded': {
'base': '',
'refiner': '',
'vae': '',
}
}
try:
obj['loaded']['base'] = model_data.sd_model.sd_checkpoint_info.filename if model_data.sd_model is not None and hasattr(model_data.sd_model, 'sd_checkpoint_info') else ''
except Exception :
pass
try:
obj['loaded']['refiner'] = model_data.sd_refiner.sd_checkpoint_info.filename if model_data.sd_refiner is not None and hasattr(model_data.sd_refiner, 'sd_checkpoint_info') else ''
except Exception :
pass
try:
obj['loaded']['vae'] = modules.sd_vae.loaded_vae_file
except Exception:
pass
return obj
def get_models():
return sorted([x.title for x in sd_models.checkpoints_list.values()])
def get_samplers():
return sorted([sampler[0] for sampler in sd_samplers.all_samplers])
def get_extensions():
return sorted([f"{e.name} ({'enabled' if e.enabled else 'disabled'}{' builtin' if e.is_builtin else ''})" for e in extensions.extensions])
def get_loras():
loras = []
try:
sys.path.append(extensions.extensions_builtin_dir)
from Lora import lora # pylint: disable=E0401
loras = sorted([l for l in lora.available_loras.keys()])
except Exception:
pass
return loras
def get_device():
dev = {
'active': str(devices.device),
'dtype': str(devices.dtype),
'vae': str(devices.dtype_vae),
'unet': str(devices.dtype_unet),
}
return dev
def get_full_data():
global data # pylint: disable=global-statement
data = {
'date': datetime.datetime.now().strftime('%c'),
'timestamp': datetime.datetime.now().strftime('%X'),
'uptime': get_uptime(),
'version': get_version(),
'torch': get_torch(),
'gpu': get_gpu(),
'state': get_state(),
'memory': get_memory(),
'optimizations': get_optimizations(),
'libs': get_libs(),
'repos': get_repos(),
'device': get_device(),
'model': get_model(),
'schedulers': get_samplers(),
'extensions': get_extensions(),
'platform': get_platform(),
'crossattention': get_crossattention(),
'backend': getattr(devices, 'backend', ''),
'pipeline': shared.opts.data.get('sd_backend', ''),
}
global networks # pylint: disable=global-statement
networks = {
'models': get_models(),
'loras': get_loras(),
}
return data
def get_quick_data():
data['timestamp'] = datetime.datetime.now().strftime('%X')
data['state'] = get_state()
data['memory'] = get_memory()
data['model'] = get_model()
def list2text(lst: list):
return '\n'.join(lst)
def dict2str(d: dict):
arr = [f'{name}:{d[name]}' for i, name in enumerate(d)]
return ' '.join(arr)
def dict2text(d: dict):
arr = ['{name}: {val}'.format(name = name, val = d[name] if not type(d[name]) is dict else dict2str(d[name])) for i, name in enumerate(d)] # pylint: disable=consider-using-f-string
return list2text(arr)
def refresh_info_quick(_old_data = None):
get_quick_data()
return dict2text(data['state']), dict2text(data['memory']), data['crossattention'], data['timestamp'], data
def refresh_info_full():
get_full_data()
return data['uptime'], dict2text(data['version']), dict2text(data['state']), dict2text(data['memory']), dict2text(data['platform']), data['torch'], dict2text(data['gpu']), list2text(data['optimizations']), data['crossattention'], data['backend'], data['pipeline'], dict2text(data['libs']), dict2text(data['repos']), dict2text(data['device']), dict2text(data['model']), networks['models'], networks['loras'], data['timestamp'], data
### ui definition
def create_ui(blocks: gr.Blocks = None):
try:
if shared.cmd_opts.api_only:
return
except:
pass
if not standalone:
from modules.ui import ui_system_tabs # pylint: disable=redefined-outer-name
else:
ui_system_tabs = None
with gr.Blocks(analytics_enabled = False) if standalone else blocks as system_info:
with gr.Row(elem_id = 'system_info'):
with gr.Tabs(elem_id = 'system_info_tabs') if standalone else ui_system_tabs:
with gr.TabItem('System Info'):
with gr.Row():
timestamp = gr.Textbox(value=data['timestamp'], label = '', elem_id = 'system_info_tab_last_update', container=False)
refresh_quick_btn = gr.Button('Refresh state', elem_id = 'system_info_tab_refresh_btn', visible = False) # quick refresh is used from js interval
refresh_full_btn = gr.Button('Refresh data', elem_id = 'system_info_tab_refresh_full_btn', variant='primary')
interrupt_btn = gr.Button('Send interrupt', elem_id = 'system_info_tab_interrupt_btn', variant='primary')
with gr.Row():
with gr.Column():
uptimetxt = gr.Textbox(data['uptime'], label = 'Server start time', lines = 1)
versiontxt = gr.Textbox(dict2text(data['version']), label = 'Version', lines = len(data['version']))
with gr.Column():
statetxt = gr.Textbox(dict2text(data['state']), label = 'State', lines = len(data['state']))
with gr.Column():
memorytxt = gr.Textbox(dict2text(data['memory']), label = 'Memory', lines = len(data['memory']))
with gr.Row():
with gr.Column():
platformtxt = gr.Textbox(dict2text(data['platform']), label = 'Platform', lines = len(data['platform']))
with gr.Row():
backendtxt = gr.Textbox(data['backend'], label = 'Backend')
pipelinetxt = gr.Textbox(data['pipeline'], label = 'Pipeline')
with gr.Column():
torchtxt = gr.Textbox(data['torch'], label = 'Torch', lines = 1)
gputxt = gr.Textbox(dict2text(data['gpu']), label = 'GPU', lines = len(data['gpu']))
with gr.Row():
opttxt = gr.Textbox(list2text(data['optimizations']), label = 'Memory optimization')
attentiontxt = gr.Textbox(data['crossattention'], label = 'Cross-attention')
with gr.Column():
libstxt = gr.Textbox(dict2text(data['libs']), label = 'Libs', lines = len(data['libs']))
repostxt = gr.Textbox(dict2text(data['repos']), label = 'Repos', lines = len(data['repos']), visible = False)
devtxt = gr.Textbox(dict2text(data['device']), label = 'Device Info', lines = len(data['device']))
modeltxt = gr.Textbox(dict2text(data['model']), label = 'Model Info', lines = len(data['model']))
with gr.Row():
gr.HTML('Load<br><div id="si-sparkline-load"></div>')
gr.HTML('Memory<br><div id="si-sparkline-memo"></div>')
with gr.Accordion('Info object', open = False, visible = True):
# reduce json data to avoid private info
refresh_info_quick()
js = gr.JSON(data)
with gr.TabItem('Benchmark'):
bench_load()
with gr.Row():
benchmark_data = gr.DataFrame(bench_data, label = 'Benchmark Data', elem_id = 'system_info_benchmark_data', show_label = True, interactive = False, wrap = True, overflow_row_behaviour = 'paginate', max_rows = 10, headers = bench_headers)
with gr.Row():
with gr.Column(scale=3):
username = gr.Textbox(get_user, label = 'Username', placeholder='enter username for submission', elem_id='system_info_tab_username')
note = gr.Textbox('', label = 'Note', placeholder='enter any additional notes', elem_id='system_info_tab_note')
with gr.Column(scale=1):
with gr.Row():
warmup = gr.Checkbox(label = 'Perform warmup', value = True, elem_id = 'system_info_tab_warmup')
extra = gr.Checkbox(label = 'Extra steps', value = False, elem_id = 'system_info_tab_extra')
level = gr.Radio(['quick', 'normal', 'extensive'], value = 'normal', label = 'Benchmark level', elem_id = 'system_info_tab_level')
# batches = gr.Textbox('1, 2, 4, 8', label = 'Batch sizes', elem_id = 'system_info_tab_batch_size', interactive = False)
with gr.Column(scale=1):
bench_run_btn = gr.Button('Run benchmark', elem_id = 'system_info_tab_benchmark_btn', variant='primary')
bench_run_btn.click(bench_init, inputs = [username, note, warmup, level, extra], outputs = [benchmark_data])
bench_submit_btn = gr.Button('Submit results', elem_id = 'system_info_tab_submit_btn', variant='primary')
bench_submit_btn.click(bench_submit, inputs = [username], outputs = [])
_bench_link = gr.HTML('<a href="https://vladmandic.github.io/sd-extension-system-info/pages/benchmark.html" target="_blank">Link to online results</a>')
with gr.Row():
_bench_note = gr.HTML(elem_id = 'system_info_tab_bench_note', value = """
<span>performance is measured in iterations per second (it/s) and reported for different batch sizes (e.g. 1, 2, 4, 8, 16...)</span><br>
<span>running benchmark may take a while. extensive tests may result in gpu out-of-memory conditions.</span>""")
with gr.Row():
bench_label = gr.HTML('', elem_id = 'system_info_tab_bench_label')
refresh_bench_btn = gr.Button('Refresh bench', elem_id = 'system_info_tab_refresh_bench_btn', visible = False) # quick refresh is used from js interval
refresh_bench_btn.click(bench_refresh, inputs = [], outputs = [bench_label])
with gr.TabItem('Models & Networks'):
with gr.Row():
with gr.Column():
models = gr.JSON(networks['models'], label = 'Models')
with gr.Column():
loras = gr.JSON(networks['loras'], label = 'Available LORA')
refresh_quick_btn.click(refresh_info_quick, _js='receive_system_info', show_progress = False,
inputs = [js],
outputs = [statetxt, memorytxt, attentiontxt, timestamp, js]
)
refresh_full_btn.click(refresh_info_full, show_progress = False,
inputs = [],
outputs = [uptimetxt, versiontxt, statetxt, memorytxt, platformtxt, torchtxt, gputxt, opttxt, attentiontxt, backendtxt, pipelinetxt, libstxt, repostxt, devtxt, modeltxt, models, loras, timestamp, js]
)
interrupt_btn.click(shared.state.interrupt, inputs = [], outputs = [])
return [(system_info, 'System Info', 'system_info')]
### benchmarking module
def bench_submit(username: str):
if username is None or username == '':
log.error('SD-System-Info: username is required to submit results')
return
submit_benchmark(bench_data, username)
log.info(f'SD-System-Info: benchmark data submitted: {len(bench_data)} records')
def bench_run(batches: list = [1], extra: bool = False):
results = []
for batch in batches:
log.debug(f'SD-System-Info: benchmark starting: batch={batch}')
res = run_benchmark(batch, extra)
log.info(f'SD-System-Info: benchmark batch={batch} its={res}')
results.append(str(res))
its = ' / '.join(results)
return its
def bench_init(username: str, note: str, warmup: bool, level: str, extra: bool):
from hashlib import sha256
log.debug('SD-System-Info: benchmark starting')
get_full_data()
hash256 = sha256((dict2str(data['platform']) + data['torch'] + dict2str(data['libs']) + dict2str(data['gpu']) + ','.join(data['optimizations']) + data['crossattention']).encode('utf-8')).hexdigest()[:6]
existing = [x for x in bench_data if (x[-1] is not None and x[-1][:6] == hash256)]
if len(existing) > 0:
log.debug('SD-System-Info: benchmark replacing existing entry')
d = existing[0]
elif bench_data[-1][0] is not None:
log.debug('SD-System-Info: benchmark new entry')
bench_data.append([None] * len(bench_headers))
d = bench_data[-1]
else:
d = bench_data[-1]
if level == 'quick':
batches = [1]
elif level == 'normal':
batches = [1, 2, 4]
elif level == 'extensive':
batches = [1, 2, 4, 8, 16]
else:
batches = []
if warmup:
bench_run([1], False)
try:
mem = data['memory']['gpu']['total']
except Exception:
mem = 0
# bench_headers = ['timestamp', 'performance', 'version', 'system', 'libraries', 'gpu', 'optimizations', 'model', 'username', 'note', 'hash']
d[0] = str(datetime.datetime.now())
d[1] = bench_run(batches, extra)
d[2] = dict2str(data['version'])
d[3] = dict2str(data['platform'])
d[4] = f"torch:{data['torch']} {dict2str(data['libs'])}"
d[5] = dict2str(data['gpu']) + f' {str(round(mem))}GB'
d[6] = (data['pipeline'] + ' ' + data['crossattention'] + ' ' + ','.join(data['optimizations'])).strip()
d[7] = shared.opts.data['sd_model_checkpoint']
d[8] = username
d[9] = note
d[10] = hash256
md = '| ' + ' | '.join(d) + ' |'
log.info(f'SD-System-Info: benchmark result: {md}')
bench_save()
return bench_data
def bench_load():
global bench_data # pylint: disable=global-statement
tmp = []
if os.path.isfile(bench_file) and os.path.getsize(bench_file) > 0:
try:
with open(bench_file, 'r', encoding='utf-8') as f:
tmp = json.load(f)
bench_data = tmp
log.debug(f'SD-System-Info: benchmark data loaded: {bench_file}')
except Exception as err:
log.debug(f'SD-System-Info: benchmark error loading: {bench_file} {str(err)}')
if len(bench_data) == 0:
bench_data.append([None] * len(bench_headers))
return bench_data
def bench_save():
if bench_data[-1][0] is None:
del bench_data[-1]
try:
with open(bench_file, 'w', encoding='utf-8') as f:
json.dump(bench_data, f, indent=2, default=str, skipkeys=True)
log.debug(f'SD-System-Info: benchmark data saved: {bench_file}')
except Exception as err:
log.error(f'SD-System-Info: benchmark error saving: {bench_file} {str(err)}')
def bench_refresh():
return gr.HTML.update(value = bench_text)
### API
from typing import Optional # pylint: disable=wrong-import-order
from fastapi import FastAPI, Depends # pylint: disable=wrong-import-order
from pydantic import BaseModel, Field # pylint: disable=wrong-import-order,no-name-in-module
class StatusReq(BaseModel): # definition of http request
state: bool = Field(title="State", description="Get server state", default=False)
memory: bool = Field(title="Memory", description="Get server memory status", default=False)
full: bool = Field(title="FullInfo", description="Get full server info", default=False)
refresh: bool = Field(title="FullInfo", description="Force refresh server info", default=False)
class StatusRes(BaseModel): # definition of http response
version: dict = Field(title="Version", description="Server version")
uptime: str = Field(title="Uptime", description="Server uptime")
timestamp: str = Field(title="Timestamp", description="Data timestamp")
state: Optional[dict] = Field(title="State", description="Server state")
memory: Optional[dict] = Field(title="Memory", description="Server memory status")
platform: Optional[dict] = Field(title="Platform", description="Server platform")
torch: Optional[str] = Field(title="Torch", description="Torch version")
gpu: Optional[dict] = Field(title="GPU", description="GPU info")
optimizations: Optional[list] = Field(title="Optimizations", description="Memory optimizations")
crossatention: Optional[str] = Field(title="CrossAttention", description="Cross-attention optimization")
device: Optional[dict] = Field(title="Device", description="Device info")
backend: Optional[str] = Field(title="Backend", description="Backend")
pipeline: Optional[str] = Field(title="Pipeline", description="Pipeline")
def get_status_api(req: StatusReq = Depends()):
if req.refresh:
get_full_data()
else:
get_quick_data()
res = StatusRes(
version = data['version'],
timestamp = data['timestamp'],
uptime = data['uptime']
)
if req.state or req.full:
res.state = data['state']
if req.memory or req.full:
res.memory = data['memory']
if req.full:
res.platform = data['platform']
res.torch = data['torch']
res.gpu = data['gpu']
res.optimizations = data['optimizations']
res.crossatention = data['crossattention']
res.device = data['device']
res.backend = data['backend']
res.pipeline = data['pipeline']
return res
def register_api(app: FastAPI):
app.add_api_route("/sdapi/v1/system-info/status", get_status_api, methods=["GET"], response_model=StatusRes)
### Entry point
def on_app_started(blocks, app): # register api
register_api(app)
if not standalone:
create_ui(blocks)
"""
@app.get("/sdapi/v1/system-info/status")
async def sysinfo_api():
get_quick_data()
res = { 'state': data['state'], 'memory': data['memory'], 'timestamp': data['timestamp'] }
return res
"""
try:
from modules.ui import ui_system_tabs # pylint: disable=unused-import,ungrouped-imports
standalone = False
except:
standalone = True
if standalone:
script_callbacks.on_ui_tabs(create_ui)
script_callbacks.on_app_started(on_app_started)