Spaces:
Running
Running
Add "simple" executor.
Browse files
lynxkite-core/src/lynxkite/core/executors/simple.py
ADDED
@@ -0,0 +1,64 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""A LynxKite executor that simply passes the output of one box to the other."""
|
2 |
+
|
3 |
+
import os
|
4 |
+
from .. import ops
|
5 |
+
from .. import workspace
|
6 |
+
import traceback
|
7 |
+
import inspect
|
8 |
+
import graphlib
|
9 |
+
|
10 |
+
|
11 |
+
def register(env: str):
|
12 |
+
"""Registers the one-by-one executor."""
|
13 |
+
ops.EXECUTORS[env] = lambda ws: execute(ws, ops.CATALOGS[env])
|
14 |
+
|
15 |
+
|
16 |
+
async def await_if_needed(obj):
|
17 |
+
if inspect.isawaitable(obj):
|
18 |
+
return await obj
|
19 |
+
return obj
|
20 |
+
|
21 |
+
|
22 |
+
async def execute(ws: workspace.Workspace, catalog: ops.Catalog):
|
23 |
+
nodes = {n.id: n for n in ws.nodes}
|
24 |
+
dependencies = {n: [] for n in nodes}
|
25 |
+
in_edges = {n: {} for n in nodes}
|
26 |
+
for e in ws.edges:
|
27 |
+
dependencies[e.target].append(e.source)
|
28 |
+
assert e.targetHandle not in in_edges[e.target], f"Duplicate input for {e.target}"
|
29 |
+
in_edges[e.target][e.targetHandle] = e.source, e.sourceHandle
|
30 |
+
outputs = {}
|
31 |
+
ts = graphlib.TopologicalSorter(dependencies)
|
32 |
+
for node_id in ts.static_order():
|
33 |
+
node = nodes[node_id]
|
34 |
+
op = catalog[node.data.title]
|
35 |
+
params = {**node.data.params}
|
36 |
+
node.publish_started()
|
37 |
+
try:
|
38 |
+
inputs = []
|
39 |
+
missing = []
|
40 |
+
for i in op.inputs.values():
|
41 |
+
edges = in_edges[node_id]
|
42 |
+
if i.name in edges and edges[i.name] in outputs:
|
43 |
+
inputs.append(outputs[edges[i.name]])
|
44 |
+
else:
|
45 |
+
missing.append(i.name)
|
46 |
+
if missing:
|
47 |
+
node.publish_error(f"Missing input: {', '.join(missing)}")
|
48 |
+
continue
|
49 |
+
result = op(*inputs, **params)
|
50 |
+
result.output = await await_if_needed(result.output)
|
51 |
+
result.display = await await_if_needed(result.display)
|
52 |
+
if len(op.outputs) == 1:
|
53 |
+
[output] = list(op.outputs.values())
|
54 |
+
outputs[node_id, output.name] = result.output
|
55 |
+
elif len(op.outputs) > 1:
|
56 |
+
assert type(result.output) is dict, "An op with multiple outputs must return a dict"
|
57 |
+
for output in op.outputs.values():
|
58 |
+
outputs[node_id, output.name] = result.output[output.name]
|
59 |
+
node.publish_result(result)
|
60 |
+
except Exception as e:
|
61 |
+
if not os.environ.get("LYNXKITE_SUPPRESS_OP_ERRORS"):
|
62 |
+
traceback.print_exc()
|
63 |
+
node.publish_error(e)
|
64 |
+
return outputs
|
lynxkite-pillow-example/src/lynxkite_pillow_example/__init__.py
CHANGED
@@ -1,7 +1,7 @@
|
|
1 |
"""Demo for how easily we can provide a UI for popular open-source tools."""
|
2 |
|
3 |
from lynxkite.core import ops
|
4 |
-
from lynxkite.core.executors import
|
5 |
from PIL import Image, ImageFilter
|
6 |
import base64
|
7 |
import fsspec
|
@@ -9,7 +9,7 @@ import io
|
|
9 |
|
10 |
ENV = "Pillow"
|
11 |
op = ops.op_registration(ENV)
|
12 |
-
|
13 |
|
14 |
|
15 |
@op("Open image")
|
|
|
1 |
"""Demo for how easily we can provide a UI for popular open-source tools."""
|
2 |
|
3 |
from lynxkite.core import ops
|
4 |
+
from lynxkite.core.executors import simple
|
5 |
from PIL import Image, ImageFilter
|
6 |
import base64
|
7 |
import fsspec
|
|
|
9 |
|
10 |
ENV = "Pillow"
|
11 |
op = ops.op_registration(ENV)
|
12 |
+
simple.register(ENV)
|
13 |
|
14 |
|
15 |
@op("Open image")
|