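"""Equation-of-state (EOS) benchmark flow for alloy structures.

Streams structures from an ASE database hosted on the Hugging Face Hub, runs
the EOS task with every compatible MLIP model, and writes the fitted results
to JSON files and calculator-specific HDF5 stores.
"""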
from functools import partial
from pathlib import Path
import pandas as pd
from huggingface_hub import hf_hub_download
from prefect import Task, flow, task
from prefect.client.schemas.objects import TaskRun
from prefect.futures import wait
from prefect.states import State
from ase.db import connect
from mlip_arena.data.local import SafeHDFStore
from mlip_arena.models import REGISTRY, MLIPEnum
from mlip_arena.tasks.eos import run as EOS


@task
def get_atoms_from_db(db_path: Path | str):
    """Yield ASE Atoms from a local ASE database, downloading the file from the
    atomind/mlip-arena dataset on the Hugging Face Hub if it is not found locally."""
    db_path = Path(db_path)
    if not db_path.exists():
        # Fetch the database from this benchmark's subfolder in the dataset repo
        db_path = hf_hub_download(
            repo_id="atomind/mlip-arena",
            repo_type="dataset",
            subfolder=f"{Path(__file__).parent.name}",
            filename=str(db_path),
        )
    with connect(db_path) as db:
        for row in db.select():
            yield row.toatoms()


def save_to_hdf(
    tsk: Task, run: TaskRun, state: State, fpath: Path | str, table_name: str
):
    """
    Prefect hook called on completion of the EOS task: appends the fitted
    equation-of-state results to an HDF5 store and writes a JSON copy per
    structure and model.
    """
    # Skip failed runs and results that are not the expected dict payload
    if run.state.is_failed():
        return

    result = run.state.result(raise_on_failure=False)

    if not isinstance(result, dict):
        return
    try:
        # Unpack the relaxed structure and the fitted EOS parameters
        atoms = result["atoms"]
        calculator_name = (
            run.task_inputs["calculator_name"] or result["calculator_name"]
        )
        energies = [float(e) for e in result["eos"]["energies"]]
        formula = atoms.get_chemical_formula()

        # One row per sampled volume; scalar fit values (v0, e0, b0, b1) are broadcast
        df = pd.DataFrame(
            {
                "method": calculator_name,
                "formula": formula,
                "total_run_time": run.total_run_time,
                "v0": result["v0"],
                "e0": result["e0"],
                "b0": result["b0"],
                "b1": result["b1"],
                "volume": result["eos"]["volumes"],
                "energy": energies,
            }
        )
        # One HDF5 store per calculator; JSON copies are grouped by model family
        fpath = Path(fpath)
        fpath = fpath.with_stem(fpath.stem + f"_{calculator_name}")

        family_path = Path(__file__).parent / REGISTRY[calculator_name]["family"]
        family_path.mkdir(parents=True, exist_ok=True)
        df.to_json(family_path / f"{calculator_name}_{formula}.json", indent=2)
with SafeHDFStore(fpath, mode="a") as store:
store.append(
table_name,
df,
format="table",
data_columns=True,
min_itemsize={"formula": 50, "method": 20},
)
    except Exception as e:
        # Log and swallow errors so a failed save does not fail the whole flow
        print(e)


@flow(name="EOS Alloy")
def run(
    db_path: Path | str,
    out_path: Path | str,
    table_name: str,
    optimizer="FIRE",
    optimizer_kwargs=None,
    filter="FrechetCell",
    filter_kwargs=None,
    criterion=dict(fmax=0.1, steps=1000),
    max_abs_strain=0.20,
    concurrent=False,
    cache=True,
):
    """Run the EOS task for every compatible MLIP model on every structure in the database.

    Structures are streamed from the ASE database at ``db_path``. Each completed
    task appends its fitted results to an HDF5 store derived from ``out_path``
    under ``table_name`` via the ``save_to_hdf`` completion hook. Models are
    skipped unless their registry entry flags NPT support and lists this
    benchmark among their CPU/GPU tasks.
    """
    # Attach the save hook to every EOS task run and honor the cache flag
    EOS_ = EOS.with_options(
        on_completion=[partial(save_to_hdf, fpath=out_path, table_name=table_name)],
        refresh_cache=not cache,
    )
    futures = []
    for atoms in get_atoms_from_db(db_path):
        for mlip in MLIPEnum:
            # Skip models whose registry entry does not flag NPT support
            if not REGISTRY[mlip.name]["npt"]:
                continue
            # Skip models that do not list this benchmark among their CPU/GPU tasks
            if Path(__file__).parent.name not in (
                REGISTRY[mlip.name].get("cpu-tasks", [])
                + REGISTRY[mlip.name].get("gpu-tasks", [])
            ):
                continue
future = EOS_.submit(
atoms=atoms,
calculator_name=mlip.name,
calculator_kwargs=dict(),
optimizer=optimizer,
optimizer_kwargs=optimizer_kwargs,
filter=filter,
filter_kwargs=filter_kwargs,
criterion=criterion,
max_abs_strain=max_abs_strain,
concurrent=concurrent,
persist_opt=cache,
cache_opt=cache,
# return_state=True
)
futures.append(future)
    wait(futures)

    # Return results only for runs that completed successfully
    return [
        f.result(timeout=None, raise_on_failure=False)
        for f in futures
        if f.state.is_completed()
    ]
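# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module): how this flow might
# be invoked locally. The database name, output path, and table name below are
# illustrative placeholders, not values taken from the source.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    results = run(
        db_path="alloys.db",      # ASE database of starting structures (placeholder)
        out_path="eos_alloy.h5",  # per-calculator HDF5 stores are derived from this path
        table_name="eos",
    )
    print(f"Completed {len(results)} EOS fits")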