83 lines
3.1 KiB
Python
83 lines
3.1 KiB
Python
from __future__ import annotations
|
|
import xml.etree.ElementTree as ET
|
|
from typing import Dict, List, Optional
|
|
from .graph import FieldDef, NodeDef, ConnectionDef, WorkflowGraph
|
|
|
|
|
|
def parse_workflow(path: str) -> WorkflowGraph:
    """Read the .yxmd XML file at ``path`` and build a WorkflowGraph from it.

    Nodes are gathered by ``_collect_nodes`` into a dict that is passed to
    the graph as ``nodes``. Connections come from the root's
    ``Connections/Connection`` elements; the root's ``Properties`` element
    (or ``None`` when absent) is passed through as ``properties``.

    Raises:
        KeyError: if an Origin or Destination element has no ``ToolID``.
        ValueError: if a ``ToolID`` is not an integer literal.
    """
    document_root = ET.parse(path).getroot()

    node_map: Dict[int, NodeDef] = {}
    _collect_nodes(document_root, node_map)

    links: List[ConnectionDef] = []
    for link_el in document_root.findall("Connections/Connection"):
        source_el = link_el.find("Origin")
        target_el = link_el.find("Destination")
        # A connection is only recorded when both endpoints are present;
        # incomplete ones are dropped without raising.
        if source_el is not None and target_el is not None:
            link = ConnectionDef(
                origin_id=int(source_el.attrib["ToolID"]),
                origin_anchor=source_el.attrib.get("Connection", "Output"),
                dest_id=int(target_el.attrib["ToolID"]),
                dest_anchor=target_el.attrib.get("Connection", "Input"),
                name=link_el.attrib.get("name"),
                # Only the exact string "True" marks a wireless connection.
                wireless=link_el.attrib.get("Wireless", "False") == "True",
            )
            links.append(link)

    return WorkflowGraph(
        nodes=node_map,
        connections=links,
        properties=document_root.find("Properties"),
    )
|
|
|
|
|
|
def _collect_nodes(parent: ET.Element, nodes: Dict[int, NodeDef]) -> None:
    """Collect every Node below ``parent``'s Nodes containers into ``nodes``.

    Nodes nested inside ChildNodes elements (tool containers) are flattened
    into the same mapping, at any depth. ``nodes`` is mutated in place.
    """
    # A Nodes element holds its Node children the same way a ChildNodes
    # element does, so the flat walker covers each container's whole subtree
    # in document (pre-)order.
    for container in parent.findall("Nodes"):
        _collect_nodes_flat(container, nodes)
|
|
|
|
|
|
def _collect_nodes_flat(parent: ET.Element, nodes: Dict[int, NodeDef]) -> None:
    """Parse each direct Node child of ``parent`` and all nested descendants.

    Every visited Node is handed to ``_parse_node``, which records it in
    ``nodes``. A Node's first ChildNodes element, when present, is descended
    into right after the Node itself, so nodes are visited in pre-order.
    """
    # Explicit stack instead of recursion. Children are pushed reversed so
    # that popping yields them in document order, ahead of later siblings.
    pending = list(reversed(parent.findall("Node")))
    while pending:
        current = pending.pop()
        _parse_node(current, nodes)
        nested = current.find("ChildNodes")
        if nested is not None:
            pending.extend(reversed(nested.findall("Node")))
|
|
|
|
|
|
def _parse_node(node_el: ET.Element, nodes: Dict[int, NodeDef]) -> None:
    """Build a NodeDef from ``node_el`` and store it in ``nodes`` by ToolID.

    The plugin name and canvas position are read from the GuiSettings child;
    they default to ``""`` and ``(0, 0)`` when GuiSettings (or its Position
    child) is missing. An existing entry with the same ToolID is overwritten.

    Raises:
        KeyError: if ``node_el`` has no ``ToolID`` attribute.
        ValueError: if ``ToolID`` or a position coordinate is not an integer
            literal.
    """
    tool_id = int(node_el.attrib["ToolID"])

    plugin_name = ""
    location = (0, 0)
    settings = node_el.find("GuiSettings")
    if settings is not None:
        plugin_name = settings.attrib.get("Plugin", "")
        position_el = settings.find("Position")
        if position_el is not None:
            coords = position_el.attrib
            # A missing coordinate falls back to 0.
            location = (int(coords.get("x", 0)), int(coords.get("y", 0)))

    configuration = node_el.find("Properties/Configuration")
    fields = _parse_schema(node_el)

    nodes[tool_id] = NodeDef(
        tool_id=tool_id,
        plugin=plugin_name,
        config=configuration,
        output_schema=fields,
        position=location,
    )
|
|
|
|
|
|
def _parse_schema(node_el: ET.Element) -> List[FieldDef]:
    """Return a FieldDef for each ``MetaInfo/RecordInfo/Field`` under ``node_el``.

    Fields are matched at any depth below ``node_el`` and returned in
    document order. ``type`` defaults to ``"V_String"``; ``size`` is ``None``
    when the attribute is missing or empty.

    Raises:
        KeyError: if a Field element has no ``name`` attribute.
        ValueError: if a non-empty ``size`` is not numeric.
    """
    schema: List[FieldDef] = []
    for field_el in node_el.findall(".//MetaInfo/RecordInfo/Field"):
        attrs = field_el.attrib
        raw_size = attrs.get("size")
        if raw_size:
            # Going through float accepts values such as "10.0"; the
            # fractional part is truncated.
            width = int(float(raw_size))
        else:
            width = None
        schema.append(
            FieldDef(
                name=attrs["name"],
                type=attrs.get("type", "V_String"),
                size=width,
                source=attrs.get("source"),
            )
        )
    return schema
|