from __future__ import annotations

import re
from typing import Callable, Dict

import polars as pl

from tools.base import BaseTool


class FindReplaceTool(BaseTool):
    """Look up values in a Find/Replace table and replace or append matching fields.

    Alteryx FindReplace connections:
      - "Targets" (dest anchor) = the data to search in
      - "Source"  (dest anchor) = the find/replace lookup table

    Key XML config elements:
      - FieldSearch        : field in Target data to search
      - FieldFind          : field in Source data with the find values
      - ReplaceFoundField  : field in Source data with the replacement values
      - ReplaceMode        : "Replace" (overwrite searched field) or
                             "Append" (add new columns)
      - ReplaceAppendFields: which fields from Source to append (Append mode)
      - FindMode           : "FindAny" (substring), "FindAll" (all occurrences).
                             NOTE: this implementation does not read FindMode;
                             matching is always substring based.
      - NoCase             : case-insensitive matching
      - MatchWholeWord     : whole-word matching

    Lookup rows whose find value is null or an empty string are ignored in
    both modes: a null cannot be searched for, and an empty string would
    match at every position of every value.
    """

    def execute(self, inputs: Dict[str, pl.DataFrame]) -> Dict[str, pl.DataFrame]:
        """Run the find/replace and return the result under the "Output" key.

        The target frame is returned unchanged when either input is empty,
        when the tool has no config, or when any of the three required field
        names (search / find / replace) is missing from the config.
        """
        # Targets = the data being searched; Source = the lookup table.
        target = self._first_input(inputs, ("Targets", "Target", "Input"))
        source = self._first_input(inputs, ("Source", "Find", "Replace"))

        if target.is_empty() or source.is_empty() or self.config is None:
            return {"Output": target}

        # Read config (support both old and new XML element names).
        search_field = self._cfg("FieldSearch", "") or self._cfg("Field", "") or ""
        find_field = self._cfg("FieldFind", "") or self._cfg("FindField", "") or ""
        replace_field = (
            self._cfg("ReplaceFoundField", "") or self._cfg("ReplaceField", "") or ""
        )
        replace_mode = (self._cfg("ReplaceMode", "Replace") or "Replace").strip()
        no_case = (
            self._cfg_attr("NoCase", "value", "False") or "False"
        ).lower() == "true"
        whole_word = (
            self._cfg_attr("MatchWholeWord", "value", "False") or "False"
        ).lower() == "true"

        if not (search_field and find_field and replace_field):
            return {"Output": target}

        # Build the lookup mapping: find_value -> replace_value.
        # dict() keeps the first-seen position of a duplicated find value but
        # the value of its last occurrence.
        find_values = source[find_field].cast(pl.String).to_list()
        replace_values = source[replace_field].cast(pl.String).to_list()
        lookup = dict(zip(find_values, replace_values))

        if replace_mode == "Append":
            return self._do_append(
                target,
                source,
                search_field,
                find_field,
                replace_field,
                lookup,
                no_case,
                whole_word,
            )
        return self._do_replace(target, search_field, lookup, no_case, whole_word)

    @staticmethod
    def _first_input(
        inputs: Dict[str, pl.DataFrame], names: tuple[str, ...]
    ) -> pl.DataFrame:
        """Return the frame stored under the first of *names* present in *inputs*.

        Falls back to an empty DataFrame when none of the names is present.
        """
        for name in names:
            if name in inputs:
                return inputs[name]
        return pl.DataFrame()

    @staticmethod
    def _build_matchers(
        lookup: dict, no_case: bool, whole_word: bool
    ) -> list[tuple[re.Pattern[str] | None, str, str | None]]:
        """Pre-compile one matcher per usable lookup entry.

        Returns ``(pattern, find, value)`` triples in lookup order.  ``pattern``
        is ``None`` for a plain case-sensitive substring search, in which case
        ``find`` is used directly.  Entries with a null or empty find value are
        dropped.
        """
        flags = re.IGNORECASE if no_case else 0
        matchers: list[tuple[re.Pattern[str] | None, str, str | None]] = []
        for find, value in lookup.items():
            if not find:
                # Null or "" find value: nothing meaningful to search for.
                continue
            pattern: re.Pattern[str] | None = None
            if whole_word:
                pattern = re.compile(r"\b" + re.escape(find) + r"\b", flags)
            elif no_case:
                pattern = re.compile(re.escape(find), re.IGNORECASE)
            matchers.append((pattern, find, value))
        return matchers

    def _make_match_fn(
        self, lookup: dict, no_case: bool, whole_word: bool
    ) -> Callable[[str | None], str | None]:
        """Build a per-value function returning the value of the first matching entry.

        The returned function yields ``None`` when the input is ``None`` or when
        no lookup entry matches.  A matching entry whose value is null also
        yields ``None``.
        """
        matchers = self._build_matchers(lookup, no_case, whole_word)

        def _match(val: str | None) -> str | None:
            if val is None:
                return None
            for pattern, find, value in matchers:
                if pattern is None:
                    found = find in val
                else:
                    found = pattern.search(val) is not None
                if found:
                    return value
            return None

        return _match

    def _append_field_names(self, replace_field: str) -> list[str]:
        """Return the Source fields to append, defaulting to ``[replace_field]``.

        Names come from the ``field`` attribute of each ``Field`` child of the
        ``ReplaceAppendFields`` config element; empty names are ignored.
        """
        append_el = (
            self.config.find("ReplaceAppendFields") if self.config is not None else None
        )
        names: list[str] = []
        if append_el is not None:
            for field_el in append_el.findall("Field"):
                fname = field_el.attrib.get("field", "")
                if fname:
                    names.append(fname)
        # Default: if no explicit append fields, append the replace field.
        return names or [replace_field]

    def _do_replace(
        self,
        target: pl.DataFrame,
        search_field: str,
        lookup: dict,
        no_case: bool,
        whole_word: bool,
    ) -> Dict[str, pl.DataFrame]:
        """Replace mode: overwrite the searched field in-place with replacement values.

        Every lookup entry is applied in order to each value, so text produced
        by an earlier replacement can be matched by a later entry.  A null
        replacement value is treated as an empty string (the found text is
        removed).  The searched column is cast to String in the output.
        """
        matchers = [
            (pattern, find, "" if rep is None else rep)
            for pattern, find, rep in self._build_matchers(lookup, no_case, whole_word)
        ]

        def _replace_fn(val: str | None) -> str | None:
            if val is None:
                return None
            for pattern, find, rep in matchers:
                if pattern is None:
                    val = val.replace(find, rep)
                else:
                    # A callable replacement is inserted literally; passing the
                    # string itself would make re interpret backslashes and
                    # group references (e.g. "\1", "C:\temp") in the data.
                    val = pattern.sub(lambda _m, _rep=rep: _rep, val)
            return val

        series = (
            target[search_field]
            .cast(pl.String)
            .map_elements(_replace_fn, return_dtype=pl.String)
        )
        return {"Output": target.with_columns(series.alias(search_field))}

    def _do_append(
        self,
        target: pl.DataFrame,
        source: pl.DataFrame,
        search_field: str,
        find_field: str,
        replace_field: str,
        lookup: dict,
        no_case: bool,
        whole_word: bool,
    ) -> Dict[str, pl.DataFrame]:
        """Append mode: add Source columns for matching rows.

        For every append field, each target row receives the value belonging to
        the first lookup entry found in the searched field, or null when nothing
        matches.  Appended columns are String typed.  Append fields that are
        neither the replace field nor a Source column are skipped.  A column of
        the same name already in the target is overwritten by
        ``with_columns``; matching always reads the searched field from the
        original target.
        """
        searched = target[search_field].cast(pl.String)

        result = target
        for af in self._append_field_names(replace_field):
            if af == replace_field:
                # The replacement values are already in the prepared lookup.
                field_lookup = lookup
            elif af in source.columns:
                # Other Source columns get their own find -> value mapping.
                field_lookup = dict(
                    zip(
                        source[find_field].cast(pl.String).to_list(),
                        source[af].cast(pl.String).to_list(),
                    )
                )
            else:
                continue

            match_fn = self._make_match_fn(field_lookup, no_case, whole_word)
            matched = searched.map_elements(match_fn, return_dtype=pl.String)
            result = result.with_columns(matched.alias(af))

        return {"Output": result}