Pyteryx/alteryx_runner/cli.py

95 lines
2.9 KiB
Python

"""CLI entry point: python -m alteryx_runner run workflow.yxmd [options]"""
from __future__ import annotations
import sys
from pathlib import Path
import click
import polars as pl
@click.group()
def main():
"""Alteryx workflow runner — execute .yxmd files without Alteryx."""
@main.command()
@click.argument("workflow", type=click.Path(exists=True, path_type=Path))
@click.option("--output-dir", default=None, type=click.Path(path_type=Path),
help="Write output files to this directory.")
@click.option("--param", multiple=True, metavar="KEY=VALUE",
help="Set workflow constant (repeatable).")
@click.option("--verbose", is_flag=True, default=False,
help="Print Browse results and execution log.")
@click.option("--dry-run", is_flag=True, default=False,
help="Parse and validate only; do not execute.")
@click.option("--format", "fmt",
type=click.Choice(["json", "csv", "parquet"]), default="csv",
help="Default output format for Browse nodes.")
def run(
workflow: Path,
output_dir: Path | None,
param: tuple[str, ...],
verbose: bool,
dry_run: bool,
fmt: str,
) -> None:
"""Execute WORKFLOW (.yxmd file)."""
# Import here so CLI loads fast even if deps are missing
from engine.parser import parse_workflow
from engine.executor import execute
from engine.context import RunContext
params: dict[str, str] = {}
for p in param:
if "=" in p:
k, v = p.split("=", 1)
params[k.strip()] = v.strip()
else:
click.echo(f"Warning: --param {p!r} ignored (no '=' found)", err=True)
click.echo(f"Parsing {workflow}")
try:
graph = parse_workflow(str(workflow))
except Exception as e:
click.echo(f"Parse error: {e}", err=True)
sys.exit(1)
click.echo(
f" {len(graph.nodes)} nodes, {len(graph.connections)} connections"
)
if dry_run:
click.echo("Dry run complete — no execution.")
return
ctx = RunContext(
workflow_dir=str(workflow.parent),
verbose=verbose,
output_dir=str(output_dir) if output_dir else None,
params=params,
)
click.echo("Executing …")
try:
outputs = execute(graph, ctx)
except Exception as e:
click.echo(f"Execution error: {e}", err=True)
if verbose:
import traceback
traceback.print_exc()
sys.exit(1)
n_frames = sum(1 for df in outputs.values() if isinstance(df, pl.DataFrame) and len(df) > 0)
click.echo(f"Done. {n_frames} non-empty output frames produced.")
@main.command("list-tools")
def list_tools() -> None:
"""List all registered tool Plugin strings."""
from tools import _REGISTRY
for plugin, cls in sorted(_REGISTRY.items()):
click.echo(f" {plugin:<70}{cls.__name__}")
if __name__ == "__main__":
main()