# Alteryx Open-Source Workflow Runner — Technical Specification

> **Target audience:** Agentic coding tools (opencode, pi, etc.)
> **Stack:** Python 3.11+, DuckDB, Polars, PyArrow
> **License target:** Apache 2.0

---

## 1. Project Overview

Build a Python-native runner that parses Alteryx Designer workflow XML files (`.yxmd`) and executes each tool node against real data using DuckDB (for SQL-heavy transforms and I/O) and Polars (for streaming/in-memory frame operations). The runner does **not** require Alteryx to be installed.

### 1.1 Design Principles

- **Faithful semantics**: match Alteryx output column order, naming, data types and NULL behaviour exactly.
- **DAG execution**: parse the `<Nodes>` and `<Connections>` elements into a graph, build a topological execution plan, execute nodes in dependency order.
- **Lazy where possible**: use Polars LazyFrame / DuckDB views so materialisation happens once at sink nodes.
- **Pluggable tool registry**: each tool is a self-contained Python class registered by its XML `Plugin` string; new tools can be dropped in without touching core execution logic.
- **No GUI dependency**: CLI-first; expose a Python API for programmatic use.

---

## 2. Repository Layout

```
alteryx_runner/
├── cli.py                      # Entry point: `python -m alteryx_runner run workflow.yxmd`
├── engine/
│   ├── __init__.py
│   ├── parser.py               # XML → WorkflowGraph
│   ├── executor.py             # Topological executor
│   ├── graph.py                # WorkflowGraph, Node, Connection dataclasses
│   ├── context.py              # RunContext: connection pool, temp dir, constants
│   └── type_mapper.py          # Alteryx type → Polars/DuckDB dtype
├── tools/
│   ├── __init__.py             # Tool registry
│   ├── base.py                 # BaseTool ABC
│   ├── inout/
│   │   ├── input_data.py
│   │   ├── output_data.py
│   │   ├── text_input.py
│   │   └── browse.py
│   ├── preparation/
│   │   ├── filter_tool.py
│   │   ├── formula_tool.py
│   │   ├── select_tool.py
│   │   ├── sort_tool.py
│   │   ├── sample_tool.py
│   │   ├── unique_tool.py
│   │   ├── generate_rows.py
│   │   ├── multi_row_formula.py
│   │   ├── multi_field_formula.py
│   │   ├── record_id.py
│   │   └── auto_field.py
│   ├── join/
│   │   ├── join_tool.py
│   │   ├── join_multiple.py
│   │   ├── union_tool.py
│   │   ├── append_fields.py
│   │   └── find_replace.py
│   ├── parse/
│   │   ├── datetime_tool.py
│   │   ├── regex_tool.py
│   │   └── text_to_columns.py
│   └── transform/
│       ├── summarize_tool.py
│       ├── cross_tab.py
│       └── transpose_tool.py
├── expression/
│   ├── __init__.py
│   ├── transpiler.py           # Alteryx expression → DuckDB SQL / Polars expr
│   └── functions.py            # Built-in function mappings
├── tests/
│   ├── fixtures/               # .yxmd files (from uploaded samples)
│   └── test_*.py
└── pyproject.toml
```

---

## 3. .yxmd File Format

Alteryx workflow files are UTF-8 XML. The root element is `<AlteryxDocument>`.

### 3.1 Top-Level Structure

```xml
<AlteryxDocument yxmdVer="...">
  <Nodes>...</Nodes>
  <Connections>...</Connections>
  <Properties>...</Properties>
</AlteryxDocument>
```

### 3.2 Node Structure

```xml
<Node ToolID="...">
  <GuiSettings Plugin="...">
    <Position x="..." y="..." />
  </GuiSettings>
  <Properties>
    <Configuration>...</Configuration>
    <MetaInfo>
      <RecordInfo>
        <Field name="..." type="..." size="..." />
      </RecordInfo>
    </MetaInfo>
  </Properties>
  <EngineSettings Macro="..." />
</Node>
```

**Key attributes for the runner:**

- `ToolID`: integer, unique per workflow. Use as node identifier.
- `Plugin` in `<GuiSettings>`: maps to the tool class (see registry table §4).
- `<Configuration>`: tool-specific settings; parse per-tool.
- `<MetaInfo>/<RecordInfo>/<Field>`: output schema hint (available on input tools; not always present on transform tools).
- `<EngineSettings>`: `Macro` attribute signals a macro/yxmc sub-workflow; handle recursively.

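The attributes listed above can be read with the standard library alone. The sketch below parses a hand-written `<Node>` fragment and pulls out the identifiers the runner keys on. The fragment, its values, and the helper name `summarise_node` are illustrative assumptions, as is the placement of the `Macro` attribute on an `<EngineSettings>` element.

```python
import xml.etree.ElementTree as ET

# Hand-written fragment for illustration only; real workflows carry more children.
SAMPLE_NODE = """
<Node ToolID="7">
  <GuiSettings Plugin="AlteryxBasePluginsGui.Filter.Filter">
    <Position x="150" y="90" />
  </GuiSettings>
  <Properties>
    <Configuration>
      <Mode>Custom</Mode>
    </Configuration>
  </Properties>
</Node>
"""


def summarise_node(node_el: ET.Element) -> dict:
    """Extract the identifiers the runner keys on (hypothetical helper)."""
    gui = node_el.find("GuiSettings")
    engine = node_el.find("EngineSettings")  # assumed home of the Macro attribute
    return {
        "tool_id": int(node_el.attrib["ToolID"]),
        "plugin": gui.attrib.get("Plugin", "") if gui is not None else "",
        "has_config": node_el.find("Properties/Configuration") is not None,
        "macro": engine.attrib.get("Macro") if engine is not None else None,
    }


info = summarise_node(ET.fromstring(SAMPLE_NODE))
```

Every lookup is compared against `None` rather than tested for truthiness, because an `Element` without children evaluates as false.
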
### 3.3 Connection Structure

```xml
<Connection name="..." Wireless="...">
  <Origin ToolID="..." Connection="..." />
  <Destination ToolID="..." Connection="..." />
</Connection>
```

- `Connection` attribute on `<Origin>` / `<Destination>`: the anchor label.
  - Most tools: `"Output"` / `"Input"`.
  - Filter: `"True"` / `"False"`.
  - Join: `"Left"` / `"Right"` (inputs), `"J"` / `"L"` / `"R"` (outputs).
  - JoinMultiple: `"Input"` (numbered), `"Output"`.
  - Union: numbered `"Input1"`, `"Input2"`, etc.
- `Wireless="True"`: logical connection, no visual wire; treat identically to wired.
- `name="#N"`: numbered for multi-input tools; determines input slot ordering.

### 3.4 ChildNodes (Tool Containers)

```xml
<Node ToolID="...">
  ...
  <ChildNodes>
    <Node ToolID="...">...</Node>
  </ChildNodes>
</Node>
```

Flatten `<ChildNodes>` into the parent graph. The container itself is a no-op passthrough for execution.

---

## 4. Tool Registry

Map `Plugin` string → Python class. The runner looks up the `Plugin` from `<GuiSettings>`.

| Plugin String | Class | Category |
|---|---|---|
| `AlteryxBasePluginsGui.DbFileInput.DbFileInput` | `InputDataTool` | In/Out |
| `AlteryxBasePluginsGui.DbFileOutput.DbFileOutput` | `OutputDataTool` | In/Out |
| `AlteryxBasePluginsGui.TextInput.TextInput` | `TextInputTool` | In/Out |
| `AlteryxBasePluginsGui.BrowseV2.BrowseV2` | `BrowseTool` | In/Out |
| `AlteryxBasePluginsGui.Filter.Filter` | `FilterTool` | Preparation |
| `AlteryxBasePluginsGui.Formula.Formula` | `FormulaTool` | Preparation |
| `AlteryxBasePluginsGui.AlteryxSelect.AlteryxSelect` | `SelectTool` | Preparation |
| `AlteryxBasePluginsGui.Sort.Sort` | `SortTool` | Preparation |
| `AlteryxBasePluginsGui.Sample.Sample` | `SampleTool` | Preparation |
| `AlteryxBasePluginsGui.Unique.Unique` | `UniqueTool` | Preparation |
| `AlteryxBasePluginsGui.GenerateRows.GenerateRows` | `GenerateRowsTool` | Preparation |
| `AlteryxBasePluginsGui.MultiRowFormula.MultiRowFormula` | `MultiRowFormulaTool` | Preparation |
| `AlteryxBasePluginsGui.MultiFieldFormula.MultiFieldFormula` | `MultiFieldFormulaTool` | Preparation |
| `AlteryxBasePluginsGui.RecordID.RecordID` | `RecordIDTool` | Preparation |
| `AlteryxBasePluginsGui.AutoField.AutoField` | `AutoFieldTool` | Preparation |
| `AlteryxBasePluginsGui.Join.Join` | `JoinTool` | Join |
| `AlteryxBasePluginsGui.JoinMultiple.JoinMultiple` | `JoinMultipleTool` | Join |
| `AlteryxBasePluginsGui.Union.Union` | `UnionTool` | Join |
| `AlteryxBasePluginsGui.AppendFields.AppendFields` | `AppendFieldsTool` | Join |
| `AlteryxBasePluginsGui.FindReplace.FindReplace` | `FindReplaceTool` | Join |
| `AlteryxBasePluginsGui.DateTime.DateTime` | `DateTimeTool` | Parse |
| `AlteryxBasePluginsGui.RegEx.RegEx` | `RegExTool` | Parse |
| `AlteryxBasePluginsGui.TextToColumns.TextToColumns` | `TextToColumnsTool` | Parse |
| `AlteryxSpatialPluginsGui.Summarize.Summarize` | `SummarizeTool` | Transform |
| `AlteryxBasePluginsGui.CrossTab.CrossTab` | `CrossTabTool` | Transform |
| `AlteryxBasePluginsGui.Transpose.Transpose` | `TransposeTool` | Transform |
| `AlteryxGuiToolkit.ToolContainer.ToolContainer` | `PassthroughTool` | Documentation |
| `AlteryxGuiToolkit.TextBox.TextBox` | `NullTool` | Documentation |

---

## 5. Data Model

### 5.1 Alteryx → Python Type Mapping

| Alteryx Type | Polars dtype | DuckDB type | Notes |
|---|---|---|---|
| `Bool` | `pl.Boolean` | `BOOLEAN` | |
| `Byte` | `pl.UInt8` | `TINYINT UNSIGNED` | 0–255 |
| `Int16` | `pl.Int16` | `SMALLINT` | |
| `Int32` | `pl.Int32` | `INTEGER` | |
| `Int64` | `pl.Int64` | `BIGINT` | |
| `Float` | `pl.Float32` | `FLOAT` | |
| `Double` | `pl.Float64` | `DOUBLE` | |
| `FixedDecimal` | `pl.Decimal(p,s)` | `DECIMAL(p,s)` | size="19.2" → DECIMAL(19,2) |
| `String` | `pl.String` | `VARCHAR` | fixed max length |
| `V_String` | `pl.String` | `VARCHAR` | variable length |
| `WString` | `pl.String` | `VARCHAR` | wide string |
| `V_WString` | `pl.String` | `VARCHAR` | variable wide string |
| `Date` | `pl.Date` | `DATE` | |
| `Time` | `pl.Time` | `TIME` | |
| `DateTime` | `pl.Datetime` | `TIMESTAMP` | |
| `SpatialObj` | `pl.String` | `VARCHAR` | GeoJSON string; skip in non-spatial mode |
| `Blob` | `pl.Binary` | `BLOB` | |

### 5.2 Internal Frame Type

All frames are passed
between tools as `polars.DataFrame`. DuckDB is used for:

- Reading/writing external files (CSV, Parquet, XLSX via `duckdb.read_csv`, `read_parquet`, etc.)
- Complex SQL generation (joins, aggregates, window functions)
- Expression evaluation where transpilation to DuckDB SQL is simpler than Polars

Conversion helpers:

```python
import polars as pl
import duckdb

def polars_to_duckdb(df: pl.DataFrame, name: str, con: duckdb.DuckDBPyConnection):
    # Expose the frame to DuckDB under `name` via Arrow
    con.register(name, df.to_arrow())

def duckdb_to_polars(con: duckdb.DuckDBPyConnection, sql: str) -> pl.DataFrame:
    # .pl() materialises the result directly as a Polars DataFrame
    return con.execute(sql).pl()
```

---

## 6. Core Engine

### 6.1 Parser (`engine/parser.py`)

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import xml.etree.ElementTree as ET

@dataclass
class FieldDef:
    name: str
    type: str
    size: Optional[int] = None
    source: Optional[str] = None

@dataclass
class NodeDef:
    tool_id: int
    plugin: str
    config: Optional[ET.Element]  # raw <Configuration> element (None if absent)
    output_schema: List[FieldDef] = field(default_factory=list)
    position: tuple = (0, 0)

@dataclass
class ConnectionDef:
    origin_id: int
    origin_anchor: str  # e.g. "Output", "True", "J"
    dest_id: int
    dest_anchor: str    # e.g. "Input", "Left", "#1"
    name: Optional[str] = None
    wireless: bool = False

@dataclass
class WorkflowGraph:
    nodes: Dict[int, NodeDef]
    connections: List[ConnectionDef]
    properties: Optional[ET.Element]  # <Properties> element

def parse_workflow(path: str) -> WorkflowGraph:
    """Parse .yxmd XML into WorkflowGraph."""
    tree = ET.parse(path)
    root = tree.getroot()

    nodes = {}
    for node_el in root.iter("Node"):  # iter() also visits nodes nested in ChildNodes
        tid = int(node_el.attrib["ToolID"])
        gui = node_el.find("GuiSettings")
        plugin = gui.attrib.get("Plugin", "") if gui is not None else ""
        config = node_el.find("Properties/Configuration")
        pos_el = gui.find("Position") if gui is not None else None
        if pos_el is not None:
            pos = (int(pos_el.attrib.get("x", 0)), int(pos_el.attrib.get("y", 0)))
        else:
            pos = (0, 0)
        schema = _parse_schema(node_el)
        nodes[tid] = NodeDef(tid, plugin, config, schema, pos)

    connections = []
    for conn_el in root.findall("Connections/Connection"):
        orig = conn_el.find("Origin")
        dest = conn_el.find("Destination")
        connections.append(ConnectionDef(
            origin_id=int(orig.attrib["ToolID"]),
            origin_anchor=orig.attrib.get("Connection", "Output"),
            dest_id=int(dest.attrib["ToolID"]),
            dest_anchor=dest.attrib.get("Connection", "Input"),
            name=conn_el.attrib.get("name"),
            wireless=conn_el.attrib.get("Wireless", "False") == "True",
        ))

    props = root.find("Properties")
    return WorkflowGraph(nodes, connections, props)

def _parse_schema(node_el: ET.Element) -> List[FieldDef]:
    fields = []
    for f in node_el.findall(".//MetaInfo/RecordInfo/Field"):
        size_str = f.attrib.get("size")
        # Note: a FixedDecimal size such as "19.2" is truncated to 19 here
        size = int(float(size_str)) if size_str else None
        fields.append(FieldDef(f.attrib["name"], f.attrib.get("type", "V_String"), size))
    return fields
```

### 6.2 Executor (`engine/executor.py`)

```python
from collections import defaultdict, deque
import polars as pl
from .graph import WorkflowGraph, ConnectionDef
from .context import RunContext
from tools import get_tool_class

def execute(graph: WorkflowGraph, ctx: RunContext):
    """Topological BFS execution (Kahn's algorithm)."""
    # Build adjacency
    in_degree = defaultdict(int)
    successors = defaultdict(list)    # tool_id → outgoing connections
    predecessors = defaultdict(list)  # tool_id → incoming connections
    for c in graph.connections:
        in_degree[c.dest_id] += 1
        successors[c.origin_id].append(c)
        predecessors[c.dest_id].append(c)
    for tid in graph.nodes:
        if tid not in in_degree:
            in_degree[tid] = 0

    # Output buffers: (tool_id, anchor) → DataFrame
    outputs: dict[tuple, pl.DataFrame] = {}
    queue = deque([tid for tid, deg in in_degree.items() if deg == 0])

    while queue:
        tid = queue.popleft()
        node = graph.nodes[tid]
        tool_cls = get_tool_class(node.plugin)
        if tool_cls is None:
            # Unsupported / documentation tool — pass through if single input
            # (_passthrough is a helper that is not shown in this section)
            _passthrough(tid, predecessors, outputs, successors, in_degree, queue)
            continue
        tool = tool_cls(node, ctx)

        # Gather inputs: dict of anchor → DataFrame
        inputs = {}
        for c in predecessors[tid]:
            df = outputs.get((c.origin_id, c.origin_anchor))
            if df is not None:
                inputs[c.dest_anchor] = df

        # Execute
        result = tool.execute(inputs)  # returns dict[anchor, DataFrame]
        for anchor, df in result.items():
            outputs[(tid, anchor)] = df

        # Decrement in-degree of successors
        for c in successors[tid]:
            in_degree[c.dest_id] -= 1
            if in_degree[c.dest_id] == 0:
                queue.append(c.dest_id)

    return outputs
```

### 6.3 Base Tool (`tools/base.py`)

```python
from abc import ABC, abstractmethod
from typing import Dict
import polars as pl
import xml.etree.ElementTree as ET
from engine.graph import NodeDef
from engine.context import RunContext

class BaseTool(ABC):
    def __init__(self, node: NodeDef, ctx: RunContext):
        self.node = node
        self.ctx = ctx
        self.config = node.config  # ET.Element

    @abstractmethod
    def execute(self, inputs: Dict[str, pl.DataFrame]) -> Dict[str, pl.DataFrame]:
        """
        inputs: dict of anchor_name → DataFrame
        returns: dict of anchor_name → DataFrame
        """

    def _cfg(self, xpath: str, default=None):
        """Helper: get text of config sub-element."""
        el = self.config.find(xpath) if self.config is not None else None
        return el.text if el is not None else default

    def _cfg_attr(self, xpath: str, attr: str, default=None):
        """Helper: get an attribute of a config sub-element."""
        el = self.config.find(xpath) if self.config is not None else None
        return el.attrib.get(attr, default) if el is not None else default
```

---

## 7. Tool Implementations

### 7.1 InputData (`AlteryxBasePluginsGui.DbFileInput.DbFileInput`)

**XML Configuration:**

```xml
<Configuration>
  <File FileFormat="..." RecordLimit="...">path/to/file.csv</File>
  <FormatSpecificOptions>
    <Delimeter>,</Delimeter>
    <HeaderRow>True</HeaderRow>
    <ImportLine>1</ImportLine>
    <CodePage>28591</CodePage>
  </FormatSpecificOptions>
</Configuration>
```

**File path syntax:**

- `path/to/file.csv` — plain file
- `path/file.xlsx|||``SheetName$`` ` — Excel sheet (triple pipe separator)
- `path/file.xlsx|||<List of Sheet Names>` — returns column: `Sheet Names`
- `path/file.xlsx|||``NamedRange`` ` — named range

**FileFormat codes (partial):**

| Code | Format |
|---|---|
| 0 | CSV / delimited text |
| 19 | YXDB (Alteryx native) |
| 25 | Excel (.xlsx) |
| 6 | Fixed-width |

**Implementation strategy:**

```python
import xml.etree.ElementTree as ET
import polars as pl

class InputDataTool(BaseTool):
    def execute(self, inputs):
        file_el = self.config.find("File")
        raw_path = file_el.text.strip()
        fmt = int(file_el.attrib.get("FileFormat", "0"))
        record_limit = file_el.attrib.get("RecordLimit", "")
        limit = int(record_limit) if record_limit else None
        # Compare against None: an Element without children is falsy,
        # so `find(...) or default` would discard an empty options element.
        opts = self.config.find("FormatSpecificOptions")
        if opts is None:
            opts = ET.Element("FormatSpecificOptions")
        # Relative paths should be resolved against the workflow directory
        # (not implemented in this sketch)
        path, sheet = self._parse_path(raw_path)
        df = self._read(path, fmt, sheet, opts, limit)
        return {"Output": df}

    def _parse_path(self, raw):
        if "|||" in raw:
            path, sheet = raw.split("|||", 1)
            # Drop the backticks and the trailing "$" of sheet references
            return path.strip(), sheet.strip().strip("`").rstrip("$")
        return raw.strip(), None

    def _read(self, path, fmt, sheet, opts, limit):
        if fmt == 0:  # CSV
            # "Delimeter" is the spelling used in the workflow XML
            delim = opts.findtext("Delimeter", ",")
            header = opts.findtext("HeaderRow", "True") == "True"
            skip = max(0, int(opts.findtext("ImportLine", "1") or 1) - 1)
            df = pl.read_csv(path, separator=delim, has_header=header, skip_rows=skip)
        elif fmt == 25:  # Excel
            df = pl.read_excel(path, sheet_name=sheet)
        elif fmt == 19:  # YXDB — use DuckDB + yxdb reader or fallback
            raise NotImplementedError("YXDB: use yxdb-py library")
        else:
            df = pl.read_csv(path)  # best-effort fallback
        if limit is not None:
            df = df.head(limit)
        return df
```

**YXDB support:** Use the `yxdb` Python package (`pip install yxdb`) which reads Alteryx's native binary format.

---

### 7.2 TextInput (`AlteryxBasePluginsGui.TextInput.TextInput`)

**XML Configuration:**

```xml
<Configuration>
  <Fields>
    <Field name="..." />
    <Field name="..." />
    <Field name="..." />
  </Fields>
  <Data>
    <r>
      <c>49</c>
      <c>Thomas</c>
      <c>456</c>
    </r>
  </Data>
</Configuration>
```

**Implementation:**

```python
class TextInputTool(BaseTool):
    def execute(self, inputs):
        fields = [f.attrib["name"] for f in self.config.findall("Fields/Field")]
        rows = []
        for r in self.config.findall("Data/r"):
            cells = r.findall("c")
            row = {}
            for i, f in enumerate(fields):
                el = cells[i] if i < len(cells) else None
                text = el.text if el is not None else None
                # Empty cells become NULL
                row[f] = text if text != "" else None
            rows.append(row)
        df = pl.DataFrame(rows, schema={f: pl.String for f in fields})
        return {"Output": df}
```

Note: All columns are `String` unless downstream Select/Formula coerces them.

---

### 7.3 OutputData (`AlteryxBasePluginsGui.DbFileOutput.DbFileOutput`)

**XML Configuration:**

```xml
%temp%output.csv , True CRLF 28591 Suffix Region
```

**Implementation notes:**

- Resolve `%temp%` to `tempfile.gettempdir()`.
- If `MultiFile value="True"`: partition by `MultiFileField`, write one file per unique value, append/prepend value to filename.
- If `MaxRecords` non-empty: split into chunks of that size, auto-number extra files with `_1`, `_2` suffix.

---

### 7.4 Browse (`AlteryxBasePluginsGui.BrowseV2.BrowseV2`)

**Behaviour:** Passthrough — output equals input. In the runner, log a preview (first N rows) to stdout or a callback.

```python
class BrowseTool(BaseTool):
    def execute(self, inputs):
        df = inputs.get("Input", pl.DataFrame())
        if self.ctx.verbose:
            print(f"[Browse ToolID={self.node.tool_id}]\n{df.head(20)}")
        return {"Output": df}
```

---

### 7.5 Filter (`AlteryxBasePluginsGui.Filter.Filter`)

**XML Configuration — two modes:**

**Mode: Simple (Basic filter)**

```xml
[CustomerID] > 30 Simple > CustomerID 30 fixed True days 2
```

**Mode: Custom (expression)**

```xml
Custom
```

**Anchors:** `Input` → `True` (rows matching), `False` (rows not matching).

**Operator mapping (Simple mode):**

| Alteryx Operator | Polars equivalent |
|---|---|
| `=` / `Equals` | `col == value` |
| `!=` / `Does not equal` | `col != value` |
| `>` | `col > value` |
| `>=` | `col >= value` |
| `<` | `col < value` |
| `<=` | `col <= value` |
| `IsNull` | `col.is_null()` |
| `IsNotNull` | `col.is_not_null()` |
| `Contains` | `col.str.contains(value, literal=True)` |
| `Does not contain` | `~col.str.contains(value, literal=True)` |
| `Is empty` | `col.is_null() \| (col == "")` |
| `Is not empty` | `col.is_not_null() & (col != "")` |
| `Comes before (<)` | `col < value` (string compare) |
| `Comes after (>)` | `col > value` (string compare) |
| `Is true` | `col == True` |
| `Is false` | `col == False` |
| `Range` | `(col >= start) & (col <= end)` |
| `PeriodAfter` | date window around anchor date |
| `PeriodBefore` | date window before anchor date |

`literal=True` keeps `Contains` a plain substring test; without it Polars treats the operand as a regular expression. The parentheses in `Is empty` are required because `|` binds more tightly than `==` in Python.

**Date dynamic values:**

- `today` → `datetime.date.today()`
- `tomorrow` → `today + timedelta(1)`
- `yesterday` → `today - timedelta(1)`

**Custom mode:** transpile the `<Expression>` text via `expression/transpiler.py` (see §8).

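To make the Simple-mode semantics concrete without depending on Polars, the sketch below mirrors a few rows of the operator table with plain Python predicates and resolves the dynamic date keywords. The names `resolve_operand`, `OPERATORS` and `split_rows` are hypothetical. Routing rows whose cell is null to the `False` anchor is an assumption about the desired behaviour, not something the configuration states.

```python
import datetime as dt
import operator


def resolve_operand(value, today=None):
    """Map the dynamic date keywords to concrete dates; pass anything else through."""
    today = today or dt.date.today()
    dynamic = {
        "today": today,
        "tomorrow": today + dt.timedelta(days=1),
        "yesterday": today - dt.timedelta(days=1),
    }
    return dynamic.get(value, value) if isinstance(value, str) else value


# Subset of the operator table, expressed as (cell, operand) -> bool.
OPERATORS = {
    "=": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
    "Contains": lambda cell, operand: operand in cell,
}


def split_rows(rows, field, op, operand, today=None):
    """Return (true_rows, false_rows); null cells go to the False side (assumption)."""
    test = OPERATORS[op]
    target = resolve_operand(operand, today)
    true_rows, false_rows = [], []
    for row in rows:
        cell = row.get(field)
        matched = cell is not None and test(cell, target)
        (true_rows if matched else false_rows).append(row)
    return true_rows, false_rows


rows = [{"CustomerID": 12}, {"CustomerID": 45}, {"CustomerID": None}]
matched, unmatched = split_rows(rows, "CustomerID", ">", 30)
```

Passing `today` explicitly keeps date filters reproducible in tests; the Polars version would build the same comparison as an expression instead of looping over rows.
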
**Implementation:**

```python
class FilterTool(BaseTool):
    def execute(self, inputs):
        df = inputs.get("Input", pl.DataFrame())
        mode = self._cfg("Mode", "Custom")
        if mode == "Simple":
            mask = self._build_simple_mask(df)
        else:
            expr_text = self._cfg("Expression", "True")
            mask = self.ctx.transpiler.eval_mask(df, expr_text)
        # A null mask value would drop the row from both outputs;
        # treat it as "not matching" so every row lands on one anchor.
        mask = mask.fill_null(False)
        return {
            "True": df.filter(mask),
            "False": df.filter(~mask),
        }
```

---

### 7.6 Formula (`AlteryxBasePluginsGui.Formula.Formula`)

**XML Configuration:**

```xml
<Configuration>
  <FormulaFields>
    <FormulaField expression="..." field="..." size="..." type="..." />
  </FormulaFields>
</Configuration>
```

**Behaviour:**

- Each `<FormulaField>` is applied sequentially; later expressions can reference columns created by earlier ones.
- If `field` matches an existing column: update in place.
- If `field` is new: append as new column.
- Expression HTML entities: `&gt;` → `>`, `&lt;` → `<`, `&quot;` → `"`, `&#xA;` → newline.

**Implementation:**

```python
class FormulaTool(BaseTool):
    def execute(self, inputs):
        df = inputs.get("Input", pl.DataFrame())
        for ff in self.config.findall("FormulaFields/FormulaField"):
            expr_text = ff.attrib["expression"]
            field = ff.attrib["field"]
            dtype = self.ctx.type_mapper.map(ff.attrib.get("type", "V_WString"),
                                             ff.attrib.get("size"))
            series = self.ctx.transpiler.eval_series(df, expr_text, field, dtype)
            # with_columns replaces an existing column in place and
            # appends a new one at the end, so one call covers both cases
            df = df.with_columns(series.alias(field))
        return {"Output": df}
```

---

### 7.7 Select (`AlteryxBasePluginsGui.AlteryxSelect.AlteryxSelect`)

**XML Configuration:**

```xml
<Configuration OrderChanged="...">
  <SelectFields>
    <SelectField field="..." selected="..." rename="..." type="..." size="..." />
    <SelectField field="*Unknown" selected="..." />
  </SelectFields>
</Configuration>
```

**Behaviour:**

- `selected="False"`: drop the column.
- `rename` attribute: rename column.
- `type` + `size` attribute on SelectField: cast column to new type.
- `*Unknown` row: if `selected="True"`, pass through any incoming columns not explicitly listed; if `selected="False"`, drop them.
- `OrderChanged="True"`: output columns in the order listed in `<SelectFields>`.

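The Select rules above reduce to a small name-resolution step that can be checked independently of any frame library. The sketch below works on column names only; `resolve_select` is a hypothetical helper, the dict-based configuration stands in for parsed `SelectField` attributes, and placing unlisted columns after the listed ones when `OrderChanged` is set is an assumption.

```python
def resolve_select(incoming, select_fields, order_changed=False):
    """Return [(source_name, output_name)] for the columns that survive.

    incoming: column names in arrival order.
    select_fields: dicts with "field" plus optional "selected" / "rename",
    in configuration order; the "*Unknown" entry controls unlisted columns.
    """
    listed = {sf["field"]: sf for sf in select_fields if sf["field"] != "*Unknown"}
    unknown = next((sf for sf in select_fields if sf["field"] == "*Unknown"), None)
    # Without a *Unknown row, unlisted columns are dropped (assumption)
    keep_unknown = unknown.get("selected", True) if unknown is not None else False

    def keep(name):
        if name in listed:
            return listed[name].get("selected", True)
        return keep_unknown

    if order_changed:
        # Configuration order first, then unlisted columns in arrival order
        ordered = [n for n in listed if n in incoming]
        ordered += [n for n in incoming if n not in listed]
    else:
        ordered = list(incoming)

    return [(n, listed.get(n, {}).get("rename", n)) for n in ordered if keep(n)]


incoming = ["ID", "Name", "Region", "Notes"]
config = [
    {"field": "Region"},
    {"field": "ID", "rename": "CustomerID"},
    {"field": "Name", "selected": False},
    {"field": "*Unknown", "selected": True},
]
same_order = resolve_select(incoming, config)
new_order = resolve_select(incoming, config, order_changed=True)
```

Resolving names first and building the projection afterwards avoids having to recover a column's source name from an already-aliased expression.
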
**Implementation:**

```python
class SelectTool(BaseTool):
    def execute(self, inputs):
        df = inputs.get("Input", pl.DataFrame())
        fields = self.config.findall("SelectFields/SelectField")
        keep_unknown = False
        explicit = {}  # original_name → selected / rename / type / size
        for sf in fields:
            name = sf.attrib["field"]
            if name == "*Unknown":
                keep_unknown = sf.attrib.get("selected", "True") == "True"
                continue
            explicit[name] = {
                "selected": sf.attrib.get("selected", "True") == "True",
                "rename": sf.attrib.get("rename", name),
                "type": sf.attrib.get("type"),
                "size": sf.attrib.get("size"),
            }
        order_changed = self.config.attrib.get("OrderChanged", "False") == "True"

        # Build column list in config order if OrderChanged, else original order
        result_cols = []
        for sf in fields:
            name = sf.attrib["field"]
            if name == "*Unknown" or name not in df.columns:
                continue
            meta = explicit[name]
            if not meta["selected"]:
                continue
            col = pl.col(name)
            if meta["type"]:
                col = col.cast(self.ctx.type_mapper.map(meta["type"], meta["size"]))
            result_cols.append(col.alias(meta["rename"]))
        if keep_unknown:
            # Only columns that were never listed count as unknown
            for c in df.columns:
                if c not in explicit:
                    result_cols.append(pl.col(c))
        if not order_changed:
            # Respect incoming column order; meta.root_names() is the public
            # way to recover the source column of an aliased expression
            result_cols = sorted(
                result_cols,
                key=lambda e: df.columns.index(e.meta.root_names()[0]),
            )
        return {"Output": df.select(result_cols)}
```

---

### 7.8 Sort (`AlteryxBasePluginsGui.Sort.Sort`)

**XML Configuration:**

```xml
<Configuration>
  <SortInfo locale="...">
    <Field field="..." order="..." />
  </SortInfo>
</Configuration>
```

**Attributes:**

- `order`: `"Ascending"` | `"Descending"`
- `locale`: `0` = default byte-order sort; `1033` = dictionary order (natural number sort for strings, English locale)

**Implementation:**

```python
class SortTool(BaseTool):
    def execute(self, inputs):
        df = inputs.get("Input", pl.DataFrame())
        sort_fields = self.config.findall("SortInfo/Field")
        by = [f.attrib["field"] for f in sort_fields]
        descending = [f.attrib.get("order", "Ascending") == "Descending" for f in sort_fields]
        # locale is an attribute of <SortInfo>, so read it with _cfg_attr
        locale = self._cfg_attr("SortInfo", "locale", "0")
        # locale 1033 = natural sort (numeric strings as numbers); not handled here
        # Polars: use sort with maintain_order=True for stability
        df = df.sort(by=by, descending=descending, maintain_order=True)
        return {"Output": df}
```

Note: For `locale="1033"` (dictionary/natural order on string columns containing numbers), apply a Schwartzian transform: extract numeric prefix for sort, then restore original values.

---

### 7.9 Sample (`AlteryxBasePluginsGui.Sample.Sample`)

**XML Configuration:**

```xml
<Configuration>
  <Mode>First</Mode>
  <N>3</N>
  <GroupFields>
    <Field name="..." />
  </GroupFields>
</Configuration>
```

**Modes:**

| Mode | Behaviour |
|---|---|
| `First` | Return first N rows (per group if GroupFields present) |
| `Last` | Return last N rows |
| `Sample` | Return 1 of every N rows (deterministic, not random) |
| `Random` | Each row has 1-in-N chance of inclusion (non-deterministic) |
| `NPercent` | Return first N% of rows |

**Implementation:**

```python
import random

class SampleTool(BaseTool):
    def execute(self, inputs):
        df = inputs.get("Input", pl.DataFrame())
        mode = self._cfg("Mode", "First")
        n = int(self._cfg("N", "1"))
        group_fields = [f.attrib["name"] for f in self.config.findall("GroupFields/Field")]
        if not group_fields:
            return {"Output": self._sample_flat(df, mode, n)}
        # Grouped sampling
        parts = []
        for _, group_df in df.group_by(group_fields, maintain_order=True):
            parts.append(self._sample_flat(group_df, mode, n))
        # pl.concat rejects an empty list, so fall back to the (empty) input
        return {"Output": pl.concat(parts) if parts else df}

    def _sample_flat(self, df, mode, n):
        if mode == "First":
            return df.head(n)
        elif mode == "Last":
            return df.tail(n)
        elif mode == "Sample":
            # Rows 0, n, 2n, ...
            return df.gather_every(n)
        elif mode == "Random":
            mask = [random.random() < (1 / n) for _ in range(len(df))]
            return df.filter(pl.Series(mask, dtype=pl.Boolean))
        elif mode == "NPercent":
            count = max(1, int(len(df) * n / 100))
            return df.head(count)
        return df
```

---

### 7.10 Unique (`AlteryxBasePluginsGui.Unique.Unique`)

**XML Configuration:**

```xml
<Configuration>
  <UniqueFields>
    <Field name="..." />
  </UniqueFields>
</Configuration>
```

**Behaviour:**

- Group by the listed fields.
- First occurrence of each group → `Unique` anchor output.
- All subsequent duplicates → `Duplicate` anchor output.
- Column order preserved.

**Anchors:** `Input` → `Unique`, `Duplicate`

**Implementation:**

```python
class UniqueTool(BaseTool):
    def execute(self, inputs):
        df = inputs.get("Input", pl.DataFrame())
        key_fields = [f.attrib["name"] for f in self.config.findall("UniqueFields/Field")]
        # Tag each row, then find the first row index of every key group
        df = df.with_row_index("_row_idx")
        first_idx = (
            df.group_by(key_fields, maintain_order=True)
              .agg(pl.col("_row_idx").first())["_row_idx"]
        )
        unique_df = df.filter(pl.col("_row_idx").is_in(first_idx)).drop("_row_idx")
        dup_df = df.filter(~pl.col("_row_idx").is_in(first_idx)).drop("_row_idx")
        return {"Unique": unique_df, "Duplicate": dup_df}
```

---

### 7.11 Join (`AlteryxBasePluginsGui.Join.Join`)

**XML Configuration:**

```xml
... ...
```

The join key pairs are stored as child elements:

```xml
<JoinFields>
  <JoinField Left="..." Right="..." />
</JoinFields>
```

**Anchors:**

- Inputs: `Left`, `Right`
- Outputs: `J` (inner join), `L` (left unmatched), `R` (right unmatched)

**Column naming on J output:**

- If both inputs share a column name (other than join key): prefix with `L_` and `R_`.
- Join key columns: appear once (from Left input by default).

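The anchor and naming rules above can be illustrated on plain lists of dicts before any SQL is generated. The sketch below is a dependency-free stand-in, not the DuckDB implementation: `three_way_join` is a hypothetical name, rows are assumed to have complete key fields, and null-key matching is not addressed.

```python
from collections import defaultdict


def three_way_join(left, right, key_pairs):
    """Split two lists of dict rows into J / L / R outputs.

    key_pairs: [(left_field, right_field), ...]
    """
    left_keys = [lk for lk, _ in key_pairs]
    right_keys = [rk for _, rk in key_pairs]
    left_names = {name for row in left for name in row}
    right_names = {name for row in right for name in row}
    # Names that would collide on the J output once right keys are dropped
    shared = left_names & (right_names - set(right_keys))

    # Index right rows by key tuple so each left row finds its matches directly
    right_index = defaultdict(list)
    for pos, row in enumerate(right):
        right_index[tuple(row[k] for k in right_keys)].append(pos)

    joined, left_only, matched_right = [], [], set()
    for lrow in left:
        hits = right_index.get(tuple(lrow[k] for k in left_keys), [])
        if not hits:
            left_only.append(lrow)
            continue
        for pos in hits:
            matched_right.add(pos)
            out = {(f"L_{n}" if n in shared else n): v for n, v in lrow.items()}
            for n, v in right[pos].items():
                if n in right_keys:
                    continue  # key columns appear once, taken from the Left input
                out[f"R_{n}" if n in shared else n] = v
            joined.append(out)

    right_only = [row for pos, row in enumerate(right) if pos not in matched_right]
    return {"J": joined, "L": left_only, "R": right_only}


left = [
    {"ID": 1, "Name": "Ann", "City": "Oslo"},
    {"ID": 2, "Name": "Bob", "City": "Rome"},
]
right = [{"ID": 1, "City": "Paris"}, {"ID": 3, "City": "Lima"}]
result = three_way_join(left, right, [("ID", "ID")])
```

Every input row ends up on exactly one of the three anchors, which is a useful invariant to assert in tests of the SQL-based version as well.
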
**Implementation using DuckDB:**

```python
class JoinTool(BaseTool):
    def execute(self, inputs):
        left = inputs.get("Left", pl.DataFrame())
        right = inputs.get("Right", pl.DataFrame())
        by_pos = self._cfg_attr("JoinByRecordPos", "value", "False") == "True"
        con = self.ctx.duckdb_con
        if by_pos:
            # Add the positional key before registering so DuckDB can see it
            left = left.with_row_index("_pos")
            right = right.with_row_index("_pos")
            join_keys = [("_pos", "_pos")]
        else:
            join_keys = [(jf.attrib["Left"], jf.attrib["Right"])
                         for jf in self.config.findall("JoinFields/JoinField")]
        con.register("_left", left.to_arrow())
        con.register("_right", right.to_arrow())
        j_df, l_df, r_df = self._execute_join(left, right, join_keys, con)
        return {"J": j_df, "L": l_df, "R": r_df}

    def _execute_join(self, left, right, join_keys, con):
        # Build column lists with disambiguation
        l_cols = left.columns
        r_cols = right.columns
        key_l = [k[0] for k in join_keys]
        key_r = [k[1] for k in join_keys]
        # ... build SQL for inner, left anti, right anti
        # (SELECT * is a placeholder for the explicit, prefixed column list)
        on_clause = " AND ".join(f'l."{k[0]}" = r."{k[1]}"' for k in join_keys)
        # Inner join (J):
        j_sql = f"SELECT * FROM _left l INNER JOIN _right r ON {on_clause}"
        # Unmatched rows (L and R) via anti joins:
        l_sql = f"SELECT l.* FROM _left l ANTI JOIN _right r ON {on_clause}"
        r_sql = f"SELECT r.* FROM _right r ANTI JOIN _left l ON {on_clause}"
        j_df = duckdb_to_polars(con, j_sql)
        l_df = duckdb_to_polars(con, l_sql)
        r_df = duckdb_to_polars(con, r_sql)
        return j_df, l_df, r_df
```

---

### 7.12 Union (`AlteryxBasePluginsGui.Union.Union`)

**XML Configuration:**

```xml
<Configuration>
  <Mode>Auto</Mode>
</Configuration>
```

**Modes:**

- `Auto` / `ByName`: match columns by name; fill missing columns with NULL.
- `ByPosition`: stack columns positionally regardless of name; use first input's column names.

**Anchors:** Multiple `Input1`, `Input2`, ... `InputN`; single `Output`.

**Implementation:**

```python
class UnionTool(BaseTool):
    def execute(self, inputs):
        mode = self._cfg("Mode", "Auto")
        # Sort input anchors (Input, Input1, Input2, ...) by numeric suffix
        sorted_keys = sorted(inputs.keys(), key=lambda k: int(k.replace("Input", "") or 0))
        dfs = [inputs[k] for k in sorted_keys]
        if mode == "ByPosition":
            # Rename all to first df's columns
            names = dfs[0].columns
            renamed = [df.rename(dict(zip(df.columns, names))) for df in dfs]
            result = pl.concat(renamed, how="diagonal")
        else:  # ByName / Auto
            result = pl.concat(dfs, how="diagonal_relaxed")
        return {"Output": result}
```

---

### 7.13 Summarize (`AlteryxSpatialPluginsGui.Summarize.Summarize`)

**XML Configuration:**

```xml
<Configuration>
  <SummarizeFields>
    <SummarizeField field="..." action="..." rename="..." />
  </SummarizeFields>
</Configuration>
```

**Action mapping:**

| Alteryx Action | DuckDB / Polars equivalent |
|---|---|
| `GroupBy` | `group_by` key |
| `Sum` | `pl.col(f).sum()` |
| `Count` | `pl.len()` (counts every row; `pl.col(f).count()` skips nulls) |
| `Count Non Null` | `pl.col(f).drop_nulls().count()` |
| `Count Distinct` | `pl.col(f).n_unique()` |
| `Count Distinct Non Null` | `pl.col(f).drop_nulls().n_unique()` |
| `Count Null` | `pl.col(f).is_null().sum()` |
| `Min` | `pl.col(f).min()` |
| `Max` | `pl.col(f).max()` |
| `Avg` | `pl.col(f).mean()` |
| `Median` | `pl.col(f).median()` |
| `Std Deviation` | `pl.col(f).std()` |
| `Variance` | `pl.col(f).var()` |
| `Percentile` | `pl.col(f).quantile(p/100)` |
| `Concatenate` | `pl.col(f).sort().cast(pl.String).str.join(sep)` |
| `First` | `pl.col(f).first()` |
| `Last` | `pl.col(f).last()` |
| `Mode` | custom UDF |

**Implementation:**

```python
class SummarizeTool(BaseTool):
    def execute(self, inputs):
        df = inputs.get("Input", pl.DataFrame())
        fields = self.config.findall("SummarizeFields/SummarizeField")
        group_fields = [f.attrib["field"] for f in fields if f.attrib.get("action") == "GroupBy"]
        agg_exprs = []
        for f in fields:
            action = f.attrib.get("action", "GroupBy")
            field = f.attrib["field"]
            rename = f.attrib.get("rename", field)
            if action == "GroupBy":
                continue
            # _build_agg maps an action to a Polars expression (see table above)
            expr = self._build_agg(field, action, f.attrib)
            agg_exprs.append(expr.alias(rename))
        if group_fields:
            result = df.group_by(group_fields, maintain_order=True).agg(agg_exprs)
        else:
            result = df.select(agg_exprs)
        return {"Output": result}
```

---

### 7.14 JoinMultiple (`AlteryxBasePluginsGui.JoinMultiple.JoinMultiple`)

**XML Configuration:**

```xml
```

**Behaviour:**

- Joins N inputs simultaneously (by position or by field).
- Column names from each input are prefixed with `Input_#N_` to disambiguate.
- Single `Output` anchor.
- `OutputJoinOnly="False"`: include all rows (outer join on all inputs).

---

### 7.15 AppendFields (`AlteryxBasePluginsGui.AppendFields.AppendFields`)

**Behaviour:** Cartesian product of a small "Source" input appended to every row of a large "Target" input. One-to-many: every Target row gets one or more Source rows' fields added.

**Anchors:** `Target` (large), `Source` (small) → `Output`

**Implementation:** Cross join if source is small; otherwise warn.

---

### 7.16 CrossTab (`AlteryxBasePluginsGui.CrossTab.CrossTab`)

**XML Configuration:**

```xml
<Configuration>
  <GroupFields>
    <Field name="..." />
  </GroupFields>
  <HeaderField field="..." />
  <DataField field="..." method="..." />
</Configuration>
```

**Behaviour:** Pivot table: rows = GroupBy values, columns = unique values of `HeaderField`, cell = aggregate of `DataField`.

**Implementation using DuckDB PIVOT:**

```python
class CrossTabTool(BaseTool):
    def execute(self, inputs):
        df = inputs.get("Input", pl.DataFrame())
        con = self.ctx.duckdb_con
        con.register("_ct_input", df.to_arrow())
        # ElementTree paths cannot select attributes, so use _cfg_attr
        header = self._cfg_attr("HeaderField", "field")
        data = self.config.find("DataField")
        data_field = data.attrib["field"]
        method = data.attrib.get("method", "Sum").upper()
        groups = [f.attrib["name"] for f in self.config.findall("GroupFields/Field")]
        group_clause = ", ".join(f'"{g}"' for g in groups)
        # Omit GROUP BY entirely when no group fields are configured
        group_by = f"GROUP BY {group_clause}" if groups else ""
        # DuckDB PIVOT
        sql = f"""
            PIVOT _ct_input
            ON "{header}"
            USING {method}("{data_field}")
            {group_by}
        """
        result = duckdb_to_polars(con, sql)
        return {"Output": result}
```

---

### 7.17 Transpose (`AlteryxBasePluginsGui.Transpose.Transpose`)

**XML Configuration:**

```xml
<Configuration>
  <KeyFields>
    <Field name="..." />
  </KeyFields>
  <DataFields>
    <Field name="..." />
  </DataFields>
</Configuration>
```

**Behaviour:** Un-pivot (melt). Key fields remain; each Data field becomes a row with `Name` (field name) and `Value` columns.

**Implementation:**

```python
class TransposeTool(BaseTool):
    def execute(self, inputs):
        df = inputs.get("Input", pl.DataFrame())
        keys = [f.attrib["name"] for f in self.config.findall("KeyFields/Field")]
        data_cols = [f.attrib["name"] for f in self.config.findall("DataFields/Field")]
        # unpivot() replaces the deprecated melt() in Polars 1.0+
        result = df.unpivot(on=data_cols, index=keys,
                            variable_name="Name", value_name="Value")
        return {"Output": result}
```

---

### 7.18 DateTime Parse (`AlteryxBasePluginsGui.DateTime.DateTime`)

**XML Configuration:**

```xml
%d/%m/%Y DateString ParsedDate String Date
```

**Implementation:** `pl.col(input).str.to_date(format)` or `str.to_datetime(format)`.

---

### 7.19 RegEx (`AlteryxBasePluginsGui.RegEx.RegEx`)

**XML Configuration:**

```xml
EmailAddress (\w+)@(\w+)\.(\w+) $1_at_$2 Match Matched
```

**Methods:**

- `Match`: boolean column; True if pattern matches.
- `Replace`: replace with `ReplaceString` (use `$1`, `$2` for capture groups → `\1`, `\2` in Python).
- `Parse`: extract capture groups into new columns (one per group).
- `Token`: split by pattern, return one row per token.

---

### 7.20 TextToColumns (`AlteryxBasePluginsGui.TextToColumns.TextToColumns`)

**XML Configuration:**

```xml
AddressField , Address_
```

**Behaviour:**

- `SplitToRows="False"`: split into `NumCols` new columns named `{RootName}1`, `{RootName}2`, etc.
- `SplitToRows="True"`: one row per token.

---

### 7.21 GenerateRows (`AlteryxBasePluginsGui.GenerateRows.GenerateRows`)

**XML Configuration:**

```xml
JoinDate Date DateTimeFormat(DateTimeNow(),"%Y-%m-%d") [JoinDate] <= DateTimeAdd("2025-01-01", 5, "Days") DateTimeAdd([JoinDate], 1, "days")
```

**Behaviour:** Generate rows by:

1. Initialise the target field using `Expression_Init`.
2. While `Expression_Cond` is true:
   a. Emit current row.
   b. Update field with `Expression_Loop`.

**Implementation note:** Because GenerateRows typically has no input (or one input row acts as a seed), implement as an iterative Python loop evaluating the transpiled expressions.

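The init / condition / loop steps above can be sketched as a generator. Here plain Python callables stand in for the transpiled `Expression_Init`, `Expression_Cond` and `Expression_Loop`; `generate_rows` and its `max_rows` safety limit are hypothetical additions for illustration and are not part of the tool's configuration.

```python
import datetime as dt


def generate_rows(seed, field, init, cond, loop, max_rows=1_000_000):
    """Yield one dict per generated row.

    seed: optional input row whose other fields are copied to every output row.
    init / cond / loop: callables taking the current row dict (stand-ins for
    the transpiled expressions).
    """
    row = dict(seed or {})
    row[field] = init(row)          # step 1: initialise the target field
    emitted = 0
    while cond(row):                # step 2: repeat while the condition holds
        if emitted >= max_rows:
            raise RuntimeError(f"GenerateRows exceeded {max_rows} rows")
        yield dict(row)             # 2a: emit a copy of the current row
        row[field] = loop(row)      # 2b: advance the field
        emitted += 1


rows = list(
    generate_rows(
        seed={"Customer": "Ann"},
        field="JoinDate",
        init=lambda r: dt.date(2025, 1, 1),
        cond=lambda r: r["JoinDate"] <= dt.date(2025, 1, 3),
        loop=lambda r: r["JoinDate"] + dt.timedelta(days=1),
    )
)
```

A row limit of some kind is worth keeping in a real implementation, since a condition that never becomes false would otherwise loop forever.
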
---

### 7.22 MultiRowFormula (`AlteryxBasePluginsGui.MultiRowFormula.MultiRowFormula`)

**XML Configuration:**

```xml
```

**Expression syntax:**

- `[Row-1:FieldName]` → previous row's value of `FieldName` (null at boundary).
- `[Row+1:FieldName]` → next row's value.
- `[Row-N:FieldName]` → N rows back.

**Implementation:** Use Polars `shift()` for lookback/lookahead, or DuckDB window functions (`LAG`, `LEAD`).

---

### 7.23 RecordID (`AlteryxBasePluginsGui.RecordID.RecordID`)

**XML Configuration:**

```xml
RecordID 1 Int32
```

**Implementation:** `df.with_row_index(name=field, offset=start_value)` then cast to the configured integer type. (`offset` is the value of the first index, so it equals the start value directly.)

---

## 8. Expression Transpiler

The `expression/transpiler.py` module converts Alteryx expression syntax into either:

- A Polars expression (`pl.Expr`) for simple column-level operations.
- A DuckDB SQL fragment for complex/multi-step cases.

### 8.1 Syntax Differences

| Alteryx | Python/Polars/DuckDB |
|---|---|
| `[ColumnName]` | `pl.col("ColumnName")` / `"ColumnName"` |
| `"string literal"` | `'string literal'` |
| `IF ... THEN ... ELSEIF ... ELSE ... ENDIF` | `pl.when(...).then(...).when(...).then(...).otherwise(...)` / `CASE WHEN` |
| `IIF(cond, true_val, false_val)` | `pl.when(cond).then(true_val).otherwise(false_val)` |
| `IsNull([Field])` | `pl.col("Field").is_null()` |
| `!IsNull([Field])` | `pl.col("Field").is_not_null()` |
| `NULL()` | `None` / `pl.lit(None)` |
| `AND` / `OR` / `NOT` | `&` / `\|` / `~` (Polars) or `AND`/`OR`/`NOT` (SQL) |
| `==` (equality) | `==` |
| `!=` | `!=` |
| Row reference `[Row-1:Field]` | `pl.col("Field").shift(1)` |

### 8.2 Built-in Function Mapping

**String functions:**

| Alteryx | DuckDB SQL / Polars |
|---|---|
| `Uppercase([F])` | `UPPER(F)` / `pl.col(F).str.to_uppercase()` |
| `Lowercase([F])` | `LOWER(F)` / `pl.col(F).str.to_lowercase()` |
| `Titlecase([F])` | custom: capitalise each word |
| `Trim([F])` | `TRIM(F)` |
| `LTrim([F])` | `LTRIM(F)` |
| `RTrim([F])` | `RTRIM(F)` |
| `Length([F])` | `LENGTH(F)` / `pl.col(F).str.len_chars()` |
| `Left([F],n)` | `LEFT(F,n)` / `pl.col(F).str.slice(0,n)` |
| `Right([F],n)` | `RIGHT(F,n)` |
| `Substring([F],start,len)` | `SUBSTR(F, start + 1, len)` (Alteryx is 0-indexed, DuckDB is 1-indexed) |
| `FindString([F],pattern)` | `INSTR(F, pattern) - 1` (Alteryx returns a 0-indexed position, -1 = not found) |
| `ReplaceChar([F],old,new)` | `REPLACE(F,old,new)` (exact only when `old` is a single character) |
| `StringToDate([F],fmt)` | `STRPTIME(F,fmt)` |
| `ToString([num],decimals)` | `PRINTF('%.Nf', num)` |
| `Contains([F],str)` | `CONTAINS(F,str)` |
| `StartsWith([F],str)` | `STARTS_WITH(F,str)` |
| `EndsWith([F],str)` | `ENDS_WITH(F,str)` |
| `REGEX_Match([F],pat)` | `REGEXP_MATCHES(F,pat)` |
| `REGEX_Replace([F],pat,repl)` | `REGEXP_REPLACE(F,pat,repl)` |
| `REGEX_CountMatches([F],pat)` | custom UDF |

Alteryx `Contains`, `StartsWith` and `EndsWith` are case-insensitive by default, whereas the DuckDB functions are case-sensitive; lower-case both operands unless the expression passes `0` as the third argument.

**Math functions:**

| Alteryx | DuckDB / Polars |
|---|---|
| `ABS([F])` | `ABS(F)` |
| `CEIL([F])` | `CEIL(F)` |
| `FLOOR([F])` | `FLOOR(F)` |
| `ROUND([F],mult)` | `ROUND(F / mult) * mult` (Alteryx rounds to the nearest multiple of `mult`, not to a number of decimal places) |
| `SQRT([F])` | `SQRT(F)` |
| `POW([F],n)` | `POWER(F,n)` |
| `LOG([F])` | `LN(F)` |
| `LOG10([F])` | `LOG10(F)` |
| `MOD([F],n)` | `F % n` |
| `MIN([A],[B])` | `LEAST(A,B)` |
| `MAX([A],[B])` | `GREATEST(A,B)` |
| `RandInt(n)` | `FLOOR(RANDOM()*n)` |

**Date/Time functions:**

| Alteryx | DuckDB / Polars |
|---|---|
| `DateTimeNow()` | `NOW()` / `datetime.datetime.now()` |
| `DateTimeToday()` | `CURRENT_DATE` |
| `DateTimeAdd([D],n,"days")` | `D + INTERVAL (n) DAY` |
| `DateTimeDiff([D1],[D2],"days")` | `DATEDIFF('day', D2, D1)` |
| `DateTimeFormat([D],fmt)` | `STRFTIME(D, fmt)` |
| `ToDate([D])` | `CAST(D AS DATE)` |
| `DateTimeYear([D])` | `YEAR(D)` |
| `DateTimeMonth([D])` | `MONTH(D)` |
| `DateTimeDay([D])` | `DAY(D)` |
| `DateTimeHour([D])` | `HOUR(D)` |

**Conditional:**

| Alteryx | DuckDB |
|---|---|
| `IF [F] > 0 THEN "pos" ELSE "neg" ENDIF` | `CASE WHEN F > 0 THEN 'pos' ELSE 'neg' END` |
| `IIF([F]>0, "pos", "neg")` | `IF(F>0, 'pos', 'neg')` |
| `Switch([F], default, v1, r1, v2, r2)` | `CASE F WHEN v1 THEN r1 WHEN v2 THEN r2 ELSE default END` |

### 8.3 Transpiler Implementation Strategy

Use a recursive descent parser or a grammar-based approach (e.g. `lark` or `pyparsing`).

**Recommended approach:**

1. Tokenise the expression (handle `[column]`, string literals, function calls, operators, keywords).
2. Build an AST.
3. Walk the AST and emit DuckDB SQL or Polars expressions.
4. For `eval_mask(df, expr)`: register df as a DuckDB view, run `SELECT *, ({sql_expr}) AS _mask FROM df`, return the boolean series.
5. For `eval_series(df, expr, field, dtype)`: run `SELECT ({sql_expr}) AS "{field}" FROM df` and cast the result to `dtype`.

**Fallback:** If an expression cannot be transpiled, raise `UnsupportedExpressionError` with the original expression text.

---

## 9. RunContext

```python
import duckdb
import tempfile
from pathlib import Path

# Import paths follow the repository layout in §2 (this class lives in
# engine/context.py); the class locations are assumed from the file names.
from .type_mapper import TypeMapper
from ..expression.transpiler import ExpressionTranspiler


class RunContext:
    def __init__(self, workflow_dir: str, verbose: bool = False):
        self.workflow_dir = Path(workflow_dir)
        self.verbose = verbose
        self.duckdb_con = duckdb.connect(":memory:")
        self.temp_dir = Path(tempfile.mkdtemp())
        self.transpiler = ExpressionTranspiler(self.duckdb_con)
        self.type_mapper = TypeMapper()
        self.constants: dict = {}  # workflow-level constants

    def resolve_path(self, path: str) -> Path:
        """Resolve relative paths against workflow directory, expand %temp%."""
        path = path.replace("%temp%", str(self.temp_dir) + "/")
        p = Path(path)
        if not p.is_absolute():
            p = self.workflow_dir / p
        return p
```

---

## 10. CLI

```
python -m alteryx_runner run <workflow.yxmd> [options]

Options:
  --output-dir PATH            Write output files to this directory (overrides workflow paths)
  --param KEY=VALUE            Set workflow constant
  --verbose                    Print Browse tool results and execution log
  --dry-run                    Parse and validate only; do not execute
  --format {json,csv,parquet}  Default output format for Browse nodes
```

**Example:**

```bash
python -m alteryx_runner run MyWorkflow.yxmd --verbose --output-dir ./results
```

---

## 11. Dependencies (`pyproject.toml`)

```toml
[project]
name = "alteryx-runner"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "polars>=0.20",
    "duckdb>=0.10",
    "pyarrow>=14",
    "yxdb>=0.1",       # Alteryx YXDB binary format
    "openpyxl>=3.1",   # Excel read/write
    "lark>=1.1",       # Expression parser grammar
    "click>=8.0",      # CLI
]

[project.optional-dependencies]
dev = ["pytest", "pytest-cov", "ruff", "mypy"]
```

---

## 12. Test Strategy

Each tool should have:

1. A unit test loading the corresponding `.yxmd` fixture from `tests/fixtures/`.
2. Assertions on output schema (column names, dtypes).
3. Assertions on output data (row count, specific cell values).
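The three checks above could share one small helper. This is a sketch under stated assumptions: `check_output` is a hypothetical name, and the tool output is represented as a plain list of dict rows plus a column-to-dtype mapping rather than a real Polars frame, so that the example runs without the runner installed.

```python
from typing import Any


def check_output(
    rows: list[dict[str, Any]],
    schema: dict[str, str],
    expected_schema: dict[str, str],
    expected_row_count: int,
    expected_cells: dict[tuple[int, str], Any],
) -> None:
    """Assert column order, dtypes, row count and selected cell values."""
    # Column order matters, so compare ordered (name, dtype) pairs.
    assert list(schema.items()) == list(expected_schema.items()), "schema mismatch"
    assert len(rows) == expected_row_count, "row count mismatch"
    for (row_idx, column), expected in expected_cells.items():
        actual = rows[row_idx][column]
        assert actual == expected, f"row {row_idx}, {column}: {actual!r} != {expected!r}"


# Hypothetical output of a Sort tool, sorted by CustomerID ascending.
rows = [
    {"CustomerID": 1, "Region": "North"},
    {"CustomerID": 2, "Region": "South"},
]
schema = {"CustomerID": "Int64", "Region": "String"}

check_output(
    rows,
    schema,
    expected_schema={"CustomerID": "Int64", "Region": "String"},
    expected_row_count=2,
    expected_cells={(0, "CustomerID"): 1, (1, "Region"): "South"},
)
```

Comparing the schema as an ordered list rather than as a dict is deliberate: two dicts with the same keys in a different order compare equal, which would hide a column-order regression.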
**Priority test fixtures (from uploaded files):**

| File | Tools covered |
|---|---|
| `Filter.yxmd` | TextInput, Filter (Simple+Custom), JoinMultiple, GenerateRows, Formula |
| `Formula.yxmd` | InputData (YXDB), Formula (static/conditional/modify/multi) |
| `Select.yxmd` | InputData, Select (simple/type-cast/rename-reorder/prefix) |
| `Sort.yxmd` | TextInput, Sort (single/multi/string-numeric/dictionary) |
| `Sample.yxmd` | InputData, Sample (First/Sample/Random/NPercent/Grouped) |
| `Unique.yxmd` | InputData, Unique |
| `Input_Data.yxmd` | InputData (CSV/Excel/sheets/named-ranges/record-limit) |
| `Output_Data.yxmd` | TextInput, OutputData (CSV/YXDB/multi-file/max-records) |
| `Browse.yxmd` | InputData, Browse (passthrough, profiling) |
| `Text_Input.yxmd` | TextInput |

---

## 13. Known Limitations & Out-of-Scope

- **Spatial tools**: `SpatialObj` columns are passed through as opaque strings; spatial operations (Buffer, Distance, etc.) are not implemented.
- **In-Database tools**: The `LockIn*` plugin family (In-DB tools) requires a live database connection; not supported.
- **Macros** (`.yxmc`): The `EngineDll="Macro"` setting triggers sub-workflow execution. Implement basic macro recursion but mark as best-effort.
- **Predictive tools** (R-based `.yxmc`): Out of scope.
- **YXDB write**: Writing `.yxdb` output requires the `yxdb` library's write API; fall back to Parquet if unavailable.
- **Gallery/Server connections**: All `SavedDataConnections` and `Gallery` sources will raise `NotImplementedError`.
- **Report/Rendering tools**: `PortfolioPlugins*` family is out of scope.
- **Wireless connections**: Treated identically to wired connections.

---

## 14. Execution Example

Given `Sort.yxmd` from the uploaded fixtures:

```
TextInput (ToolID=64)
    ↓
Sort: CustomerID Ascending (ToolID=26)
    ↓
(no downstream in this example workflow)
```

The runner will:

1. Parse XML → `WorkflowGraph` with 4 Sort nodes all sourced from TextInput node 64.
2. Execute `TextInput` → DataFrame with columns: `CustomerID`, `FirstName`, `LastName`, `Gender`, `JoinDate`, `Region`, `Score`.
3. Execute `Sort(26)` → `df.sort("CustomerID", descending=False)`.
4. Since no downstream connections exist (no Output/Browse), store results in the context.

---

## 15. XML Quick-Reference Card

Below is the distilled XML config shape for each implemented tool, as extracted from the uploaded `.yxmd` example files.

### Filter (Simple)

```xml
[CustomerID] > 30 Simple > CustomerID 30fixed
```

### Filter (Custom)

```xml
Custom
```

### Formula

```xml
```

### Select

```xml
```

### Sort

```xml
```

### Sample

```xml
First 3
```

### Unique

```xml
```

### InputData (CSV)

```xml
path/to/file.csv ,True1
```

### InputData (Excel)

```xml
path/to/file.xlsx|||`SheetName$` False1
```

### OutputData (CSV)

```xml
%temp%output.csv ,TrueCRLF
```

### TextInput

```xml
val1val2 val3
```