168 lines
7.1 KiB
Python
168 lines
7.1 KiB
Python
from __future__ import annotations
|
|
from typing import Dict
|
|
import polars as pl
|
|
from tools.base import BaseTool
|
|
|
|
|
|
class FindReplaceTool(BaseTool):
    """Look up values in a Find/Replace table and replace or append matching fields.

    Alteryx FindReplace connections:
      - "Targets" (dest anchor) = the data to search in
      - "Source"  (dest anchor) = the find/replace lookup table

    Key XML config elements:
      - FieldSearch         : field in Target data to search
      - FieldFind           : field in Source data with the find values
      - ReplaceFoundField   : field in Source data with the replacement values
      - ReplaceMode         : "Replace" (overwrite searched field) or "Append" (add new columns)
      - ReplaceAppendFields : which fields from Source to append (Append mode)
      - NoCase              : case-insensitive matching
      - MatchWholeWord      : whole-word matching

    The FindMode element is not read by this class: matching is always
    "anywhere in the field" (substring), optionally restricted to whole words.
    """

    def execute(self, inputs: Dict[str, pl.DataFrame]) -> Dict[str, pl.DataFrame]:
        """Run the find/replace against the connected inputs.

        Args:
            inputs: Frames keyed by connection name. The target data is read
                from "Targets" (falling back to "Target", then "Input"); the
                lookup table from "Source" (falling back to "Find", then
                "Replace").

        Returns:
            ``{"Output": frame}``. The target frame is passed through
            unchanged when either input is empty, when there is no config, or
            when any of the search / find / replace field names is missing
            from the config.
        """
        # Targets = the data being searched; Source = the lookup table
        target = inputs.get("Targets", inputs.get("Target", inputs.get("Input", pl.DataFrame())))
        source = inputs.get("Source", inputs.get("Find", inputs.get("Replace", pl.DataFrame())))

        if target.is_empty() or source.is_empty() or self.config is None:
            return {"Output": target}

        # Read config (support both old and new XML element names)
        search_field = self._cfg("FieldSearch", "") or self._cfg("Field", "") or ""
        find_field = self._cfg("FieldFind", "") or self._cfg("FindField", "") or ""
        replace_field = self._cfg("ReplaceFoundField", "") or self._cfg("ReplaceField", "") or ""
        replace_mode = (self._cfg("ReplaceMode", "Replace") or "Replace").strip()
        no_case = (self._cfg_attr("NoCase", "value", "False") or "False").lower() == "true"
        whole_word = (self._cfg_attr("MatchWholeWord", "value", "False") or "False").lower() == "true"

        if not (search_field and find_field and replace_field):
            return {"Output": target}

        # Build the lookup mapping: find_value -> replace_value.
        # Source row order is preserved (dicts keep insertion order), which is
        # the order in which the rules are later applied. Null / empty find
        # values are filtered out later, in _build_rules.
        find_values = source[find_field].cast(pl.String).to_list()
        replace_values = source[replace_field].cast(pl.String).to_list()
        lookup = dict(zip(find_values, replace_values))

        if replace_mode == "Append":
            return self._do_append(target, source, search_field, find_field,
                                   replace_field, lookup, no_case, whole_word)
        return self._do_replace(target, search_field, lookup, no_case, whole_word)

    @staticmethod
    def _build_rules(lookup: dict, no_case: bool, whole_word: bool) -> list:
        """Turn a find -> replacement mapping into precompiled matching rules.

        Returns a list of ``(find, pattern, replacement)`` tuples in lookup
        order. ``pattern`` is a compiled regex when whole-word or
        case-insensitive matching is requested, and ``None`` when a plain
        case-sensitive substring test on ``find`` is sufficient.

        Entries whose find value is null or empty are dropped: a null cannot
        be searched for, and an empty string would match every value.
        """
        import re

        flags = re.IGNORECASE if no_case else 0
        rules = []
        for find, rep in lookup.items():
            if not find:
                continue
            if whole_word:
                pattern = re.compile(r"\b" + re.escape(find) + r"\b", flags)
            elif no_case:
                pattern = re.compile(re.escape(find), flags)
            else:
                pattern = None
            rules.append((find, pattern, rep))
        return rules

    @staticmethod
    def _make_matcher(lookup: dict, no_case: bool, whole_word: bool):
        """Build a per-value function returning the first matching rule's value.

        The returned callable takes one string (or None) and returns the
        replacement belonging to the first lookup entry found in it, or None
        when the input is None or nothing matches. Patterns are compiled once
        here rather than once per row.
        """
        rules = FindReplaceTool._build_rules(lookup, no_case, whole_word)

        def _first_match(val: str | None) -> str | None:
            if val is None:
                return None
            for find, pattern, rep in rules:
                if pattern is None:
                    found = find in val
                else:
                    found = pattern.search(val) is not None
                if found:
                    return rep
            return None

        return _first_match

    def _do_replace(
        self,
        target: pl.DataFrame,
        search_field: str,
        lookup: dict,
        no_case: bool,
        whole_word: bool,
    ) -> Dict[str, pl.DataFrame]:
        """Replace mode: overwrite the searched field in-place with replacement values.

        Every lookup entry is applied in turn to each value, so text produced
        by an earlier replacement can be matched by a later entry. The
        searched column is cast to String; null values stay null.

        Entries with a null replacement are skipped (there is no replacement
        text to substitute), as are entries with a null or empty find value.
        """
        rules = [
            rule
            for rule in self._build_rules(lookup, no_case, whole_word)
            if rule[2] is not None
        ]

        def _replace_fn(val: str | None) -> str | None:
            if val is None:
                return None
            for find, pattern, rep in rules:
                if pattern is None:
                    val = val.replace(find, rep)
                else:
                    # A callable replacement keeps the text literal; passing
                    # the string itself would make re interpret backslash
                    # sequences such as "\n" or "\1" inside the lookup data.
                    val = pattern.sub(lambda _match, _rep=rep: _rep, val)
            return val

        series = target[search_field].cast(pl.String).map_elements(_replace_fn, return_dtype=pl.String)
        return {"Output": target.with_columns(series.alias(search_field))}

    def _do_append(
        self,
        target: pl.DataFrame,
        source: pl.DataFrame,
        search_field: str,
        find_field: str,
        replace_field: str,
        lookup: dict,
        no_case: bool,
        whole_word: bool,
    ) -> Dict[str, pl.DataFrame]:
        """Append mode: keep the original target field unchanged;
        add columns from the Source table for matching rows.

        The fields to append come from the ``field`` attribute of each
        ``Field`` child of ``ReplaceAppendFields``; when none are configured
        the replace field is appended. Each appended column holds, per row,
        the Source value of the first lookup entry found in the searched
        field (null when nothing matches), cast to String. Configured fields
        that are not columns of ``source`` are ignored.
        """
        # Determine which fields from Source to append
        append_fields: list[str] = []
        append_el = self.config.find("ReplaceAppendFields") if self.config is not None else None
        if append_el is not None:
            for field_el in append_el.findall("Field"):
                fname = field_el.attrib.get("field", "")
                if fname:
                    append_fields.append(fname)

        # Default: if no explicit append fields, append the replace field
        if not append_fields:
            append_fields = [replace_field]

        searched = target[search_field].cast(pl.String)
        result = target
        for af in append_fields:
            if af == replace_field:
                # The replacement value from the caller-supplied lookup
                af_lookup = lookup
            elif af in source.columns:
                # For other Source columns, build the same kind of lookup
                af_lookup = dict(zip(
                    source[find_field].cast(pl.String).to_list(),
                    source[af].cast(pl.String).to_list(),
                ))
            else:
                continue

            matcher = self._make_matcher(af_lookup, no_case, whole_word)
            matched = searched.map_elements(matcher, return_dtype=pl.String)
            result = result.with_columns(matched.alias(af))

        return {"Output": result}
|