Skip to content

API Reference

Auto-generated from source docstrings.

IO

Output Parser

fp_wraptr.io.parser

Parse FP output files (fmout.txt) into structured data.

The FP output file contains:

1. Echo of input commands with program responses
2. Estimation results (coefficients, standard errors, fit stats per equation)
3. Solve iteration log
4. Forecast table: for each requested variable, rows of "P lv" (level values), "P ch" (absolute change), and "P %ch" (percent change) across quarterly periods

Example forecast block from fmout.txt: Variable Periods forecast are 2025.4 TO 2029.4 427 PCY P lv 5.0353 4.2968 2.3212 ... P ch -0.58871 -0.73847 -1.9756 ... P %ch -35.744 -46.974 -91.483 ...

FPOutputData dataclass

Parsed FP output data.

Source code in src/fp_wraptr/io/parser.py
@dataclass
class FPOutputData:
    """Parsed FP output data.

    Aggregates everything extracted from an fmout.txt file: per-variable
    forecast series, per-equation estimation results, and the solve
    iteration log, plus the raw text for debugging.
    """

    model_title: str = ""
    forecast_start: str = ""
    forecast_end: str = ""
    base_period: str = ""
    # Period labels; the base period is the first entry, followed by the
    # forecast periods.
    periods: list[str] = field(default_factory=list)
    variables: dict[str, ForecastVariable] = field(default_factory=dict)
    estimations: list[EstimationResult] = field(default_factory=list)
    solve_iterations: list[SolveIteration] = field(default_factory=list)
    raw_text: str = ""

    def to_dataframe(self) -> pd.DataFrame:
        """Convert forecast data to a pandas DataFrame.

        Returns:
            DataFrame with periods as index and variables as columns
            (level values). Empty DataFrame when no variables were parsed.
        """
        if not self.variables:
            return pd.DataFrame()

        data = {name: var.levels for name, var in self.variables.items()}

        # periods includes the base period as first entry; trim to the
        # length of the level series. If fewer period labels than values
        # were parsed, fall back to pandas' positional index rather than
        # letting the constructor raise an opaque length-mismatch error.
        n_rows = len(next(iter(data.values())))
        index = self.periods[:n_rows] if len(self.periods) >= n_rows else None
        return pd.DataFrame(data, index=index)

    def to_dict(self) -> dict:
        """Convert to a JSON-serializable dictionary.

        Coefficients are summarized as a count (``n_coefficients``) rather
        than serialized individually.
        """
        return {
            "model_title": self.model_title,
            "forecast_start": self.forecast_start,
            "forecast_end": self.forecast_end,
            "base_period": self.base_period,
            "periods": self.periods,
            "variables": {
                name: {
                    "var_id": v.var_id,
                    "name": v.name,
                    "levels": v.levels,
                    "changes": v.changes,
                    "pct_changes": v.pct_changes,
                }
                for name, v in self.variables.items()
            },
            "estimations": [
                {
                    "equation_number": e.equation_number,
                    "dependent_var": e.dependent_var,
                    "sample_start": e.sample_start,
                    "sample_end": e.sample_end,
                    "n_obs": e.n_obs,
                    "r_squared": e.r_squared,
                    "durbin_watson": e.durbin_watson,
                    "se_equation": e.se_equation,
                    "n_coefficients": len(e.coefficients),
                }
                for e in self.estimations
            ],
            "solve_iterations": [
                {"period": s.period, "iterations": s.iterations} for s in self.solve_iterations
            ],
        }

to_dataframe()

Convert forecast data to a pandas DataFrame.

Returns:

Type Description
DataFrame

DataFrame with periods as index and variables as columns (level values).

Source code in src/fp_wraptr/io/parser.py
def to_dataframe(self) -> pd.DataFrame:
    """Convert forecast data to a pandas DataFrame.

    Returns:
        DataFrame with periods as index and variables as columns (level values).
    """
    if not self.variables:
        return pd.DataFrame()

    series = {name: var.levels for name, var in self.variables.items()}

    # The first period label is the base period; trim the label list to the
    # number of level values so index and data line up.
    n_rows = len(next(iter(series.values())))
    return pd.DataFrame(series, index=self.periods[:n_rows])

to_dict()

Convert to a JSON-serializable dictionary.

Source code in src/fp_wraptr/io/parser.py
def to_dict(self) -> dict:
    """Convert to a JSON-serializable dictionary."""

    # Local builders keep the top-level dict literal readable.
    def _variable_entry(v) -> dict:
        return {
            "var_id": v.var_id,
            "name": v.name,
            "levels": v.levels,
            "changes": v.changes,
            "pct_changes": v.pct_changes,
        }

    def _estimation_entry(e) -> dict:
        return {
            "equation_number": e.equation_number,
            "dependent_var": e.dependent_var,
            "sample_start": e.sample_start,
            "sample_end": e.sample_end,
            "n_obs": e.n_obs,
            "r_squared": e.r_squared,
            "durbin_watson": e.durbin_watson,
            "se_equation": e.se_equation,
            "n_coefficients": len(e.coefficients),
        }

    return {
        "model_title": self.model_title,
        "forecast_start": self.forecast_start,
        "forecast_end": self.forecast_end,
        "base_period": self.base_period,
        "periods": self.periods,
        "variables": {name: _variable_entry(v) for name, v in self.variables.items()},
        "estimations": [_estimation_entry(e) for e in self.estimations],
        "solve_iterations": [
            {"period": s.period, "iterations": s.iterations} for s in self.solve_iterations
        ],
    }

ForecastVariable dataclass

A single variable's forecast data.

Source code in src/fp_wraptr/io/parser.py
@dataclass
class ForecastVariable:
    """A single variable's forecast data.

    Holds the three rows FP prints per variable in the forecast table:
    "P lv" (levels), "P ch" (absolute changes), and "P %ch" (percent
    changes), one value per period.
    """

    var_id: int  # numeric variable id as printed by FP (e.g. 427)
    name: str  # variable mnemonic, e.g. "PCY"
    levels: list[float] = field(default_factory=list)  # "P lv" row
    changes: list[float] = field(default_factory=list)  # "P ch" row
    pct_changes: list[float] = field(default_factory=list)  # "P %ch" row

EstimationResult dataclass

Estimation results for a single equation.

Source code in src/fp_wraptr/io/parser.py
@dataclass
class EstimationResult:
    """Estimation results for a single equation."""

    equation_number: int  # equation index as printed in the FP output
    dependent_var: str  # left-hand-side variable name
    sample_start: str = ""  # estimation sample start period
    sample_end: str = ""  # estimation sample end period
    n_obs: int = 0  # number of observations in the sample
    coefficients: list[EstimationCoefficient] = field(default_factory=list)
    # NOTE(review): presumably the AR/rho-correction iteration count — confirm
    # against the FP estimation log format.
    rho_iterations: int = 0
    se_equation: float = 0.0  # standard error of the equation
    r_squared: float = 0.0
    durbin_watson: float = 0.0  # Durbin-Watson statistic
    overid_pvalue: float | None = None  # over-identification test p-value, when reported
    overid_df: int = 0  # degrees of freedom for the over-identification test
    mean_dep_var: float = 0.0  # mean of the dependent variable

EstimationCoefficient dataclass

A single coefficient from an equation's estimation results.

Source code in src/fp_wraptr/io/parser.py
@dataclass
class EstimationCoefficient:
    """A single coefficient from an equation's estimation results."""

    var_id: int  # numeric id of the right-hand-side variable
    var_name: str  # variable mnemonic
    lag: int  # lag applied to the variable in the equation
    estimate: float  # coefficient point estimate
    std_error: float  # standard error of the estimate
    t_statistic: float  # t-statistic for the estimate
    # NOTE(review): presumably the regressor's sample mean — confirm against
    # the FP estimation output columns.
    mean: float

SolveIteration dataclass

A single solve iteration entry.

Source code in src/fp_wraptr/io/parser.py
@dataclass
class SolveIteration:
    """A single solve iteration entry from the solve log."""

    period: str  # period label, e.g. "2025.4"
    iterations: int  # iteration count the solver reported for this period

parse_fp_output(path)

Parse an FP output file.

Parameters:

Name Type Description Default
path Path | str

Path to fmout.txt or similar FP output file.

required

Returns:

Type Description
FPOutputData

FPOutputData with extracted forecast information.

Source code in src/fp_wraptr/io/parser.py
def parse_fp_output(path: Path | str) -> FPOutputData:
    """Parse an FP output file.

    Args:
        path: Path to fmout.txt or similar FP output file.

    Returns:
        FPOutputData with extracted forecast information.
    """
    # Replace undecodable bytes rather than failing: FP output may contain
    # non-UTF-8 characters.
    contents = Path(path).read_text(encoding="utf-8", errors="replace")
    return parse_fp_output_text(contents)

Input Parser

fp_wraptr.io.input_parser

Parse FP input and related data-format files.

Parsers included:

- FP input file (fminput.txt) command-oriented parser.
- FP model data file (fmdata.txt) with SMPL / LOAD / values blocks.
- FM exogenous file (fmexog.txt) with SMPL / CHANGEVAR variable overrides.

parse_fp_input(path)

Parse an FP input file into a structured dictionary.

Parameters:

Name Type Description Default
path Path | str

Path to fminput.txt or similar FP input file.

required

Returns:

Type Description
dict

Structured dictionary with parsed command sections.

Source code in src/fp_wraptr/io/input_parser.py
def parse_fp_input(path: Path | str) -> dict:
    """Parse an FP input file into a structured dictionary.

    Args:
        path: Path to fminput.txt or similar FP input file.

    Returns:
        Structured dictionary with parsed command sections.
    """
    # Tolerate stray non-UTF-8 bytes in hand-edited input files.
    contents = Path(path).read_text(encoding="utf-8", errors="replace")
    return parse_fp_input_text(contents)

parse_fp_input_text(text)

Parse FP input text into structured fields.

The parser is line-oriented at the command level and splits on semicolons, which is how FP consumes commands.

Source code in src/fp_wraptr/io/input_parser.py
def parse_fp_input_text(text: str) -> dict[str, Any]:
    """Parse FP input text into structured fields.

    The parser is line-oriented at the command level and splits on semicolons,
    which is how FP consumes commands.

    Args:
        text: Full contents of an FP input file.

    Returns:
        Dict containing the raw command stream (``commands``,
        ``commands_by_type``) plus per-command-type collections (samples,
        equations, solve settings, etc.). Commands not explicitly handled
        are collected under ``unhandled_commands``.
    """
    lines = _strip_comments(text)
    title = _extract_title(lines)
    # If the title came from the first line, skip that line when parsing commands.
    parse_lines = lines[1:] if title and lines and lines[0] == title else lines
    # Result schema: every key is pre-initialized so callers can index
    # without checking for presence.
    result: dict[str, Any] = {
        "title": title,
        "raw_text": text,
        "space": {},
        "commands": [],
        "commands_by_type": {},
        "samples": [],
        "load_data": [],
        "setupsolve": [],
        "setupect": [],
        "creates": [],
        "generated_vars": [],
        "equations": [],
        "equation_lhs": [],
        "identities": [],
        "modeqs": [],
        "solve": {},
        "solve_commands": [],
        "extrapolate": [],
        "printvar": [],
        "printmodel": 0,
        "printnames": 0,
        "exogenous": [],
        "changevar_blocks": [],
        "control_commands": [],
    }

    for command in _split_commands(parse_lines):
        command = command.strip()
        if not command:
            continue

        # First word is the command name; the remainder (if any) is its body.
        pieces = command.split(None, 1)
        command_name = pieces[0].upper()
        body = pieces[1].strip() if len(pieces) > 1 else ""

        # Every command, handled or not, is recorded in the raw stream.
        command_record = {"name": command_name, "body": body}
        result["commands"].append(command_record)
        command_key = _normalize_command_key(command_name)
        result.setdefault("commands_by_type", {}).setdefault(command_key, []).append(
            body,
        )

        if command_name == "SPACE":
            result["space"] = _parse_param_assignments(body)
        elif command_name == "SETUPSOLVE":
            result["setupsolve"].append(_parse_param_assignments(body))
        elif command_name == "SETUPEST":
            # NOTE(review): SETUPEST results are stored under the "setupect"
            # key (appears to be a historical misspelling of "setupest");
            # existing callers rely on this key name, so it is kept.
            parsed = _parse_param_assignments(body)
            result["setupect"].append(parsed)
        elif command_name == "SMPL":
            sample = _parse_sample(body)
            if sample:
                result["samples"].append(sample)
        elif command_name == "LOADDATA":
            # Only the referenced filename is retained, not the full params.
            load_data = _parse_param_assignments(body)
            filename = load_data.get("file")
            if filename:
                result["load_data"].append(filename)
        elif command_name == "CREATE":
            result["creates"].append(_parse_named_assignment(body))
        elif command_name == "GENR":
            result["generated_vars"].append(_parse_named_assignment(body))
        elif command_name == "EQ":
            equation = _parse_equation(body)
            if equation:
                result["equations"].append(equation)
        elif command_name == "MODEQ":
            modeq = body.strip()
            if modeq:
                result["modeqs"].append(modeq)
        elif command_name == "LHS":
            lhs = _parse_named_assignment(body)
            lhs["type"] = "lhs"
            result["equation_lhs"].append(lhs)
        elif command_name == "IDENT":
            result["identities"].append(_parse_named_assignment(body))
        elif command_name == "EXOGENOUS":
            result["exogenous"].append(body.strip())
        elif command_name == "SOLVE":
            # The last SOLVE command wins for the `solve` summary key;
            # all SOLVE commands are preserved in `solve_commands`.
            parsed = _parse_solve_command(body)
            result["solve"] = parsed
            result["solve_commands"].append(parsed)
        elif command_name == "EXTRAPOLATE":
            result["extrapolate"].append({"raw": body})
        elif command_name == "TEST":
            result.setdefault("tests", []).append(_split_words(body))
        elif command_name == "PRINTVAR":
            result["printvar"].append(_parse_printvar(body))
        elif command_name == "PRINTMODEL":
            # Only the occurrence count is tracked for print directives.
            result["printmodel"] += 1
        elif command_name == "PRINTNAMES":
            result["printnames"] += 1
        elif command_name == "CHANGEVAR":
            result["changevar_blocks"].append(_parse_changevar_block(body))
        elif command_name in {"INPUT", "RETURN", "QUIT", "EXIT", "END"}:
            result["control_commands"].append(command_record)
        else:
            result.setdefault("unhandled_commands", []).append(command_record)

    # Backward-compatible summary keys expected by existing callers.
    # `solve` used to be a simple dict and is now always present.
    if not result["solve"]:
        result["solve"] = {}

    return result

Writer

fp_wraptr.io.writer

Utilities for generating FP input artifacts and patched inputs.

This module writes FP exogenous variable override files and applies text-level patches to base FP input files, including INPUT FILE reference updates for generated fmexog inputs.

write_exogenous_file(variables, sample_start, sample_end, output_path)

Write an FP exogenous variable file (CHANGEVAR format).

Parameters:

Name Type Description Default
variables dict[str, dict]

Dict mapping variable names to {method, value} dicts. method: one of CHGSAMEPCT, SAMEVALUE, CHGSAMEABS value: numeric value or list of (period, value) pairs

required
sample_start str

Start period, e.g. "2025.4"

required
sample_end str

End period, e.g. "2029.4"

required
output_path Path

Where to write the file.

required

Returns:

Type Description
Path

Path to the written file.

Source code in src/fp_wraptr/io/writer.py
def write_exogenous_file(
    variables: dict[str, dict],
    sample_start: str,
    sample_end: str,
    output_path: Path,
) -> Path:
    """Write an FP exogenous variable file (CHANGEVAR format).

    Args:
        variables: Dict mapping variable names to {method, value} dicts.
            method: one of CHGSAMEPCT, SAMEVALUE, CHGSAMEABS
            value: numeric value or list of (period, value) pairs
        sample_start: Start period, e.g. "2025.4"
        sample_end: End period, e.g. "2029.4"
        output_path: Where to write the file.

    Returns:
        Path to the written file.
    """
    # File skeleton: SMPL header, CHANGEVAR block, then the overrides.
    out_lines: list[str] = [
        f"SMPL {sample_start} {sample_end};",
        "CHANGEVAR;",
    ]

    for name, spec in variables.items():
        override_method = spec.get("method", "SAMEVALUE")
        raw_value = spec.get("value", 0.0)

        # A per-period series gets one override per (period, value) pair;
        # anything else is emitted as a single constant override.
        if not _is_series_value(raw_value):
            _emit_constant_override(name, override_method, _normalize_scalar(raw_value), out_lines)
        else:
            period_pairs = [
                _normalize_period_value_pair(item)
                for item in raw_value  # type: ignore[arg-type]
            ]
            _emit_series_override(name, override_method, period_pairs, out_lines)

    out_lines.extend((";", "RETURN;"))

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text("\n".join(out_lines) + "\n", encoding="utf-8")
    return output_path

patch_fmexog_reference(base_input, fmexog_path, output_path)

Patch INPUT FILE references to an FMEXOG-style file path.

Parameters:

Name Type Description Default
base_input Path

Path to source input file.

required
fmexog_path Path

Path to the replacement exogenous input file.

required
output_path Path

Output path for patched file.

required

Returns:

Type Description
Path

Path to patched file.

Source code in src/fp_wraptr/io/writer.py
def patch_fmexog_reference(
    base_input: Path,
    fmexog_path: Path,
    output_path: Path,
) -> Path:
    """Patch the first INPUT FILE reference to an FMEXOG-style file path.

    Only the first matching ``INPUT FILE=...;`` directive is rewritten; the
    replacement is the file *name* of ``fmexog_path`` (not its full path).

    Args:
        base_input: Path to source input file.
        fmexog_path: Path to the replacement exogenous input file.
        output_path: Output path for patched file.

    Returns:
        Path to patched file.

    Raises:
        ValueError: If no INPUT FILE directive is found in ``base_input``.
    """
    text = base_input.read_text(encoding="utf-8", errors="replace")
    # (?im) already enables IGNORECASE and MULTILINE; passing re.IGNORECASE
    # again was redundant.
    pattern = re.compile(r"(?im)^(\s*INPUT\s+FILE\s*=\s*)([^\r\n;]*)(\s*;[^\r\n]*)$")

    if not pattern.search(text):
        raise ValueError(f"No INPUT FILE directive found in {base_input}")

    # Use a callable replacement so backslashes or group-reference-like
    # sequences in the file name cannot be misread as regex escapes.
    def _substitute(match: re.Match) -> str:
        return f"{match.group(1)}{fmexog_path.name}{match.group(3)}"

    text = pattern.sub(_substitute, text, count=1)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(text, encoding="utf-8")
    return output_path

patch_input_file(base_input, overrides, output_path)

Create a modified FP input file by applying text-level overrides.

Supports two patch styles:

1) Literal replacement (legacy): {"old text": "new text"}
2) Command-aware parameter patch: {"cmd:SETUPSOLVE.MAXCHECK": "80"} or {"cmd:SETUPSOLVE[1].MINITERS": "40"}

Command-aware patches update FP command parameters without relying on fragile full-command string matches.

Parameters:

Name Type Description Default
base_input Path

Path to the original fminput.txt.

required
overrides dict[str, str]

Dict of {search_string: replacement_string} pairs.

required
output_path Path

Where to write the patched file.

required

Returns:

Type Description
Path

Path to the written file.

TODO: Replace with AST-level manipulation once input parser is complete.

Source code in src/fp_wraptr/io/writer.py
def patch_input_file(
    base_input: Path,
    overrides: dict[str, str],
    output_path: Path,
) -> Path:
    """Create a modified FP input file by applying text-level overrides.

    Supports two patch styles:
      1) Literal replacement (legacy): ``{"old text": "new text"}``
      2) Command-aware parameter patch:
         ``{"cmd:SETUPSOLVE.MAXCHECK": "80"}``
         ``{"cmd:SETUPSOLVE[1].MINITERS": "40"}``

    Command-aware patches update FP command parameters without relying on
    fragile full-command string matches.

    Args:
        base_input: Path to the original fminput.txt.
        overrides: Dict of {search_string: replacement_string} pairs.
        output_path: Where to write the patched file.

    Returns:
        Path to the written file.

    TODO: Replace with AST-level manipulation once input parser is complete.
    """
    patched = base_input.read_text(encoding="utf-8", errors="replace")

    grouped = _split_command_overrides(overrides)

    # Command-aware parameter patches run first, then plain substring swaps.
    for key, new_value in grouped["command"]:
        patched = _apply_command_param_override(patched, key, new_value)
    for needle, replacement in grouped["literal"]:
        patched = patched.replace(needle, replacement)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(patched, encoding="utf-8")
    return output_path

Scenarios

Configuration

fp_wraptr.scenarios.config

Scenario configuration models.

A scenario config is a YAML file that describes: - Which FP model directory to use - What exogenous variable overrides to apply - Forecast period settings - What variables to track in output - Optional input overlay directory for extra include scripts

Example YAML

name: higher_growth description: "Test scenario with higher potential GDP growth" fp_home: FM forecast_start: "2025.4" forecast_end: "2029.4" overrides: YS: method: CHGSAMEPCT value: 0.008 track_variables: - PCY - UR - GDPR

ScenarioConfig

Bases: BaseModel

Configuration for an FP scenario run.

Source code in src/fp_wraptr/scenarios/config.py
class ScenarioConfig(BaseModel):
    """Configuration for an FP scenario run.

    Loaded from operator-authored YAML (see the module docstring for an
    example) and validated by pydantic.
    """

    name: str = Field(description="Scenario name (used for output directory naming)")
    description: str = Field(default="", description="Human-readable description")
    fp_home: Path = Field(default=Path("FM"), description="Path to FP model directory")
    input_overlay_dir: Path | None = Field(
        default=None,
        description=(
            "Optional directory searched for input scripts/includes referenced by input_file "
            "(e.g. nested `INPUT FILE=...;`). When set, files are copied into the working "
            "directory before running fp.exe."
        ),
    )
    input_file: str = Field(default="fminput.txt", description="FP input filename")
    forecast_start: str = Field(default="2025.4", description="Forecast start period (YYYY.Q)")
    forecast_end: str = Field(default="2029.4", description="Forecast end period (YYYY.Q)")
    backend: str = Field(
        default="fpexe",
        description="Execution backend: fpexe, fppy, or both",
    )
    fppy: dict[str, Any] = Field(
        default_factory=dict,
        description="Optional fp-py backend settings (timeout_seconds, eq_flags_preset, etc)",
    )
    overrides: dict[str, VariableOverride] = Field(
        default_factory=dict,
        description="Exogenous variable overrides",
    )
    track_variables: list[str] = Field(
        default_factory=lambda: ["PCY", "PCPF", "UR", "PIEF", "GDPR"],
        description="Variables to include in output summary",
    )
    input_patches: dict[str, str] = Field(
        default_factory=dict,
        description="Raw text patches to apply to input file {search: replace}",
    )
    alerts: dict[str, dict[str, float]] = Field(
        default_factory=dict,
        description="Alert thresholds by variable, e.g. {'UR': {'max': 6.0}}",
    )
    extra: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata",
    )

    @field_validator("forecast_start", "forecast_end", mode="before")
    @classmethod
    def _coerce_forecast_period_to_str(cls, value: Any) -> Any:
        # Operator-authored YAML often uses unquoted values like `forecast_start: 2025.4`,
        # which YAML parses as a float. Coerce these numeric inputs to strings.
        if isinstance(value, (int, float)):
            return str(value)
        return value

    @classmethod
    def from_yaml(cls, path: Path | str) -> ScenarioConfig:
        """Load scenario config from a YAML file.

        Args:
            path: Path to the scenario YAML file.

        Returns:
            Validated ScenarioConfig instance.
        """
        path = Path(path)
        # Read explicitly as UTF-8: the platform default encoding varies
        # (e.g. cp1252 on Windows) and scenario YAML is authored as UTF-8.
        with path.open(encoding="utf-8") as f:
            data = yaml.safe_load(f)
        # Be tolerant of YAML keys explicitly set to null (e.g. `overrides:` with only comments).
        # Treat these as "unset" so operator-authored scenarios don't crash at load time.
        if isinstance(data, dict):
            for key, default in (
                ("fppy", {}),
                ("overrides", {}),
                ("input_patches", {}),
                ("track_variables", []),
                ("alerts", {}),
                ("extra", {}),
            ):
                if data.get(key) is None:
                    data[key] = default
        return cls(**data)

    def to_yaml(self, path: Path | str) -> Path:
        """Write scenario config to a YAML file.

        Args:
            path: Destination path; parent directories are created as needed.

        Returns:
            The path written.
        """
        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        # Write UTF-8 explicitly so the file round-trips with from_yaml on
        # every platform.
        with path.open("w", encoding="utf-8") as f:
            yaml.dump(self.model_dump(mode="json"), f, default_flow_style=False, sort_keys=False)
        return path

from_yaml(path) classmethod

Load scenario config from a YAML file.

Source code in src/fp_wraptr/scenarios/config.py
@classmethod
def from_yaml(cls, path: Path | str) -> ScenarioConfig:
    """Load scenario config from a YAML file.

    Args:
        path: Path to the scenario YAML file.

    Returns:
        Validated ScenarioConfig instance.
    """
    path = Path(path)
    # Read explicitly as UTF-8: the platform default encoding varies
    # (e.g. cp1252 on Windows) and scenario YAML is authored as UTF-8.
    with path.open(encoding="utf-8") as f:
        data = yaml.safe_load(f)
    # Be tolerant of YAML keys explicitly set to null (e.g. `overrides:` with only comments).
    # Treat these as "unset" so operator-authored scenarios don't crash at load time.
    if isinstance(data, dict):
        for key, default in (
            ("fppy", {}),
            ("overrides", {}),
            ("input_patches", {}),
            ("track_variables", []),
            ("alerts", {}),
            ("extra", {}),
        ):
            if data.get(key) is None:
                data[key] = default
    return cls(**data)

to_yaml(path)

Write scenario config to a YAML file.

Source code in src/fp_wraptr/scenarios/config.py
def to_yaml(self, path: Path | str) -> Path:
    """Write scenario config to a YAML file.

    Args:
        path: Destination path; parent directories are created as needed.

    Returns:
        The path written.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Write UTF-8 explicitly (platform default encoding varies, e.g. cp1252
    # on Windows) so the file round-trips with from_yaml everywhere.
    with path.open("w", encoding="utf-8") as f:
        yaml.dump(self.model_dump(mode="json"), f, default_flow_style=False, sort_keys=False)
    return path

VariableOverride

Bases: BaseModel

Override for a single exogenous variable.

Source code in src/fp_wraptr/scenarios/config.py
class VariableOverride(BaseModel):
    """Override for a single exogenous variable.

    Written out in FP CHANGEVAR format by the writer module.
    """

    # FP adjustment method applied over the scenario sample.
    method: str = Field(
        default="SAMEVALUE",
        description="FP method: CHGSAMEPCT, SAMEVALUE, or CHGSAMEABS",
    )
    # Scalar value consumed by the method above.
    value: float = Field(default=0.0, description="Override value")

Runner

fp_wraptr.scenarios.runner

Scenario execution pipeline.

Orchestrates the full run flow: 1. Load scenario config (YAML) 2. Prepare working directory (copy data files + apply overrides) 3. Invoke fp.exe via subprocess wrapper 4. Parse output 5. Generate charts

ScenarioResult dataclass

Result of a scenario execution.

Source code in src/fp_wraptr/scenarios/runner.py
@dataclass
class ScenarioResult:
    """Result of a scenario execution."""

    config: ScenarioConfig  # the scenario that was run
    output_dir: Path  # run directory holding all artifacts
    run_result: RunResult | None = None  # backend execution outcome, if it ran
    parsed_output: FPOutputData | None = None  # parsed FP output, when available
    chart_path: Path | None = None  # generated chart file, if any
    golden_comparison: dict | None = None  # comparison against golden output, if performed
    backend_diagnostics: dict[str, Any] | None = None  # extra backend info (e.g. parity mode data)
    # UTC wall-clock stamp; matches the run-directory naming scheme.
    timestamp: str = field(
        default_factory=lambda: _dt.datetime.now(_dt.UTC).strftime("%Y%m%d_%H%M%S")
    )

    @property
    def success(self) -> bool:
        # Successful only if the backend actually ran and reported success.
        return self.run_result is not None and self.run_result.success

run_scenario(config, output_dir=None, backend=None)

Execute a scenario end-to-end.

Parameters:

Name Type Description Default
config ScenarioConfig

Scenario configuration.

required
output_dir Path | None

Base directory for run artifacts.

None
backend ModelBackend | None

Optional execution backend (defaults to FPExecutable).

None

Returns:

Type Description
ScenarioResult

ScenarioResult with run outcome and parsed data.

Source code in src/fp_wraptr/scenarios/runner.py
def run_scenario(
    config: ScenarioConfig,
    output_dir: Path | None = None,
    backend: ModelBackend | None = None,
) -> ScenarioResult:
    """Execute a scenario end-to-end.

    Args:
        config: Scenario configuration.
        output_dir: Base directory for run artifacts.
        backend: Optional execution backend (defaults to FPExecutable).

    Returns:
        ScenarioResult with run outcome and parsed data.
    """
    timestamp = _dt.datetime.now(_dt.UTC).strftime("%Y%m%d_%H%M%S")
    if output_dir is None:
        output_dir = Path("artifacts")
    run_dir = output_dir / f"{config.name}_{timestamp}"
    run_dir.mkdir(parents=True, exist_ok=True)

    result = ScenarioResult(config=config, output_dir=run_dir, timestamp=timestamp)

    # Save config for reproducibility. Use an absolute fp_home in the artifact
    # so the saved scenario can be re-run from the run directory.
    config_for_artifact = config.model_copy(deep=True)
    try:
        config_for_artifact.fp_home = Path(config_for_artifact.fp_home).expanduser().resolve()
    except Exception:
        config_for_artifact.fp_home = Path(config_for_artifact.fp_home)
    try:
        overlay_dir = getattr(config_for_artifact, "input_overlay_dir", None)
        if overlay_dir:
            config_for_artifact.input_overlay_dir = Path(overlay_dir).expanduser().resolve()
    except Exception:
        pass
    config_for_artifact.to_yaml(run_dir / "scenario.yaml")

    # Prepare working directory
    work_dir = run_dir / "work"
    work_dir.mkdir(exist_ok=True)

    selected_backend: ModelBackend
    if backend is not None:
        selected_backend = backend
    else:
        backend_name = str(getattr(config, "backend", "fpexe") or "fpexe").strip().lower()
        if backend_name in {"fpexe", "fp.exe", "fp_exe"}:
            selected_backend = FPExecutable(fp_home=config.fp_home)
        elif backend_name in {"fppy", "fairpy", "fair-py", "fp-py"}:
            fppy_settings = getattr(config, "fppy", {}) or {}
            # Default to FP-style solve semantics for direct fppy runs; callers can
            # opt out via `fppy.eq_flags_preset: default`.
            eq_flags_preset = str(fppy_settings.get("eq_flags_preset", "parity")).strip()
            default_timeout = 2400 if eq_flags_preset.lower() == "parity" else 600
            timeout_seconds = int(fppy_settings.get("timeout_seconds", default_timeout))
            num_threads_raw = fppy_settings.get("num_threads")
            num_threads = (
                int(num_threads_raw)
                if num_threads_raw is not None and int(num_threads_raw) > 0
                else None
            )
            eq_structural_read_cache = str(
                fppy_settings.get("eq_structural_read_cache", "off") or "off"
            ).strip()
            selected_backend = FairPyBackend(
                fp_home=config.fp_home,
                timeout_seconds=timeout_seconds,
                eq_flags_preset=eq_flags_preset,
                eq_structural_read_cache=eq_structural_read_cache,
                num_threads=num_threads,
            )
        elif backend_name == "both":
            from fp_wraptr.analysis.parity import run_parity
            from fp_wraptr.runtime.backend import RunResult

            parity = run_parity(
                config,
                output_dir=run_dir / "parity",
                fp_home_override=config.fp_home,
            )
            parity_dir = Path(parity.run_dir)
            parity_report = parity_dir / "parity_report.json"
            if parity_report.exists():
                shutil.copy2(parity_report, run_dir / "parity_report.json")
            parity_outputs = [
                parity_dir / "work_fppy" / "PABEV.TXT",
                parity_dir / "work_fppy" / "PACEV.TXT",
                parity_dir / "work_fpexe" / "PABEV.TXT",
                parity_dir / "work_fpexe" / "PACEV.TXT",
            ]
            pabev_selected = next((path for path in parity_outputs if path.exists()), None)
            if pabev_selected is not None:
                preserved = run_dir / pabev_selected.name
                shutil.copy2(pabev_selected, preserved)
                shutil.copy2(preserved, run_dir / "LOADFORMAT.DAT")
            result.run_result = RunResult(
                return_code=int(parity.exit_code),
                stdout=json.dumps(parity.to_dict(), sort_keys=True),
                stderr="",
                working_dir=parity_dir,
                input_file=parity_dir / "bundle" / config.input_file,
                output_file=pabev_selected if pabev_selected is not None else None,
                duration_seconds=0.0,
            )
            result.backend_diagnostics = {"mode": "both", "parity": parity.to_dict()}
            return result
        else:
            raise ValueError(f"Unknown backend: {backend_name!r} (expected fpexe|fppy|both)")

    # Copy data files to work dir
    if isinstance(selected_backend, FPExecutable):
        selected_backend._copy_data_files(work_dir)
    else:
        _copy_model_files(config.fp_home, work_dir)

    # Apply input patches if any
    input_file = work_dir / config.input_file
    if not input_file.exists():
        src = config.fp_home / config.input_file
        if src.exists():
            shutil.copy2(src, input_file)
        else:
            overlay_dir = getattr(config, "input_overlay_dir", None)
            if overlay_dir is not None:
                overlay_src = Path(overlay_dir) / config.input_file
                if overlay_src.exists():
                    shutil.copy2(overlay_src, input_file)

    if not input_file.exists():
        searched: list[str] = [str(config.fp_home / config.input_file)]
        overlay_dir = getattr(config, "input_overlay_dir", None)
        if overlay_dir is not None:
            searched.append(str(Path(overlay_dir) / config.input_file))
        raise FileNotFoundError(
            "Scenario input file not found. "
            f"input_file={config.input_file!r} searched: {', '.join(searched)}"
        )

    if config.input_patches:
        patch_input_file(input_file, config.input_patches, input_file)

    # Write exogenous overrides.
    #
    # Keep fp.exe-compatible behavior by writing merged baseline+scenario exogenous
    # adjustments back to `fmexog.txt` (the canonical template filename).
    if config.overrides:
        overrides_dict = {
            name: {"method": ov.method, "value": ov.value} for name, ov in config.overrides.items()
        }
        exogenous_path = work_dir / "fmexog.txt"
        write_exogenous_override_file(
            base_fmexog=config.fp_home / "fmexog.txt",
            variables=overrides_dict,
            sample_start=config.forecast_start,
            sample_end=config.forecast_end,
            output_path=exogenous_path,
        )
        # Keep an inspection copy with an explicit name for operator debugging.
        shutil.copy2(exogenous_path, work_dir / "fmexog_override.txt")

    input_manifest: InputTreeManifest | None = None
    try:
        input_manifest = prepare_work_dir_for_fp_run(
            entry_input=input_file,
            work_dir=work_dir,
            overlay_dir=getattr(config, "input_overlay_dir", None),
            fp_home=config.fp_home,
        )
    except Exception as exc:
        # For missing includes, fail with a path-focused error message early.
        raise RuntimeError(f"Failed to prepare FP input tree: {exc}") from exc
    else:
        if config.input_patches and input_manifest is not None:
            for name in (input_manifest.entry_input_file, *input_manifest.include_files):
                staged = work_dir / name
                if staged.exists():
                    patch_input_file(staged, config.input_patches, staged)

    # Run fp.exe
    if not selected_backend.check_available():
        # fp.exe not available -- parse existing output for development fallback.
        # For custom backends, keep behavior strict (unavailable backend => no run result).
        result.run_result = None
        if isinstance(selected_backend, FPExecutable):
            diagnostics = selected_backend.preflight_report(
                input_file=input_file, work_dir=work_dir
            )
            result.backend_diagnostics = diagnostics
            (run_dir / "backend_preflight.json").write_text(
                json.dumps(diagnostics, indent=2, default=str) + "\n",
                encoding="utf-8",
            )
        if backend is None:
            _try_parse_existing_output(result, config)
        return result

    run_result = selected_backend.run(input_file=input_file, work_dir=work_dir)
    result.run_result = run_result

    # Copy key outputs to artifacts, if present.
    fmout_path = work_dir / "fmout.txt"
    if fmout_path.exists():
        shutil.copy2(fmout_path, run_dir / "fmout.txt")
    pabev_path = work_dir / "PABEV.TXT"
    if pabev_path.exists():
        shutil.copy2(pabev_path, run_dir / "PABEV.TXT")
    pacev_path = work_dir / "PACEV.TXT"
    if pacev_path.exists():
        shutil.copy2(pacev_path, run_dir / "PACEV.TXT")

    copied_outputs: list[Path] = []
    if input_manifest is not None and input_manifest.expected_output_files:
        for name in input_manifest.expected_output_files:
            candidate = work_dir / name
            if not candidate.exists():
                continue
            target = run_dir / name
            shutil.copy2(candidate, target)
            copied_outputs.append(target)

    primary = select_primary_loadformat_output(run_dir, copied_outputs)
    if primary is not None:
        shutil.copy2(primary, run_dir / "LOADFORMAT.DAT")

    # Parse output
    output_file = run_dir / "fmout.txt"
    if output_file.exists():
        result.parsed_output = parse_fp_output(output_file)

    # Generate chart
    if result.parsed_output and result.parsed_output.variables:
        try:
            from fp_wraptr.viz.plots import plot_forecast

            chart_path = run_dir / "forecast.png"
            plot_forecast(
                result.parsed_output,
                variables=config.track_variables or None,
                output_path=chart_path,
            )
            result.chart_path = chart_path
        except ImportError:
            pass  # matplotlib not installed

    return result

load_scenario_config(path)

Load a scenario YAML file with a clear path-focused error message.

Source code in src/fp_wraptr/scenarios/runner.py
def load_scenario_config(path: Path | str) -> ScenarioConfig:
    """Load a scenario YAML file with a clear path-focused error message."""
    scenario_path = Path(path)
    if not scenario_path.exists():
        raise FileNotFoundError(
            f"Scenario YAML not found: {scenario_path} (attempted path: {scenario_path.resolve()})"
        )

    # Best-effort peek at the raw YAML so we can tell whether fp_home and
    # input_overlay_dir were set explicitly (as opposed to defaulted by
    # ScenarioConfig). Any parse failure is treated as "nothing explicit".
    try:
        raw_payload: object = yaml.safe_load(scenario_path.read_text(encoding="utf-8"))
    except Exception:
        raw_payload = None

    def _explicitly_set(key: str) -> bool:
        return isinstance(raw_payload, dict) and raw_payload.get(key) not in (None, "")

    explicit_fp_home = _explicitly_set("fp_home")
    explicit_overlay = _explicitly_set("input_overlay_dir")

    config = ScenarioConfig.from_yaml(scenario_path)

    # Interpret relative paths in the YAML as relative to the YAML file location,
    # not the current working directory.
    fp_home = Path(getattr(config, "fp_home", Path("FM")))
    if explicit_fp_home and not fp_home.is_absolute():
        config.fp_home = (scenario_path.parent / fp_home).resolve()

    overlay_dir = getattr(config, "input_overlay_dir", None)
    if explicit_overlay and overlay_dir:
        overlay_path = Path(overlay_dir)
        if not overlay_path.is_absolute():
            config.input_overlay_dir = (scenario_path.parent / overlay_path).resolve()

    return config

validate_fp_home(fp_home)

Validate that fp_home exists and provide actionable remediation.

Source code in src/fp_wraptr/scenarios/runner.py
def validate_fp_home(fp_home: Path) -> None:
    """Validate that fp_home exists and provide actionable remediation."""
    if Path(fp_home).exists():
        return
    raise FileNotFoundError(
        f"fp_home path not found: {fp_home}. "
        "Check the fp_home path in your scenario YAML or --fp-home option."
    )

Batch

fp_wraptr.scenarios.batch

Batch scenario execution and golden-output comparison tools.

run_batch(configs, output_dir, baseline_dir=None)

Run each scenario configuration in sequence and return results.

Source code in src/fp_wraptr/scenarios/batch.py
def run_batch(
    configs: list[ScenarioConfig],
    output_dir: Path,
    baseline_dir: Path | None = None,
) -> list[ScenarioResult]:
    """Run each scenario configuration in sequence and return results."""
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    results: list[ScenarioResult] = []
    for config in configs:
        # Import lazily so tests can monkeypatch fp_wraptr.scenarios.runner.run_scenario
        # without depending on module import order.
        from fp_wraptr.scenarios.runner import run_scenario

        result = run_scenario(config, output_dir=output_dir)

        # Only compare when a baseline dir was provided and contains a
        # subdirectory named after this scenario.
        golden_subdir = None if baseline_dir is None else Path(baseline_dir) / config.name
        if golden_subdir is not None and golden_subdir.exists():
            result.golden_comparison = compare_to_golden(result, golden_subdir)
        else:
            result.golden_comparison = None

        results.append(result)

    return results

compare_to_golden(result, golden_dir, tolerance=0.0001)

Compare a scenario result against a stored golden output.

Source code in src/fp_wraptr/scenarios/batch.py
def compare_to_golden(
    result: ScenarioResult,
    golden_dir: Path,
    tolerance: float = 1e-4,
) -> dict:
    """Compare a scenario result against a stored golden output.

    Args:
        result: Completed scenario result whose parsed output is compared.
        golden_dir: Directory expected to contain a golden ``fmout.txt``.
        tolerance: Largest absolute final-period delta still counted as a match.

    Returns:
        Dict with ``matches`` (bool), ``variable_diffs`` (per-variable delta
        records), ``max_delta`` (largest absolute delta seen; ``inf`` on any
        structural mismatch), and optionally ``error`` when the scenario
        output could not be loaded.
    """
    golden_dir = Path(golden_dir)
    golden_path = golden_dir / "fmout.txt"
    if not golden_path.exists():
        # No golden file at all: unconditional mismatch.
        return {"matches": False, "variable_diffs": {}, "max_delta": float("inf")}

    scenario_output, scenario_error = _load_or_parse_output(result)
    if scenario_output is None:
        # Scenario output missing or unparseable; surface the loader's error.
        return {
            "matches": False,
            "variable_diffs": {},
            "max_delta": float("inf"),
            "error": scenario_error,
        }

    baseline_output = parse_fp_output(golden_path)
    deltas = {}
    max_delta = 0.0

    # Compare the final forecast-period level of every scenario variable.
    for var_name in sorted(scenario_output.variables):
        if var_name not in baseline_output.variables:
            # Present in the scenario but absent from the golden output;
            # a structural mismatch forces max_delta to infinity.
            deltas[var_name] = {
                "status": "missing_in_golden",
                "abs_delta": None,
                "pct_delta": None,
                "baseline": None,
                "scenario": scenario_output.variables[var_name].levels[-1]
                if scenario_output.variables[var_name].levels
                else None,
            }
            max_delta = float("inf")
            continue

        scenario_values = scenario_output.variables[var_name].levels
        baseline_values = baseline_output.variables[var_name].levels
        if not scenario_values or not baseline_values:
            # One side has no level data; record it without a numeric delta.
            deltas[var_name] = {
                "status": "empty_series",
                "abs_delta": None,
                "pct_delta": None,
                "baseline": baseline_values[-1] if baseline_values else None,
                "scenario": scenario_values[-1] if scenario_values else None,
            }
            continue

        abs_delta = scenario_values[-1] - baseline_values[-1]
        pct_delta = (abs_delta / baseline_values[-1] * 100) if baseline_values[-1] != 0 else None
        deltas[var_name] = {
            "baseline": baseline_values[-1],
            "scenario": scenario_values[-1],
            "abs_delta": abs_delta,
            "pct_delta": pct_delta,
        }
        max_delta = max(max_delta, abs(abs_delta))

    # Match only when every record is a plain numeric delta within tolerance;
    # any record carrying a "status" (missing/empty series) fails the match.
    matches = max_delta <= tolerance and all(
        values.get("status") is None
        and (values["abs_delta"] is None or abs(values["abs_delta"]) <= tolerance)
        for values in deltas.values()
    )

    # Include extra vars in golden that are missing from the scenario output.
    for missing in sorted(set(baseline_output.variables) - set(scenario_output.variables)):
        baseline_values = baseline_output.variables[missing].levels
        deltas[missing] = {
            "status": "missing_in_scenario",
            "abs_delta": None,
            "pct_delta": None,
            "baseline": baseline_values[-1] if baseline_values else None,
            "scenario": None,
        }
        max_delta = float("inf")
        matches = False

    return {
        "matches": matches,
        "variable_diffs": deltas,
        "max_delta": max_delta,
    }

save_golden(result, golden_dir)

Copy fmout.txt into a golden directory.

Source code in src/fp_wraptr/scenarios/batch.py
def save_golden(result: ScenarioResult, golden_dir: Path) -> Path:
    """Copy `fmout.txt` into a golden directory."""
    target_dir = Path(golden_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    source = result.output_dir / "fmout.txt"
    if not source.exists():
        raise FileNotFoundError(f"No fmout.txt to save at {result.output_dir}")

    destination = target_dir / "fmout.txt"
    shutil.copy2(source, destination)
    return destination

DSL Compiler

fp_wraptr.scenarios.dsl

Human-readable scenario DSL compiler.

This module provides a lightweight line-oriented DSL that compiles into ScenarioConfig so users can author scenarios without verbose YAML.

Example DSL::

scenario baseline_plus
description "Higher growth + lower rates"
forecast 2025.4 to 2029.4
track PCY,UR,GDPR
set YS CHGSAMEPCT 0.008
alert UR max 6.0
patch cmd:SETUPSOLVE.MAXCHECK 80
policy monetary_rule rate=4.0 method=SAMEVALUE

DSLCompileError

Bases: ValueError

Raised when DSL text cannot be compiled into a valid scenario.

Source code in src/fp_wraptr/scenarios/dsl.py
class DSLCompileError(ValueError):
    """Raised when DSL text cannot be compiled into a valid scenario."""

    def __init__(self, message: str, line_no: int | None = None) -> None:
        # Keep the offending line number so callers can report it separately.
        self.line_no = line_no
        prefixed = message if line_no is None else f"Line {line_no}: {message}"
        super().__init__(prefixed)

compile_scenario_dsl_text(text, default_name=None)

Compile DSL text into ScenarioConfig.

Parameters:

Name Type Description Default
text str

DSL source text.

required
default_name str | None

Optional fallback scenario name if no scenario command is present.

None
Source code in src/fp_wraptr/scenarios/dsl.py
def compile_scenario_dsl_text(text: str, default_name: str | None = None) -> ScenarioConfig:
    """Compile DSL text into ``ScenarioConfig``.

    Args:
        text: DSL source text.
        default_name: Optional fallback scenario name if no ``scenario`` command is present.

    Raises:
        DSLCompileError: If a line uses an unknown command, an argument is
            malformed, no scenario name is available, or the compiled config
            fails ``ScenarioConfig`` validation.
    """
    # Defaults for every ScenarioConfig field; DSL commands overwrite entries.
    data: dict[str, Any] = {
        "name": default_name or "",
        "description": "",
        "fp_home": Path("FM"),
        "input_file": "fminput.txt",
        "forecast_start": "2025.4",
        "forecast_end": "2029.4",
        "overrides": {},
        "track_variables": ["PCY", "PCPF", "UR", "PIEF", "GDPR"],
        "input_patches": {},
        "alerts": {},
        "extra": {},
    }
    compiled_policy_summaries: list[dict[str, Any]] = []

    # Line-oriented dispatch: each non-blank line is one command followed by
    # its arguments; command keywords are matched case-insensitively.
    for line_no, raw_line in enumerate(text.splitlines(), start=1):
        tokens = _tokenize_line(raw_line, line_no)
        if not tokens:
            continue

        command = tokens[0].lower()
        args = tokens[1:]

        if command == "scenario":
            _require_arg_count(command, args, min_count=1, line_no=line_no)
            data["name"] = args[0]
            continue

        if command == "description":
            _require_arg_count(command, args, min_count=1, line_no=line_no)
            data["description"] = " ".join(args)
            continue

        if command == "fp_home":
            _require_arg_count(command, args, exact_count=1, line_no=line_no)
            data["fp_home"] = Path(args[0])
            continue

        if command == "input_file":
            _require_arg_count(command, args, exact_count=1, line_no=line_no)
            data["input_file"] = args[0]
            continue

        if command == "forecast":
            # forecast <start> to <end> -- argument shape validated by helper.
            start, end = _parse_forecast_args(args, line_no)
            data["forecast_start"] = start
            data["forecast_end"] = end
            continue

        if command == "track":
            _require_arg_count(command, args, min_count=1, line_no=line_no)
            data["track_variables"] = _parse_track_args(args)
            continue

        if command == "set":
            # set <VAR> <METHOD> <value>: a single exogenous variable override.
            _require_arg_count(command, args, exact_count=3, line_no=line_no)
            var_name = args[0].upper()
            method = args[1].upper()
            try:
                value = float(args[2])
            except ValueError as exc:
                raise DSLCompileError(
                    f"Invalid numeric override value: {args[2]}", line_no
                ) from exc
            data["overrides"][var_name] = VariableOverride(method=method, value=value)
            continue

        if command == "alert":
            # alert <VAR> <bound> <value>: threshold check on a tracked variable.
            _require_arg_count(command, args, exact_count=3, line_no=line_no)
            var_name = args[0].upper()
            bound = args[1].lower()
            if bound not in _ALLOWED_ALERT_BOUNDS:
                allowed = ", ".join(sorted(_ALLOWED_ALERT_BOUNDS))
                raise DSLCompileError(f"Alert bound must be one of: {allowed}", line_no)
            try:
                value = float(args[2])
            except ValueError as exc:
                raise DSLCompileError(f"Invalid alert value: {args[2]}", line_no) from exc
            data["alerts"].setdefault(var_name, {})[bound] = value
            continue

        if command == "patch":
            # patch <key> <value...>: raw input-file patch (value may contain spaces).
            _require_arg_count(command, args, min_count=2, line_no=line_no)
            key = args[0]
            value = " ".join(args[1:])
            data["input_patches"][key] = value
            continue

        if command == "policy":
            # policy <type> k=v ...: expand a registered policy block into overrides.
            _require_arg_count(command, args, min_count=1, line_no=line_no)
            policy_type = args[0].strip()
            policy_fields = _parse_key_value_args(args[1:], line_no)
            policy_data = {"type": policy_type, **policy_fields}
            try:
                policy_block = PolicyRegistry.create(policy_data)
            except Exception as exc:
                raise DSLCompileError(f"Invalid policy block: {exc}", line_no) from exc
            data["overrides"].update(policy_block.compile())
            compiled_policy_summaries.append(policy_block.to_summary())
            continue

        if command == "extra":
            # extra k=v ...: free-form key/value metadata merged into `extra`.
            _require_arg_count(command, args, min_count=1, line_no=line_no)
            data["extra"].update(_parse_key_value_args(args, line_no))
            continue

        raise DSLCompileError(f"Unknown DSL command '{command}'", line_no)

    if not data["name"]:
        raise DSLCompileError(
            "Scenario name is required (use `scenario <name>` or provide default_name)",
            line_no=None,
        )

    # Record expanded policies so the provenance survives in the config.
    if compiled_policy_summaries:
        data["extra"]["compiled_policies"] = compiled_policy_summaries

    try:
        return ScenarioConfig(**data)
    except Exception as exc:
        raise DSLCompileError(f"Compiled scenario is invalid: {exc}") from exc

compile_scenario_dsl_file(path)

Compile a DSL file into ScenarioConfig.

Source code in src/fp_wraptr/scenarios/dsl.py
def compile_scenario_dsl_file(path: Path | str) -> ScenarioConfig:
    """Compile a DSL file into ``ScenarioConfig``."""
    source_path = Path(path)
    # The file stem serves as the scenario name when the DSL omits `scenario`.
    return compile_scenario_dsl_text(
        source_path.read_text(encoding="utf-8"),
        default_name=source_path.stem,
    )

Analysis

Diff

fp_wraptr.analysis.diff

Compare FP runs and produce summary deltas.

Compares two parsed FP outputs (or two run directories) and reports the top variable differences, both absolute and percentage.

diff_outputs(baseline, scenario, top_n=20)

Compare two parsed FP outputs.

Computes the difference in the final forecast period level for each common variable and ranks by absolute difference.

Parameters:

Name Type Description Default
baseline FPOutputData

Baseline parsed output.

required
scenario FPOutputData

Scenario parsed output.

required
top_n int

Number of top deltas to include.

20

Returns:

Type Description
dict

Dict with:
  • deltas: {var_name: {baseline, scenario, abs_delta, pct_delta}}
  • common_variables: list of variables in both outputs
  • baseline_only: list of variables only in baseline
  • scenario_only: list of variables only in scenario
Source code in src/fp_wraptr/analysis/diff.py
def diff_outputs(
    baseline: FPOutputData,
    scenario: FPOutputData,
    top_n: int = 20,
) -> dict:
    """Compare two parsed FP outputs variable by variable.

    For every variable present in both outputs, the final forecast-period
    level is compared and the results ranked by absolute difference.

    Args:
        baseline: Baseline parsed output.
        scenario: Scenario parsed output.
        top_n: Number of top deltas to include.

    Returns:
        Dict with:
        - deltas: {var_name: {baseline, scenario, abs_delta, pct_delta}}
        - common_variables: list of variables in both outputs
        - baseline_only: list of variables only in baseline
        - scenario_only: list of variables only in scenario
    """
    base_names = set(baseline.variables.keys())
    scen_names = set(scenario.variables.keys())
    shared = sorted(base_names & scen_names)

    deltas = {}
    for name in shared:
        base_levels = baseline.variables[name].levels
        scen_levels = scenario.variables[name].levels

        # Skip variables with no level data on either side.
        if not base_levels or not scen_levels:
            continue

        # Compare the last forecast-period level values.
        base_final = base_levels[-1]
        scen_final = scen_levels[-1]
        change = scen_final - base_final
        deltas[name] = {
            "baseline": base_final,
            "scenario": scen_final,
            "abs_delta": change,
            "pct_delta": None if base_final == 0 else change / base_final * 100,
        }

    # Rank by magnitude of the absolute delta, largest first.
    ranked = sorted(deltas.items(), key=lambda item: abs(item[1]["abs_delta"]), reverse=True)

    return {
        "deltas": dict(ranked[:top_n]),
        "common_variables": shared,
        "baseline_only": sorted(base_names - scen_names),
        "scenario_only": sorted(scen_names - base_names),
        "total_compared": len(shared),
    }

diff_run_dirs(dir_a, dir_b, top_n=20)

Compare two run directories by parsing their fmout.txt files.

Parameters:

Name Type Description Default
dir_a Path | str

First run directory (baseline).

required
dir_b Path | str

Second run directory (scenario).

required
top_n int

Number of top deltas to include.

20

Returns:

Type Description
dict

Dict with comparison summary.

Source code in src/fp_wraptr/analysis/diff.py
def diff_run_dirs(
    dir_a: Path | str,
    dir_b: Path | str,
    top_n: int = 20,
) -> dict:
    """Compare two run directories by parsing their fmout.txt files.

    Args:
        dir_a: First run directory (baseline).
        dir_b: Second run directory (scenario).
        top_n: Number of top deltas to include.

    Returns:
        Dict with comparison summary.
    """
    output_a = Path(dir_a) / "fmout.txt"
    output_b = Path(dir_b) / "fmout.txt"

    # Report every missing output file up front rather than parsing anything.
    missing = [str(candidate) for candidate in (output_a, output_b) if not candidate.exists()]
    if missing:
        return {"error": f"Missing output files: {', '.join(missing)}", "deltas": {}}

    return diff_outputs(parse_fp_output(output_a), parse_fp_output(output_b), top_n=top_n)

export_diff_csv(diff_result, output_path)

Export diff deltas as a CSV file.

Source code in src/fp_wraptr/analysis/diff.py
def export_diff_csv(diff_result: dict, output_path: Path | str) -> Path:
    """Export diff deltas as a CSV file."""
    import csv

    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)

    rows = _records_from_diff(diff_result)
    if not rows:
        # No deltas: still emit a header-only CSV so downstream readers succeed.
        target.write_text(
            "variable,baseline,scenario,abs_delta,pct_delta\n", encoding="utf-8"
        )
        return target

    with target.open("w", encoding="utf-8", newline="") as stream:
        writer = csv.DictWriter(
            stream,
            fieldnames=["variable", "baseline", "scenario", "abs_delta", "pct_delta"],
        )
        writer.writeheader()
        # Render missing percentage deltas as empty cells rather than "None".
        writer.writerows(
            {**row, "pct_delta": "" if row.get("pct_delta") is None else row["pct_delta"]}
            for row in rows
        )

    return target

export_diff_excel(diff_result, output_path)

Export diff deltas as an Excel file.

Source code in src/fp_wraptr/analysis/diff.py
def export_diff_excel(diff_result: dict, output_path: Path | str) -> Path:
    """Export diff deltas as an Excel file."""
    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)

    # pandas is an optional dependency for this exporter only.
    try:
        import pandas as pd
    except ModuleNotFoundError as exc:
        raise RuntimeError("pandas is required for Excel export") from exc

    columns = ["variable", "baseline", "scenario", "abs_delta", "pct_delta"]
    rows = _records_from_diff(diff_result)
    if not rows:
        # No deltas: write an empty sheet with just the header columns.
        pd.DataFrame(columns=columns).to_excel(target, index=False)
        return target

    frame = pd.DataFrame(rows)
    if "pct_delta" in frame.columns:
        # Replace NaN percentage deltas with None before writing the sheet.
        frame["pct_delta"] = frame["pct_delta"].where(frame["pct_delta"].notna(), None)
    frame.to_excel(target, index=False)
    return target

Report

fp_wraptr.analysis.report

Utilities for generating markdown run reports.

build_run_report(run_dir, baseline_dir=None)

Build a markdown summary for a completed fp-wraptr run.

Source code in src/fp_wraptr/analysis/report.py
def build_run_report(
    run_dir: str | Path,
    baseline_dir: str | Path | None = None,
) -> str:
    """Build a markdown summary for a completed fp-wraptr run.

    Args:
        run_dir: Run directory containing the scenario config and parsed output.
        baseline_dir: Optional baseline run directory; when provided and its
            ``fmout.txt`` exists, a baseline-comparison section is appended.

    Returns:
        The markdown report as a single newline-joined string.
    """
    run_dir = _normalize_path(run_dir)

    lines: list[str] = [
        "# fp-wraptr Run Report",
        "",
        f"Run directory: `{run_dir}`",
        "",
    ]

    config, config_error = _read_scenario(run_dir)
    if config_error:
        lines.append(config_error)
        lines.append("")
    else:
        # Run dirs are named "<scenario>_<timestamp>"; recover the timestamp.
        timestamp = "N/A"
        if "_" in run_dir.name:
            timestamp = run_dir.name.split("_", 1)[1]
        lines.extend([
            f"- Name: `{config.name}`",
            f"- Description: {config.description or '(none)'}",
            f"- Timestamp: `{timestamp}`",
        ])
        lines.append("")
        if config.track_variables:
            lines.append(f"- Track variables: {', '.join(config.track_variables)}")
        else:
            lines.append("- Track variables: <none>")
        lines.append("")

        if config.overrides:
            lines.append("## Overrides")
            for name, override in config.overrides.items():
                lines.append(f"- `{name}`: `{override.method}` -> `{override.value}`")
            lines.append("")

    parsed_output = _read_output(run_dir)
    if not parsed_output:
        # Without parsed output there is nothing more to report.
        lines.append("No forecast output available for this run.")
        return "\n".join(lines)

    lines.extend([
        "## Forecast",
        f"- Forecast start: `{parsed_output.forecast_start or 'N/A'}`",
        f"- Forecast end: `{parsed_output.forecast_end or 'N/A'}`",
        "",
    ])

    top_levels = _top_levels_table_rows(parsed_output)
    if top_levels:
        lines.extend([
            "### Top 10 variable levels (final period)",
            "",
            "|Variable|Final level|",
            "|---|---:|",
        ])
        for variable, value in top_levels:
            lines.append(f"|{variable}|{value:.4f}|")
        lines.append("")
    else:
        lines.append("No level series found in output.")
        lines.append("")

    if baseline_dir is None:
        return "\n".join(lines)

    baseline_output_path = _normalize_path(baseline_dir) / "fmout.txt"
    if not baseline_output_path.exists():
        # Baseline was requested but its output is missing; note it and stop.
        lines.append("## Baseline comparison")
        lines.append(f"Baseline output missing: `{baseline_output_path}`")
        return "\n".join(lines)

    baseline_output = parse_fp_output(baseline_output_path)
    comparison = diff_outputs(baseline_output, parsed_output, top_n=10)
    lines.extend([
        "## Baseline comparison",
        f"- Compared variables: `{comparison.get('total_compared', 0)}`",
        "",
        "|Variable|Baseline|Scenario|Abs delta|% change|",
        "|---|---:|---:|---:|---:|",
    ])

    # One table row per compared variable; None values render as "N/A".
    for name, values in comparison.get("deltas", {}).items():
        baseline_value = values.get("baseline")
        scenario_value = values.get("scenario")
        abs_delta = values.get("abs_delta")
        pct_delta = values.get("pct_delta")
        lines.append(
            "|{var}|{baseline}|{scenario}|{abs_delta}|{pct_delta}|".format(
                var=name,
                baseline="N/A" if baseline_value is None else f"{baseline_value:.4f}",
                scenario="N/A" if scenario_value is None else f"{scenario_value:.4f}",
                abs_delta="N/A" if abs_delta is None else f"{abs_delta:.4f}",
                pct_delta="N/A" if pct_delta is None else f"{pct_delta:.4f}",
            )
        )

    return "\n".join(lines)

Dependency Graph

fp_wraptr.analysis.graph

Build and analyze FP dependency graphs.

build_dependency_graph(parsed_input)

Build a variable dependency graph.

The graph has a directed edge B -> A when variable A depends on B in any supported command type.

Source code in src/fp_wraptr/analysis/graph.py
def build_dependency_graph(parsed_input: dict) -> nx.DiGraph:
    """Construct the directed variable dependency graph.

    An edge ``B -> A`` is added whenever variable ``A`` depends on ``B``
    in any supported command type.
    """
    nx = _load_networkx()
    graph = nx.DiGraph()

    for dependent, dependency in _iter_dependency_records(parsed_input):
        # Edge direction: the dependency points at the variable that uses it.
        graph.add_node(dependent)
        graph.add_node(dependency)
        graph.add_edge(dependency, dependent)

    return graph

summarize_graph(graph)

Summarize node/edge counts and core structural diagnostics.

Source code in src/fp_wraptr/analysis/graph.py
def summarize_graph(graph: nx.DiGraph) -> dict:
    """Summarize node/edge counts and core structural diagnostics."""
    if graph is None:
        # Nothing to summarize: return an all-empty report.
        return {
            "nodes": 0,
            "edges": 0,
            "roots": [],
            "leaves": [],
            "most_connected": [],
        }

    # Roots have no incoming edges; leaves have no outgoing edges.
    roots = sorted(node for node, degree in graph.in_degree() if degree == 0)
    leaves = sorted(node for node, degree in graph.out_degree() if degree == 0)

    most_connected: list[str] = []
    if graph.number_of_nodes() != 0:
        top_degree = max((graph.degree(node) for node in graph.nodes), default=0)
        most_connected = sorted(
            node for node in graph.nodes if graph.degree(node) == top_degree
        )

    return {
        "nodes": graph.number_of_nodes(),
        "edges": graph.number_of_edges(),
        "roots": roots,
        "leaves": leaves,
        "most_connected": most_connected,
    }

get_upstream(graph, variable)

Return all ancestors of variable.

Source code in src/fp_wraptr/analysis/graph.py
def get_upstream(graph: nx.DiGraph, variable: str) -> set[str]:
    """Return the set of all ancestors of ``variable`` in ``graph``."""
    nx = _load_networkx()
    if variable in graph:
        return set(nx.ancestors(graph, variable))
    # Unknown variables have no upstream dependencies.
    return set()

get_downstream(graph, variable)

Return all descendants of variable.

Source code in src/fp_wraptr/analysis/graph.py
def get_downstream(graph: nx.DiGraph, variable: str) -> set[str]:
    """Return the set of all descendants of ``variable`` in ``graph``."""
    nx = _load_networkx()
    if variable in graph:
        return set(nx.descendants(graph, variable))
    # Unknown variables have no downstream dependents.
    return set()

Visualization

fp_wraptr.viz.plots

Forecast visualization using matplotlib.

Provides two canonical plot types: 1. Forecast levels: time series of variable levels over the forecast horizon. 2. Forecast comparison: overlay baseline vs scenario levels with delta shading.

plot_forecast(data, variables=None, output_path=Path('artifacts/forecast.png'), title=None)

Plot forecast levels for selected variables.

Parameters:

Name Type Description Default
data FPOutputData

Parsed FP output data.

required
variables list[str] | None

List of variable names to plot. If None, plots all.

None
output_path Path | str

Path to save the chart.

Path('artifacts/forecast.png')
title str | None

Chart title. If None, auto-generated.

None

Returns:

Type Description
Path

Path to the saved chart file.

Source code in src/fp_wraptr/viz/plots.py
def plot_forecast(
    data: FPOutputData,
    variables: list[str] | None = None,
    output_path: Path | str = Path("artifacts/forecast.png"),
    title: str | None = None,
) -> Path:
    """Render each selected variable's forecast levels as its own subplot.

    Args:
        data: Parsed FP output data.
        variables: Names of variables to draw. If None, draws all parsed variables.
        output_path: Destination file for the rendered chart.
        title: Figure title. If None, one is derived from the forecast range.

    Returns:
        Path to the saved chart file.

    Raises:
        ValueError: If none of the requested variables exist in ``data``.
    """
    import matplotlib.pyplot as plt

    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)

    requested = list(data.variables.keys()) if variables is None else variables
    selected = [name for name in requested if name in data.variables]
    if not selected:
        raise ValueError(f"No matching variables found. Available: {list(data.variables.keys())}")

    panel_count = len(selected)
    fig, axes = plt.subplots(panel_count, 1, figsize=(12, 3 * panel_count), squeeze=False)

    for idx, name in enumerate(selected):
        axis = axes[idx, 0]
        series = data.variables[name]
        # Align the x axis with however many level points this series has.
        x_labels = data.periods[: len(series.levels)]

        axis.plot(x_labels, series.levels, marker="o", linewidth=2, markersize=4)
        axis.set_ylabel(name)
        axis.set_title(f"{name} (id={series.var_id})")
        axis.grid(True, alpha=0.3)
        axis.tick_params(axis="x", rotation=45)

        # Only the bottom panel keeps its x tick labels.
        if idx != panel_count - 1:
            axis.set_xticklabels([])

    fig.suptitle(
        title or f"FP Forecast: {data.forecast_start} to {data.forecast_end}",
        fontsize=14,
        fontweight="bold",
    )
    fig.tight_layout()
    fig.savefig(target, dpi=150, bbox_inches="tight")
    plt.close(fig)

    return target

plot_comparison(baseline, scenario, variables=None, output_path=Path('artifacts/comparison.png'), title=None)

Plot baseline vs scenario comparison for selected variables.

Parameters:

Name Type Description Default
baseline FPOutputData

Baseline parsed output.

required
scenario FPOutputData

Scenario parsed output.

required
variables list[str] | None

Variables to compare. If None, uses common variables.

None
output_path Path | str

Path to save the chart.

Path('artifacts/comparison.png')
title str | None

Chart title.

None

Returns:

Type Description
Path

Path to the saved chart file.

Source code in src/fp_wraptr/viz/plots.py
def plot_comparison(
    baseline: FPOutputData,
    scenario: FPOutputData,
    variables: list[str] | None = None,
    output_path: Path | str = Path("artifacts/comparison.png"),
    title: str | None = None,
) -> Path:
    """Plot baseline vs scenario comparison for selected variables.

    Args:
        baseline: Baseline parsed output.
        scenario: Scenario parsed output.
        variables: Variables to compare. If None, uses common variables.
        output_path: Path to save the chart.
        title: Chart title.

    Returns:
        Path to the saved chart file.

    Raises:
        ValueError: If no requested variable exists in both runs.
    """
    import matplotlib.pyplot as plt

    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    if variables is None:
        common = set(baseline.variables.keys()) & set(scenario.variables.keys())
        variables = sorted(common)

    plot_vars = [v for v in variables if v in baseline.variables and v in scenario.variables]
    if not plot_vars:
        raise ValueError("No common variables found to compare.")

    n_vars = len(plot_vars)
    fig, axes = plt.subplots(n_vars, 1, figsize=(12, 3.5 * n_vars), squeeze=False)

    for i, var_name in enumerate(plot_vars):
        ax = axes[i, 0]
        base_var = baseline.variables[var_name]
        scen_var = scenario.variables[var_name]

        # Truncate both series to their common length so the curves align.
        n_points = min(len(base_var.levels), len(scen_var.levels))
        periods = baseline.periods[:n_points]
        base_vals = base_var.levels[:n_points]
        scen_vals = scen_var.levels[:n_points]

        ax.plot(periods, base_vals, marker="s", linewidth=2, markersize=4, label="Baseline")
        ax.plot(periods, scen_vals, marker="o", linewidth=2, markersize=4, label="Scenario")
        # FIX: shade the delta against the same categorical x values used by
        # the line plots. The previous range(n_points) mixed numeric x with
        # string categories and only lined up by coincidence of matplotlib's
        # category-unit mapping (strings map to positions 0..n-1).
        ax.fill_between(
            periods, base_vals, scen_vals, alpha=0.15, color="orange", label="Delta"
        )
        ax.set_ylabel(var_name)
        ax.set_title(var_name)
        ax.legend(loc="upper left", fontsize=8)
        ax.grid(True, alpha=0.3)
        ax.tick_params(axis="x", rotation=45)

        # Only the bottom subplot keeps its x tick labels.
        if i < n_vars - 1:
            ax.set_xticklabels([])

    fig.suptitle(title or "FP Run Comparison", fontsize=14, fontweight="bold")
    fig.tight_layout()
    fig.savefig(output_path, dpi=150, bbox_inches="tight")
    plt.close(fig)

    return output_path

Dashboard

Artifacts

fp_wraptr.dashboard.artifacts

Helpers for discovering and loading fp-wraptr dashboard run artifacts.

RunArtifact dataclass

Metadata for one scenario run artifact directory.

Source code in src/fp_wraptr/dashboard/artifacts.py
@dataclass
class RunArtifact:
    """Metadata for one scenario run artifact directory."""

    run_dir: Path  # directory containing scenario.yaml and run outputs
    scenario_name: str  # scenario name parsed from the run directory name
    timestamp: str  # timestamp parsed from the run directory name ("" if absent)
    has_output: bool  # fmout.txt or LOADFORMAT.DAT present in run_dir
    has_chart: bool  # forecast.png present in run_dir
    config: ScenarioConfig | None
    backend_hint: str = ""

    @property
    def display_name(self) -> str:
        """Scenario name plus timestamp, when a timestamp is known."""
        if self.timestamp:
            return f"{self.scenario_name} ({self.timestamp})"
        return self.scenario_name

    @property
    def backend_name(self) -> str:
        """Return the configured backend for the run (fpexe/fppy/both/unknown)."""
        if self.config is None:
            return "unknown"
        value = str(getattr(self.config, "backend", "") or "").strip().lower()
        return value or "unknown"

    def _resolve_config(self) -> ScenarioConfig | None:
        """Return ``self.config``, falling back to re-parsing ``scenario.yaml``."""
        if self.config is not None:
            return self.config
        scenario_file = self.run_dir / "scenario.yaml"
        if scenario_file.exists():
            try:
                return ScenarioConfig.from_yaml(scenario_file)
            except Exception:
                return None
        return None

    def _read_series(self) -> tuple[list, dict] | None:
        """Read LOADFORMAT-style series files from the run directory.

        Returns:
            ``(period_tokens, series)`` on success, or None when no series
            file exists or the file cannot be parsed.
        """
        loadformat_path = _first_existing(
            self.run_dir / "LOADFORMAT.DAT",
            self.run_dir / "PABEV.TXT",
            self.run_dir / "PACEV.TXT",
        )
        if loadformat_path is None:
            return None
        try:
            return read_loadformat(loadformat_path)
        except Exception:
            return None

    def _series_output(self, period_tokens: list, series: dict) -> FPOutputData:
        """Build an FPOutputData from parsed series plus config metadata."""
        add_derived_series(series)
        config = self._resolve_config()
        start = (
            str(getattr(config, "forecast_start", "") or "").strip() if config is not None else ""
        )
        end = str(getattr(config, "forecast_end", "") or "").strip() if config is not None else ""
        model_title = str(getattr(config, "name", "") or "").strip() if config is not None else ""
        if not model_title:
            # Fall back to the scenario name when the config carries no title.
            model_title = self.scenario_name
        return _fp_output_from_series(
            period_tokens,
            series,
            forecast_start=start or None,
            forecast_end=end or None,
            model_title=model_title,
        )

    def load_output(self) -> FPOutputData | None:
        """Load and parse output data for this run.

        Preference order:
          1) ``fmout.txt`` (rich FP console output)
          2) ``LOADFORMAT.DAT`` / ``PABEV.TXT`` / ``PACEV.TXT`` (PRINTVAR LOADFORMAT series)
        """
        fmout = self.run_dir / "fmout.txt"
        if fmout.exists():
            return parse_fp_output(fmout)

        parsed = self._read_series()
        if parsed is None:
            return None
        return self._series_output(*parsed)

    def load_series_output(self) -> FPOutputData | None:
        """Load run output preferring LOADFORMAT-style series over fmout.

        This is useful for side-by-side comparisons between fpexe/fppy runs,
        since both engines emit comparable LOADFORMAT artifacts.
        """
        parsed = self._read_series()
        if parsed is not None:
            period_tokens, series = parsed
            # Require both tokens and data; otherwise fall through to fmout.
            if period_tokens and series:
                return self._series_output(period_tokens, series)

        fmout = self.run_dir / "fmout.txt"
        if fmout.exists():
            return parse_fp_output(fmout)
        return None

backend_name property

Return the configured backend for the run (fpexe/fppy/both/unknown).

load_output()

Load and parse output data for this run.

Preference order

1) fmout.txt (rich FP console output)
2) LOADFORMAT.DAT / PABEV.TXT / PACEV.TXT (PRINTVAR LOADFORMAT series)

Source code in src/fp_wraptr/dashboard/artifacts.py
def load_output(self) -> FPOutputData | None:
    """Load and parse output data for this run.

    Preference order:
      1) ``fmout.txt`` (rich FP console output)
      2) ``LOADFORMAT.DAT`` / ``PABEV.TXT`` / ``PACEV.TXT`` (PRINTVAR LOADFORMAT series)

    Returns:
        Parsed FPOutputData, or None when no usable output file exists.
    """
    fmout = self.run_dir / "fmout.txt"
    if fmout.exists():
        return parse_fp_output(fmout)

    # fmout.txt is absent: fall back to LOADFORMAT-style series files.
    config = self.config
    if config is None:
        # Re-parse scenario.yaml when no config was attached at scan time.
        scenario_file = self.run_dir / "scenario.yaml"
        if scenario_file.exists():
            try:
                config = ScenarioConfig.from_yaml(scenario_file)
            except Exception:
                config = None

    # Probe the candidate series files in preference order.
    loadformat_path = _first_existing(
        self.run_dir / "LOADFORMAT.DAT",
        self.run_dir / "PABEV.TXT",
        self.run_dir / "PACEV.TXT",
    )
    if loadformat_path is None:
        return None

    try:
        period_tokens, series = read_loadformat(loadformat_path)
    except Exception:
        # Unreadable/corrupt series file: treat as "no output available".
        return None

    add_derived_series(series)
    # Pull forecast window and title from the config when present;
    # fall back to the scenario name for the title.
    start = (
        str(getattr(config, "forecast_start", "") or "").strip() if config is not None else ""
    )
    end = str(getattr(config, "forecast_end", "") or "").strip() if config is not None else ""
    model_title = str(getattr(config, "name", "") or "").strip() if config is not None else ""
    if not model_title:
        model_title = self.scenario_name
    return _fp_output_from_series(
        period_tokens,
        series,
        forecast_start=start or None,
        forecast_end=end or None,
        model_title=model_title,
    )

load_series_output()

Load run output preferring LOADFORMAT-style series over fmout.

This is useful for side-by-side comparisons between fpexe/fppy runs, since both engines emit comparable LOADFORMAT artifacts.

Source code in src/fp_wraptr/dashboard/artifacts.py
def load_series_output(self) -> FPOutputData | None:
    """Load run output preferring LOADFORMAT-style series over fmout.

    This is useful for side-by-side comparisons between fpexe/fppy runs,
    since both engines emit comparable LOADFORMAT artifacts.

    Returns:
        Parsed FPOutputData, or None when no usable output file exists.
    """
    config = self.config
    if config is None:
        # Re-parse scenario.yaml when no config was attached at scan time.
        scenario_file = self.run_dir / "scenario.yaml"
        if scenario_file.exists():
            try:
                config = ScenarioConfig.from_yaml(scenario_file)
            except Exception:
                config = None

    # Probe the candidate series files in preference order.
    loadformat_path = _first_existing(
        self.run_dir / "LOADFORMAT.DAT",
        self.run_dir / "PABEV.TXT",
        self.run_dir / "PACEV.TXT",
    )
    if loadformat_path is not None:
        try:
            period_tokens, series = read_loadformat(loadformat_path)
        except Exception:
            # Unreadable series file: fall through to the fmout fallback.
            period_tokens, series = [], {}
        if period_tokens and series:
            add_derived_series(series)
            # Pull forecast window and title from the config when present;
            # fall back to the scenario name for the title.
            start = (
                str(getattr(config, "forecast_start", "") or "").strip()
                if config is not None
                else ""
            )
            end = (
                str(getattr(config, "forecast_end", "") or "").strip()
                if config is not None
                else ""
            )
            model_title = (
                str(getattr(config, "name", "") or "").strip() if config is not None else ""
            )
            if not model_title:
                model_title = self.scenario_name
            return _fp_output_from_series(
                period_tokens,
                series,
                forecast_start=start or None,
                forecast_end=end or None,
                model_title=model_title,
            )

    # Last resort: the rich console output, if present.
    fmout = self.run_dir / "fmout.txt"
    if fmout.exists():
        return parse_fp_output(fmout)
    return None

scan_artifacts(artifacts_dir)

Scan for run directories up to three levels deep.

A run is identified by a scenario.yaml file present in the directory.

Source code in src/fp_wraptr/dashboard/artifacts.py
def scan_artifacts(artifacts_dir: Path) -> list[RunArtifact]:
    """Scan for run directories up to three levels deep.

    A run is identified by a `scenario.yaml` file present in the directory.
    """
    root = Path(artifacts_dir)
    if not root.exists():
        return []

    discovered: list[RunArtifact] = []
    for scenario_file in root.rglob("scenario.yaml"):
        parts = scenario_file.relative_to(root).parts
        # Bound the search depth (run dir + up to 3 parents under root).
        if not parts or len(parts) > 4:
            continue
        top = str(parts[0])
        # Skip special trees: parity/sensitivity outputs and hidden dirs.
        if top in ("parity", "sensitivity") or top.startswith("."):
            continue

        run_dir = scenario_file.parent
        scenario_name, timestamp = _split_run_dir_name(run_dir)
        try:
            config = ScenarioConfig.from_yaml(scenario_file)
        except Exception:
            config = None

        discovered.append(
            RunArtifact(
                run_dir=run_dir,
                scenario_name=scenario_name,
                timestamp=timestamp,
                has_output=(run_dir / "fmout.txt").exists()
                or (run_dir / "LOADFORMAT.DAT").exists(),
                has_chart=(run_dir / "forecast.png").exists(),
                config=config,
            )
        )

    # Newest first.
    discovered.sort(key=_artifact_sort_key, reverse=True)
    return discovered

Charts

fp_wraptr.dashboard.charts

Plotly chart builders for dashboard forecast and comparison views.

forecast_figure(data, variables=None, title=None, mode='levels', *, units=None)

Build a multi-panel forecast figure for selected variables.

Source code in src/fp_wraptr/dashboard/charts.py
def forecast_figure(
    data: FPOutputData,
    variables: list[str] | None = None,
    title: str | None = None,
    mode: str = "levels",
    *,
    units: Mapping[str, str] | None = None,
) -> go.Figure:
    """Build a multi-panel forecast figure for selected variables."""
    names = list(variables) if variables else list(data.variables.keys())
    names = [n for n in names if n in data.variables]

    # Nothing matched: return an empty, themed figure.
    if not names:
        empty = go.Figure()
        empty.update_layout(title=title or "Forecast")
        return apply_white_theme(empty)

    tokens = list(data.periods or [])
    labels = [format_period_label(t) for t in tokens] if tokens else []

    fig = make_subplots(
        rows=len(names),
        cols=1,
        shared_xaxes=True,
        vertical_spacing=0.05,
    )

    for row, name in enumerate(names, start=1):
        series = data.variables[name]
        values = _resolve_mode_values(series, mode)
        if tokens:
            # Match the x axis to however many points this series has.
            x_values = [format_period_label(t) for t in tokens[: len(values)]]
        else:
            x_values = list(range(len(values)))

        color = FP_COLOR_PALETTE[(row - 1) % len(FP_COLOR_PALETTE)]
        trace = go.Scatter(
            x=x_values,
            y=values,
            mode="lines+markers",
            name=name,
            line=dict(color=color),
            marker=dict(color=color, size=6),
            hovertemplate="%{x}<br>%{y:.4f}<extra>" + name + "</extra>",
        )
        fig.add_trace(trace, row=row, col=1)
        fig.update_yaxes(title_text=_format_axis_label(name, units), row=row, col=1)

    fig.update_layout(
        title=title or "Forecast",
        height=260 * max(1, len(names)),
        showlegend=False,
    )
    if labels:
        # Force a categorical axis in the parsed period order.
        fig.update_xaxes(
            type="category",
            categoryorder="array",
            categoryarray=labels,
        )
    fig.update_xaxes(title_text="Period", row=len(names), col=1)
    return apply_white_theme(fig)

comparison_figure(baseline, scenario, variables=None, title=None, baseline_label='Baseline', scenario_label='Scenario', *, units=None)

Build baseline-vs-scenario subplots with area shading between series.

Source code in src/fp_wraptr/dashboard/charts.py
def comparison_figure(
    baseline: FPOutputData,
    scenario: FPOutputData,
    variables: list[str] | None = None,
    title: str | None = None,
    baseline_label: str = "Baseline",
    scenario_label: str = "Scenario",
    *,
    units: Mapping[str, str] | None = None,
) -> go.Figure:
    """Build baseline-vs-scenario subplots with area shading between series."""
    candidates = list(variables) if variables else list(baseline.variables.keys())
    names = [n for n in candidates if n in baseline.variables and n in scenario.variables]

    # Nothing in common: return an empty, themed figure.
    if not names:
        empty = go.Figure()
        empty.update_layout(title=title or "Forecast comparison")
        return apply_white_theme(empty)

    fig = make_subplots(
        rows=len(names),
        cols=1,
        shared_xaxes=True,
        vertical_spacing=0.05,
    )

    for row, name in enumerate(names, start=1):
        base = baseline.variables[name]
        scen = scenario.variables[name]
        base_axis_known = bool(baseline.periods)
        scen_axis_known = bool(scenario.periods)
        # Synthesize positional axes when a run carries no period tokens.
        base_periods = baseline.periods if base_axis_known else list(range(len(base.levels)))
        scen_periods = scenario.periods if scen_axis_known else list(range(len(scen.levels)))
        count = min(
            len(base.levels),
            len(scen.levels),
            len(base_periods),
            len(scen_periods),
        )
        if count <= 0:
            continue
        tokens = base_periods[:count] if base_periods else scen_periods[:count]
        if base_axis_known:
            x_values = [format_period_label(t) for t in tokens]
        else:
            x_values = list(tokens)

        base_trace = go.Scatter(
            x=x_values,
            y=base.levels[:count],
            mode="lines+markers",
            name=f"{baseline_label}: {name}",
            showlegend=row == 1,
            line=dict(color=FP_COLOR_PALETTE[0]),
            marker=dict(size=6),
            hovertemplate="%{x}<br>%{y:.4f}<extra>" + baseline_label + "</extra>",
        )
        fig.add_trace(base_trace, row=row, col=1)
        # The scenario trace fills down to the baseline trace added just above.
        scen_trace = go.Scatter(
            x=x_values,
            y=scen.levels[:count],
            mode="lines+markers",
            name=f"{scenario_label}: {name}",
            showlegend=row == 1,
            fill="tonexty",
            fillcolor="rgba(89, 161, 79, 0.15)",
            line=dict(color=FP_COLOR_PALETTE[4]),
            marker=dict(size=6),
            hovertemplate="%{x}<br>%{y:.4f}<extra>" + scenario_label + "</extra>",
        )
        fig.add_trace(scen_trace, row=row, col=1)
        fig.update_yaxes(title_text=_format_axis_label(name, units), row=row, col=1)
        if base_axis_known:
            fig.update_xaxes(
                type="category",
                categoryorder="array",
                categoryarray=x_values,
                row=row,
                col=1,
            )

    fig.update_layout(
        title=title or "Scenario comparison",
        height=280 * max(1, len(names)),
    )
    fig.update_xaxes(title_text="Period", row=len(names), col=1)
    return apply_white_theme(fig)

delta_bar_chart(diff_result, top_n=15, sort_by='abs_delta')

Build a horizontal bar chart of top-delta forecast changes.

Source code in src/fp_wraptr/dashboard/charts.py
def delta_bar_chart(
    diff_result: dict,
    top_n: int = 15,
    sort_by: str = "abs_delta",
) -> go.Figure:
    """Build a horizontal bar chart of top-delta forecast changes."""
    if sort_by not in {"abs_delta", "baseline", "scenario", "pct_delta"}:
        raise ValueError(f"Invalid sort_by '{sort_by}'")

    deltas: dict[str, dict[str, float]] = diff_result.get("deltas", {})
    ordered = sorted(
        deltas.items(),
        key=lambda entry: _sort_key(_coerce_numeric(entry[1].get(sort_by, 0.0)), sort_by),
        reverse=True,
    )

    # top_n <= 0 means "show everything".
    chosen = list(ordered[:top_n]) if top_n > 0 else list(ordered)
    names = [name for name, _ in chosen]
    raw = [delta.get(sort_by, 0.0) for _, delta in chosen]
    numeric = [_coerce_numeric(value) for value in raw]
    # Green for gains, red for losses, gray for zero.
    bar_colors = []
    for value in numeric:
        if value > 0:
            bar_colors.append("#16a34a")
        elif value < 0:
            bar_colors.append("#dc2626")
        else:
            bar_colors.append("#6b7280")
    bar_text = [
        f"{value:.4f}" if isinstance(value, (int, float)) else "N/A" for value in raw
    ]

    bars = go.Bar(
        x=numeric,
        y=names,
        orientation="h",
        marker_color=bar_colors,
        text=bar_text,
        textposition="auto",
    )
    fig = go.Figure(data=[bars])
    sort_label = sort_by.replace("_", " ").title()
    fig.update_layout(
        title=f"Top {top_n} Variable Deltas ({sort_label})",
        xaxis_title=sort_label,
        yaxis_title="Variable",
        height=max(300, 35 * max(1, len(names))),
    )
    return apply_white_theme(fig)

FRED

fp_wraptr.fred.ingest

Fetch economic time series from the FRED API.

Requires the fredapi optional dependency and a FRED API key set via the FRED_API_KEY environment variable.

get_fred_client()

Create and return an authenticated Fred API client.

Source code in src/fp_wraptr/fred/ingest.py
def get_fred_client() -> Any:
    """Create and return an authenticated Fred API client.

    Reads the API key from the ``FRED_API_KEY`` environment variable.

    Returns:
        An authenticated ``fredapi.Fred`` client.

    Raises:
        ValueError: If ``FRED_API_KEY`` is not set. The key is checked
            before importing the optional ``fredapi`` dependency, so a
            missing key is reported as a configuration error rather than
            being masked by an ImportError on machines without fredapi.
        ImportError: If the optional ``fredapi`` package is not installed.
    """
    key = os.environ.get("FRED_API_KEY")
    if not key:
        raise ValueError("FRED_API_KEY environment variable not set")

    # Imported lazily: fredapi is an optional dependency.
    from fredapi import Fred

    return Fred(api_key=key)

fetch_series(series_ids, start=None, end=None, cache_dir=None, cache_ttl_seconds=DEFAULT_CACHE_TTL_SECONDS, respect_tos=True, min_request_interval_seconds=DEFAULT_MIN_REQUEST_INTERVAL_SECONDS)

Fetch one or more series from FRED as a wide DataFrame.

Parameters:

Name Type Description Default
series_ids list[str]

FRED series IDs to fetch.

required
start str | None

Optional observation start date (YYYY-MM-DD).

None
end str | None

Optional observation end date (YYYY-MM-DD).

None
cache_dir Path | None

Cache directory for JSON payloads.

None
cache_ttl_seconds int

Maximum cache age before refresh.

DEFAULT_CACHE_TTL_SECONDS
respect_tos bool

If True, insert delay between API calls to avoid burst traffic.

True
min_request_interval_seconds float

Minimum delay between uncached calls.

DEFAULT_MIN_REQUEST_INTERVAL_SECONDS
Source code in src/fp_wraptr/fred/ingest.py
def fetch_series(
    series_ids: list[str],
    start: str | None = None,
    end: str | None = None,
    cache_dir: Path | None = None,
    cache_ttl_seconds: int = DEFAULT_CACHE_TTL_SECONDS,
    respect_tos: bool = True,
    min_request_interval_seconds: float = DEFAULT_MIN_REQUEST_INTERVAL_SECONDS,
) -> pd.DataFrame:
    """Fetch one or more series from FRED as a wide DataFrame.

    Args:
        series_ids: FRED series IDs to fetch.
        start: Optional observation start date (YYYY-MM-DD).
        end: Optional observation end date (YYYY-MM-DD).
        cache_dir: Cache directory for JSON payloads.
        cache_ttl_seconds: Maximum cache age before refresh.
        respect_tos: If True, insert delay between API calls to avoid burst traffic.
        min_request_interval_seconds: Minimum delay between uncached calls.

    Returns:
        Wide DataFrame indexed by date with one column per series ID;
        an empty DataFrame when no series could be loaded.
    """
    cache_dir = cache_dir or (Path.home() / ".fp-wraptr" / "fred-cache")
    cache_dir.mkdir(parents=True, exist_ok=True)

    # De-duplicate IDs while preserving the caller's order.
    normalized_series_ids = list(dict.fromkeys(series_ids))
    series_data: list[pd.Series] = []
    client = None  # created lazily: not needed when everything is cached
    last_request_at: float | None = None

    for series_id in normalized_series_ids:
        path = _cache_path(cache_dir, series_id)
        if _cache_is_fresh(path, ttl_seconds=cache_ttl_seconds):
            payload = _load_cached_payload(path)
            cached = _load_cached_series(path, series_id)
            # Serve from cache only when the cached payload covers the
            # requested window (or carries no metadata to check against).
            if cached is not None and (
                payload is None or _cache_covers_request(payload, start, end)
            ):
                cached.index = pd.to_datetime(cached.index)
                cached = _slice_series_to_request(cached, start, end)
                series_data.append(cached)
                continue

        if client is None:
            client = get_fred_client()

        # Throttle uncached API calls to keep a polite request cadence.
        if respect_tos and last_request_at is not None and min_request_interval_seconds > 0:
            elapsed = time.monotonic() - last_request_at
            wait_seconds = min_request_interval_seconds - elapsed
            if wait_seconds > 0:
                time.sleep(wait_seconds)

        values = client.get_series(series_id, observation_start=start, observation_end=end)
        last_request_at = time.monotonic()
        raw = pd.Series(values, name=series_id, dtype=float)
        raw.index = pd.to_datetime(raw.index)

        # Record the fetched observation window so later calls can tell
        # whether this cache entry covers their requested range.
        observation_start = ""
        observation_end = ""
        if not raw.empty:
            observation_start = raw.index.min().strftime("%Y-%m-%d")
            observation_end = raw.index.max().strftime("%Y-%m-%d")

        # Persist a versioned JSON payload (cache_format 2) alongside the data.
        payload = {
            "cache_format": 2,
            "fetched_at": datetime.now(UTC).isoformat(),
            "series_id": series_id,
            "requested_start": start,
            "requested_end": end,
            "observation_start": observation_start,
            "observation_end": observation_end,
            "source": "FRED",
            "terms_url": FRED_TERMS_URL,
            "data": {index.strftime("%Y-%m-%d"): value for index, value in raw.items()},
        }
        path.write_text(json.dumps(payload), encoding="utf-8")
        series_data.append(raw)

    if not series_data:
        return pd.DataFrame()

    # Outer-join all series on date and sort chronologically.
    return pd.concat(series_data, axis=1).sort_index()

clear_cache(cache_dir=None)

Delete all cached FRED series files.

Source code in src/fp_wraptr/fred/ingest.py
def clear_cache(cache_dir: Path | None = None) -> int:
    """Delete all cached FRED series files."""
    cache_dir = cache_dir or (Path.home() / ".fp-wraptr" / "fred-cache")
    if not cache_dir.exists():
        return 0

    deleted = 0
    for path in cache_dir.glob("*.json"):
        path.unlink()
        deleted += 1
    return deleted

Runtime

Backend Protocol

fp_wraptr.runtime.backend

Runtime backend protocol for model execution.

Defines the interface that any model backend (fp.exe subprocess, fair-py pure-Python engine, or future alternatives) must satisfy in order to be used by the scenario runner.

ModelBackend

Bases: Protocol

Protocol that every model execution backend must implement.

Implementations
  • FPExecutable — subprocess wrapper around fp.exe / Wine
  • FairPyBackend — pure-Python engine (wraps fair-py / fp-py)
Source code in src/fp_wraptr/runtime/backend.py
@runtime_checkable
class ModelBackend(Protocol):
    """Protocol that every model execution backend must implement.

    Implementations:
      - ``FPExecutable``  — subprocess wrapper around fp.exe / Wine
      - ``FairPyBackend`` — pure-Python engine (wraps fair-py / fp-py)

    ``@runtime_checkable`` permits ``isinstance(obj, ModelBackend)`` checks;
    note these verify only method presence, not signatures.
    """

    def check_available(self) -> bool:
        """Return True if this backend can execute runs right now."""
        ...  # protocol stub; implementations provide the body

    def run(
        self,
        input_file: Path | None = None,
        work_dir: Path | None = None,
        extra_env: dict[str, str] | None = None,
    ) -> RunResult:
        """Execute a model run and return the result.

        Args:
            input_file: Path to the input file.
            work_dir: Working directory for the run.
            extra_env: Additional environment variables.

        Returns:
            RunResult with captured output.
        """
        ...  # protocol stub; implementations provide the body

check_available()

Return True if this backend can execute runs right now.

Source code in src/fp_wraptr/runtime/backend.py
def check_available(self) -> bool:
    """Return True if this backend can execute runs right now."""
    ...  # protocol stub; implementations provide the body

run(input_file=None, work_dir=None, extra_env=None)

Execute a model run and return the result.

Parameters:

Name Type Description Default
input_file Path | None

Path to the input file.

None
work_dir Path | None

Working directory for the run.

None
extra_env dict[str, str] | None

Additional environment variables.

None

Returns:

Type Description
RunResult

RunResult with captured output.

Source code in src/fp_wraptr/runtime/backend.py
def run(
    self,
    input_file: Path | None = None,
    work_dir: Path | None = None,
    extra_env: dict[str, str] | None = None,
) -> RunResult:
    """Execute a model run and return the result.

    Args:
        input_file: Path to the input file.
        work_dir: Working directory for the run.
        extra_env: Additional environment variables.

    Returns:
        RunResult with captured output.
    """
    ...  # protocol stub; implementations provide the body

RunResult dataclass

Backend-agnostic result of a model run.

Source code in src/fp_wraptr/runtime/backend.py
@dataclass
class RunResult:
    """Backend-agnostic result of a model run."""

    return_code: int
    stdout: str
    stderr: str
    working_dir: Path
    input_file: Path
    output_file: Path | None = None
    duration_seconds: float = 0.0

    @property
    def success(self) -> bool:
        return self.return_code == 0

BackendInfo dataclass

Metadata about a backend for display/debugging.

Source code in src/fp_wraptr/runtime/backend.py
@dataclass
class BackendInfo:
    """Metadata about a backend for display/debugging."""

    name: str  # backend identifier, e.g. "fpexe" or "fppy"
    version: str = ""
    available: bool = False
    # default_factory keeps each instance's details dict independent
    details: dict[str, str] = field(default_factory=dict)

fp.exe Backend

fp_wraptr.runtime.fp_exe

Subprocess wrapper for the Fair-Parke executable (fp.exe).

fp.exe is a Windows PE32 binary; on macOS/Linux it requires Wine. The executable reads its driving directive from stdin (the INPUT FILE= line) and writes its report to stdout. Key files it expects in its working directory:

- fminput.txt (or the specified input file)
- fmdata.txt, fmage.txt, fmexog.txt (data files referenced by the input)

Environment variable FP_HOME (or config) points to the directory containing fp.exe and data files.

FPExecutable dataclass

Wrapper around the fp.exe subprocess.

Usage

fp = FPExecutable(fp_home=Path("FM"))
result = fp.run(input_file=Path("fminput.txt"), work_dir=Path("runs/test1"))

Source code in src/fp_wraptr/runtime/fp_exe.py
@dataclass
class FPExecutable:
    """Wrapper around the fp.exe subprocess.

    Usage:
        fp = FPExecutable(fp_home=Path("FM"))
        result = fp.run(input_file=Path("fminput.txt"), work_dir=Path("runs/test1"))
    """

    # Directory containing fp.exe and its data files; defaults from $FP_HOME, else "FM".
    fp_home: Path = field(default_factory=lambda: Path(os.environ.get("FP_HOME", "FM")))
    exe_name: str = "fp.exe"
    timeout_seconds: int = 300
    use_wine: bool | None = None  # None = auto-detect
    required_data_files: tuple[str, ...] = ("fmdata.txt", "fmage.txt", "fmexog.txt")

    def __post_init__(self) -> None:
        # Auto-detect: Wine is needed on every platform except native Windows.
        if self.use_wine is None:
            self.use_wine = platform.system() != "Windows"

    def _project_root(self) -> Path | None:
        """Best-effort project root discovery (for hygiene guardrails).

        We use this to ensure WINEPREFIX is not set inside the repo, which can create
        `.wine/` in the project tree.
        """
        # Walk upward from this module until a pyproject.toml marks the repo root.
        cur = Path(__file__).resolve()
        for parent in (cur, *tuple(cur.parents)):
            if (parent / "pyproject.toml").exists():
                return parent
        return None

    def _validate_wineprefix(self, env: dict[str, str], *, work_dir: Path) -> None:
        """Normalize and sanity-check WINEPREFIX in *env* before launching Wine.

        Mutates env["WINEPREFIX"] in place; raises FPExecutableError on unsafe values.
        """
        if not self.use_wine:
            return

        project_root = self._project_root()
        raw = str(env.get("WINEPREFIX") or "").strip()
        if not raw:
            # Default to a user-home prefix rather than letting Wine choose a cwd-relative prefix.
            raw = str(Path.home() / ".wine-fp-wraptr")
            env["WINEPREFIX"] = raw

        prefix = Path(raw).expanduser()
        if not prefix.is_absolute():
            raise FPExecutableError(
                "Refusing to run Wine with a relative WINEPREFIX (would create `.wine/` "
                f"under the current directory). Set WINEPREFIX to an absolute path outside the repo. "
                f"Got: {raw!r}"
            )

        # Resolve relative symlinks, etc.
        try:
            resolved = prefix.resolve()
        except Exception:
            resolved = prefix

        if project_root is not None:
            try:
                resolved.relative_to(project_root)
            except ValueError:
                # relative_to() raising means the prefix lies outside the repo — acceptable.
                pass
            else:
                raise FPExecutableError(
                    "Refusing to run Wine with WINEPREFIX inside the project tree. "
                    f"Got: {resolved}. Set WINEPREFIX to an absolute path outside the repo."
                )

        # Ensure the prefix directory exists (and is user-writable) before launching Wine.
        resolved.mkdir(parents=True, exist_ok=True)
        if not os.access(str(resolved), os.W_OK):
            raise FPExecutableError(f"WINEPREFIX not writable: {resolved}")

        env["WINEPREFIX"] = str(resolved)

    @property
    def exe_path(self) -> Path:
        """Full path to the fp.exe binary under fp_home."""
        return self.fp_home / self.exe_name

    def check_available(self) -> bool:
        """Check if fp.exe (and Wine if needed) are available."""
        if not self.exe_path.exists():
            return False
        # On non-Windows (use_wine=True) a `wine` binary on PATH is also required.
        return not (self.use_wine and not shutil.which("wine"))

    def preflight_report(
        self,
        input_file: Path | None = None,
        work_dir: Path | None = None,
    ) -> dict[str, Any]:
        """Return diagnostic details about fp.exe run prerequisites."""
        # Mirror run()'s default resolution so the report matches actual behavior.
        resolved_work_dir = work_dir or self.fp_home
        input_name = (input_file or self.fp_home / "fminput.txt").name
        expected_input_path = resolved_work_dir / input_name
        missing_data_files = self._missing_files(resolved_work_dir, self.required_data_files)
        wine_path = shutil.which("wine")
        exe_exists = self.exe_path.exists()
        wine_available = bool(wine_path)
        wine_required = bool(self.use_wine)
        # Same WINEPREFIX default as _validate_wineprefix so diagnostics match reality.
        raw_wineprefix = str(os.environ.get("WINEPREFIX") or "").strip()
        if raw_wineprefix:
            wineprefix_path = Path(raw_wineprefix).expanduser()
        else:
            wineprefix_path = Path.home() / ".wine-fp-wraptr"
        try:
            resolved_wineprefix = wineprefix_path.resolve()
        except Exception:
            resolved_wineprefix = wineprefix_path
        wineprefix_exists = resolved_wineprefix.exists()
        # "Initialized" = Wine has already created its standard prefix structure.
        wineprefix_initialized = bool(
            wineprefix_exists
            and (resolved_wineprefix / "drive_c").exists()
            and (resolved_wineprefix / "system.reg").exists()
            and (resolved_wineprefix / "user.reg").exists()
        )

        available = (
            exe_exists
            and (not wine_required or wine_available)
            and expected_input_path.exists()
            and not missing_data_files
        )
        return {
            "available": available,
            "fp_home": str(self.fp_home),
            "work_dir": str(resolved_work_dir),
            "exe_path": str(self.exe_path),
            "exe_exists": exe_exists,
            "wine_required": wine_required,
            "wine_path": wine_path or "",
            "wine_available": wine_available,
            "wineprefix": str(resolved_wineprefix),
            "wineprefix_exists": bool(wineprefix_exists),
            "wineprefix_initialized": bool(wineprefix_initialized),
            "input_file_name": input_name,
            "expected_input_path": str(expected_input_path),
            "input_file_exists": expected_input_path.exists(),
            "required_data_files": list(self.required_data_files),
            "missing_data_files": missing_data_files,
        }

    def run(
        self,
        input_file: Path | None = None,
        work_dir: Path | None = None,
        extra_env: dict[str, str] | None = None,
    ) -> FPRunResult:
        """Run fp.exe with the given input file.

        Args:
            input_file: Path to the FP input file. If None, uses fminput.txt in fp_home.
            work_dir: Working directory for the run. If None, uses fp_home.
                      Data files are copied here if work_dir != fp_home.
            extra_env: Additional environment variables.

        Returns:
            FPRunResult with stdout/stderr capture.

        Raises:
            FPExecutableError: If fp.exe is not found or Wine is unavailable.
        """
        import time

        resolved_input = input_file or (self.fp_home / "fminput.txt")
        resolved_work_dir = work_dir or self.fp_home
        # Preflight is attached to every early failure so errors are self-diagnosing.
        preflight = self.preflight_report(input_file=resolved_input, work_dir=resolved_work_dir)

        # Fail fast with targeted messages; check_available() below is the catch-all.
        if not self.exe_path.exists():
            raise FPExecutableError(
                f"fp.exe not found at {self.exe_path}. "
                "Check your fp_home path and that fp.exe exists there.",
                details={"preflight_report": preflight},
            )

        if self.use_wine and not shutil.which("wine"):
            raise FPExecutableError(
                "Wine is required on non-Windows systems to run fp.exe. "
                "Install with `brew install --cask wine-stable`.",
                details={"preflight_report": preflight},
            )

        if not self.check_available():
            raise FPExecutableError(
                f"fp.exe not available at {self.exe_path} "
                f"(Wine needed: {self.use_wine}, Wine found: {bool(shutil.which('wine'))})",
                details={"preflight_report": preflight},
            )

        # NOTE(review): re-resolves the same defaults as resolved_input/resolved_work_dir
        # above; kept as-is because the else-branch also stages data files.
        if input_file is None:
            input_file = self.fp_home / "fminput.txt"

        if work_dir is None:
            work_dir = self.fp_home
        else:
            work_dir.mkdir(parents=True, exist_ok=True)
            self._copy_data_files(work_dir)

        expected_input_path = work_dir / input_file.name
        if not expected_input_path.exists():
            raise FPExecutableError(
                "Input file for fp.exe is missing in work directory: "
                f"{expected_input_path}. Ensure scenario input was copied/patched before run."
            )
        missing_data_files = self._missing_files(work_dir, self.required_data_files)
        if missing_data_files:
            raise FPExecutableError(
                "Missing required FP data files in work directory "
                f"{work_dir}: {', '.join(missing_data_files)}"
            )

        # Build command
        cmd = self._build_command()

        # Prepare stdin: fp.exe expects "INPUT FILE=<filename> ;" on stdin
        stdin_text = f"INPUT FILE={input_file.name} ;\n"

        env = os.environ.copy()
        if extra_env:
            env.update(extra_env)
        if self.use_wine:
            # May mutate env["WINEPREFIX"]; raises on unsafe prefixes.
            self._validate_wineprefix(env, work_dir=work_dir)

        start = time.monotonic()
        stdout_file = work_dir / "fp-exe.stdout.txt"
        stderr_file = work_dir / "fp-exe.stderr.txt"
        try:
            proc = subprocess.run(
                cmd,
                input=stdin_text,
                capture_output=True,
                text=True,
                cwd=str(work_dir),
                timeout=self.timeout_seconds,
                env=env,
            )
        except subprocess.TimeoutExpired as e:
            # Persist whatever output was captured before the timeout for debugging.
            duration = time.monotonic() - start
            timed_out_stdout = e.stdout or ""
            timed_out_stderr = e.stderr or ""
            # TimeoutExpired may carry bytes rather than str; decode defensively.
            if isinstance(timed_out_stdout, bytes):
                timed_out_stdout = timed_out_stdout.decode("utf-8", errors="replace")
            if isinstance(timed_out_stderr, bytes):
                timed_out_stderr = timed_out_stderr.decode("utf-8", errors="replace")
            stdout_file.write_text(str(timed_out_stdout), encoding="utf-8")
            stderr_file.write_text(str(timed_out_stderr), encoding="utf-8")
            raise FPExecutableError(
                f"fp.exe timed out after {self.timeout_seconds}s",
                details={
                    "timeout_seconds": int(self.timeout_seconds),
                    "termination_reason": "timeout_expired",
                    "work_dir": str(work_dir),
                    "command": cmd,
                    "duration_seconds": float(duration),
                    "stdout_path": str(stdout_file),
                    "stderr_path": str(stderr_file),
                },
            ) from e
        except FileNotFoundError as e:
            raise FPExecutableError(f"Failed to execute: {e}") from e

        duration = time.monotonic() - start

        # Write captured output to file
        # fp.exe emits its report on stdout (see module docstring); mirror it to fmout.txt.
        output_file = work_dir / "fmout.txt"
        stdout_text = proc.stdout or ""
        stderr_text = proc.stderr or ""
        stdout_file.write_text(stdout_text, encoding="utf-8")
        stderr_file.write_text(stderr_text, encoding="utf-8")
        if stdout_text:
            output_file.write_text(stdout_text, encoding="utf-8")

        return FPRunResult(
            return_code=proc.returncode,
            stdout=stdout_text,
            stderr=stderr_text,
            working_dir=work_dir,
            input_file=input_file,
            output_file=output_file if stdout_text else None,
            duration_seconds=duration,
        )

    def _build_command(self) -> list[str]:
        """Build the subprocess command list."""
        exe = str(self.exe_path.resolve())
        if self.use_wine:
            # Launch the PE binary through Wine on non-Windows hosts.
            return ["wine", exe]
        return [exe]

    def _copy_data_files(self, dest: Path) -> None:
        """Copy required data files from fp_home to a working directory."""
        # fminput.txt is staged too (beyond required_data_files) so default runs work.
        data_files = ["fmdata.txt", "fmage.txt", "fmexog.txt", "fminput.txt"]
        for fname in data_files:
            src = self.fp_home / fname
            if src.exists():
                dst = dest / fname
                # Never clobber files already staged in the work dir.
                if not dst.exists():
                    shutil.copy2(src, dst)

    def _missing_files(self, root: Path, names: Iterable[str]) -> list[str]:
        """Return required filenames that do not exist under root."""
        return [name for name in names if not (root / name).exists()]

check_available()

Check if fp.exe (and Wine if needed) are available.

Source code in src/fp_wraptr/runtime/fp_exe.py
def check_available(self) -> bool:
    """Check if fp.exe (and Wine if needed) are available."""
    if not self.exe_path.exists():
        return False
    return not (self.use_wine and not shutil.which("wine"))

preflight_report(input_file=None, work_dir=None)

Return diagnostic details about fp.exe run prerequisites.

Source code in src/fp_wraptr/runtime/fp_exe.py
def preflight_report(
    self,
    input_file: Path | None = None,
    work_dir: Path | None = None,
) -> dict[str, Any]:
    """Return diagnostic details about fp.exe run prerequisites."""
    resolved_work_dir = work_dir or self.fp_home
    input_name = (input_file or self.fp_home / "fminput.txt").name
    expected_input_path = resolved_work_dir / input_name
    missing_data_files = self._missing_files(resolved_work_dir, self.required_data_files)
    wine_path = shutil.which("wine")
    exe_exists = self.exe_path.exists()
    wine_available = bool(wine_path)
    wine_required = bool(self.use_wine)
    raw_wineprefix = str(os.environ.get("WINEPREFIX") or "").strip()
    if raw_wineprefix:
        wineprefix_path = Path(raw_wineprefix).expanduser()
    else:
        wineprefix_path = Path.home() / ".wine-fp-wraptr"
    try:
        resolved_wineprefix = wineprefix_path.resolve()
    except Exception:
        resolved_wineprefix = wineprefix_path
    wineprefix_exists = resolved_wineprefix.exists()
    wineprefix_initialized = bool(
        wineprefix_exists
        and (resolved_wineprefix / "drive_c").exists()
        and (resolved_wineprefix / "system.reg").exists()
        and (resolved_wineprefix / "user.reg").exists()
    )

    available = (
        exe_exists
        and (not wine_required or wine_available)
        and expected_input_path.exists()
        and not missing_data_files
    )
    return {
        "available": available,
        "fp_home": str(self.fp_home),
        "work_dir": str(resolved_work_dir),
        "exe_path": str(self.exe_path),
        "exe_exists": exe_exists,
        "wine_required": wine_required,
        "wine_path": wine_path or "",
        "wine_available": wine_available,
        "wineprefix": str(resolved_wineprefix),
        "wineprefix_exists": bool(wineprefix_exists),
        "wineprefix_initialized": bool(wineprefix_initialized),
        "input_file_name": input_name,
        "expected_input_path": str(expected_input_path),
        "input_file_exists": expected_input_path.exists(),
        "required_data_files": list(self.required_data_files),
        "missing_data_files": missing_data_files,
    }

run(input_file=None, work_dir=None, extra_env=None)

Run fp.exe with the given input file.

Parameters:

Name Type Description Default
input_file Path | None

Path to the FP input file. If None, uses fminput.txt in fp_home.

None
work_dir Path | None

Working directory for the run. If None, uses fp_home. Data files are copied here if work_dir != fp_home.

None
extra_env dict[str, str] | None

Additional environment variables.

None

Returns:

Type Description
FPRunResult

FPRunResult with stdout/stderr capture.

Raises:

Type Description
FPExecutableError

If fp.exe is not found or Wine is unavailable.

Source code in src/fp_wraptr/runtime/fp_exe.py
def run(
    self,
    input_file: Path | None = None,
    work_dir: Path | None = None,
    extra_env: dict[str, str] | None = None,
) -> FPRunResult:
    """Run fp.exe with the given input file.

    Args:
        input_file: Path to the FP input file. If None, uses fminput.txt in fp_home.
        work_dir: Working directory for the run. If None, uses fp_home.
                  Data files are copied here if work_dir != fp_home.
        extra_env: Additional environment variables.

    Returns:
        FPRunResult with stdout/stderr capture.

    Raises:
        FPExecutableError: If fp.exe is not found or Wine is unavailable.
    """
    import time

    resolved_input = input_file or (self.fp_home / "fminput.txt")
    resolved_work_dir = work_dir or self.fp_home
    preflight = self.preflight_report(input_file=resolved_input, work_dir=resolved_work_dir)

    if not self.exe_path.exists():
        raise FPExecutableError(
            f"fp.exe not found at {self.exe_path}. "
            "Check your fp_home path and that fp.exe exists there.",
            details={"preflight_report": preflight},
        )

    if self.use_wine and not shutil.which("wine"):
        raise FPExecutableError(
            "Wine is required on non-Windows systems to run fp.exe. "
            "Install with `brew install --cask wine-stable`.",
            details={"preflight_report": preflight},
        )

    if not self.check_available():
        raise FPExecutableError(
            f"fp.exe not available at {self.exe_path} "
            f"(Wine needed: {self.use_wine}, Wine found: {bool(shutil.which('wine'))})",
            details={"preflight_report": preflight},
        )

    if input_file is None:
        input_file = self.fp_home / "fminput.txt"

    if work_dir is None:
        work_dir = self.fp_home
    else:
        work_dir.mkdir(parents=True, exist_ok=True)
        self._copy_data_files(work_dir)

    expected_input_path = work_dir / input_file.name
    if not expected_input_path.exists():
        raise FPExecutableError(
            "Input file for fp.exe is missing in work directory: "
            f"{expected_input_path}. Ensure scenario input was copied/patched before run."
        )
    missing_data_files = self._missing_files(work_dir, self.required_data_files)
    if missing_data_files:
        raise FPExecutableError(
            "Missing required FP data files in work directory "
            f"{work_dir}: {', '.join(missing_data_files)}"
        )

    # Build command
    cmd = self._build_command()

    # Prepare stdin: fp.exe expects "INPUT FILE=<filename> ;" on stdin
    stdin_text = f"INPUT FILE={input_file.name} ;\n"

    env = os.environ.copy()
    if extra_env:
        env.update(extra_env)
    if self.use_wine:
        self._validate_wineprefix(env, work_dir=work_dir)

    start = time.monotonic()
    stdout_file = work_dir / "fp-exe.stdout.txt"
    stderr_file = work_dir / "fp-exe.stderr.txt"
    try:
        proc = subprocess.run(
            cmd,
            input=stdin_text,
            capture_output=True,
            text=True,
            cwd=str(work_dir),
            timeout=self.timeout_seconds,
            env=env,
        )
    except subprocess.TimeoutExpired as e:
        duration = time.monotonic() - start
        timed_out_stdout = e.stdout or ""
        timed_out_stderr = e.stderr or ""
        if isinstance(timed_out_stdout, bytes):
            timed_out_stdout = timed_out_stdout.decode("utf-8", errors="replace")
        if isinstance(timed_out_stderr, bytes):
            timed_out_stderr = timed_out_stderr.decode("utf-8", errors="replace")
        stdout_file.write_text(str(timed_out_stdout), encoding="utf-8")
        stderr_file.write_text(str(timed_out_stderr), encoding="utf-8")
        raise FPExecutableError(
            f"fp.exe timed out after {self.timeout_seconds}s",
            details={
                "timeout_seconds": int(self.timeout_seconds),
                "termination_reason": "timeout_expired",
                "work_dir": str(work_dir),
                "command": cmd,
                "duration_seconds": float(duration),
                "stdout_path": str(stdout_file),
                "stderr_path": str(stderr_file),
            },
        ) from e
    except FileNotFoundError as e:
        raise FPExecutableError(f"Failed to execute: {e}") from e

    duration = time.monotonic() - start

    # Write captured output to file
    output_file = work_dir / "fmout.txt"
    stdout_text = proc.stdout or ""
    stderr_text = proc.stderr or ""
    stdout_file.write_text(stdout_text, encoding="utf-8")
    stderr_file.write_text(stderr_text, encoding="utf-8")
    if stdout_text:
        output_file.write_text(stdout_text, encoding="utf-8")

    return FPRunResult(
        return_code=proc.returncode,
        stdout=stdout_text,
        stderr=stderr_text,
        working_dir=work_dir,
        input_file=input_file,
        output_file=output_file if stdout_text else None,
        duration_seconds=duration,
    )

FPRunResult dataclass

Bases: RunResult

Result of an fp.exe invocation.

Source code in src/fp_wraptr/runtime/fp_exe.py
@dataclass
class FPRunResult(RunResult):
    """Result of an fp.exe invocation."""
    # No extra fields yet; kept as a distinct type for fp.exe-specific results.

fair-py Backend (stub)

fp_wraptr.runtime.fairpy

fppy (fp-py) pure-Python backend.

fp-wraptr vendors the fppy package and runs it as a subprocess using:

python -m fppy.cli mini-run ...

This keeps the integration surface stable while fppy continues to evolve. The parity contract for fp-wraptr is PABEV.TXT (PRINTVAR LOADFORMAT output).

FairPyBackend dataclass

Pure-Python model backend wrapping vendored fppy via subprocess.

Source code in src/fp_wraptr/runtime/fairpy.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
@dataclass
class FairPyBackend:
    """Pure-Python model backend wrapping vendored fppy via subprocess.

    The backend stages a work directory with the FP legacy input files
    (fminput/fmdata/fmage/fmexog), synthesizes the extra overlays fppy needs
    (EQ target list, optional identity overlay for the ``parity`` preset),
    writes a ``model-config.toml``, and then shells out to ``python -m
    fppy.cli mini-run`` with heartbeat/timeout monitoring.
    """

    # Root of the FP model home; must contain fminput.txt and fmout.txt.
    fp_home: Path = field(default_factory=lambda: Path("FM"))
    # Hard wall-clock limit for the fppy subprocess, in seconds.
    timeout_seconds: int = 600
    # Interval between runtime-status file refreshes while the subprocess runs.
    heartbeat_seconds: int = 5
    # Default is intentionally "default" here; wraptr's parity runner layers
    # override to "parity" when doing cross-engine comparisons.
    eq_flags_preset: str = "default"
    # Optional EQ iteration tracing controls forwarded to fppy's CLI.
    eq_iter_trace: bool = False
    eq_iter_trace_period: str | None = None
    eq_iter_trace_targets: str | None = None
    eq_iter_trace_max_events: int | None = None
    # Structural read-cache mode; only "numpy_columns" is recognized besides off.
    eq_structural_read_cache: str = "off"
    # When set (>=1), pins BLAS/OpenMP thread counts via environment variables.
    num_threads: int | None = None

    def check_available(self) -> bool:
        """Return True if the vendored ``fppy`` package is importable and a
        Python executable path is known (required for the subprocess call)."""
        try:
            import fppy  # noqa: F401
        except ImportError:
            return False
        return bool(sys.executable)

    def _require_file(self, path: Path, *, label: str) -> None:
        """Raise FairPyBackendError if *path* does not exist; *label* names it."""
        if not path.exists():
            raise FairPyBackendError(f"Missing required {label}: {path}")

    def _write_model_config(
        self,
        work_dir: Path,
        *,
        fminput_path: Path,
        fmexog_path: Path,
        fmout_path: Path,
    ) -> Path:
        """Write ``model-config.toml`` in *work_dir* pointing fppy at the staged
        legacy FP files. Returns the path of the written config file."""
        config_path = work_dir / "model-config.toml"
        legacy = {
            "fminput": fminput_path,
            "fmdata": work_dir / "fmdata.txt",
            "fmage": work_dir / "fmage.txt",
            # fppy reads exogenous CHANGEVAR instructions from this path. For scenario
            # runs, wraptr may generate a `fmexog_override.txt`; honor it when present.
            "fmexog": fmexog_path,
            "fmout": fmout_path,
        }
        lines = ["[model]", "", "[model.legacy]"]
        for key, value in legacy.items():
            # Resolve to absolute paths so the config is valid regardless of
            # the subprocess's working directory.
            lines.append(f'{key} = "{Path(value).resolve()!s}"')
        lines.append("")
        config_path.write_text("\n".join(lines), encoding="utf-8")
        return config_path

    def _eq_args(self, *, fmout_coefs: Path) -> list[str]:
        """Build fppy CLI flags for the configured ``eq_flags_preset``.

        Returns an empty list for the default/off presets and raises
        FairPyBackendError for unrecognized preset names. *fmout_coefs* is the
        coefficient-baseline fmout file passed via ``--eq-coefs-fmout``.
        """
        preset = str(self.eq_flags_preset or "").strip().lower()
        if preset in {"", "default", "none", "off"}:
            return []
        if preset not in {"iss02_baseline", "parity"}:
            raise FairPyBackendError(f"Unknown eq_flags_preset: {self.eq_flags_preset!r}")
        if preset == "parity":
            # Parity runs want FP-style solve semantics:
            # - enable EQ backfill
            # - honor SETUPSOLVE iteration controls (MINITERS/MAXITERS/MAXCHECK)
            # - solve per-period (Gauss-Seidel), period-scoped
            #
            # Intentionally do NOT enable "context assignments first iter only";
            # it can hide non-closure in GENR/IDENT chains during solve windows.
            return [
                "--enable-eq",
                "--eq-use-setupsolve",
                "--eq-flags-preset-label",
                "parity",
                "--eq-period-sequential",
                "--eq-period-scoped",
                "on",
                "--eq-coefs-fmout",
                str(fmout_coefs),
            ]
        return [
            "--enable-eq",
            "--eq-use-setupsolve",
            "--eq-flags-preset-label",
            "iss02_baseline",
            "--eq-period-sequential",
            "--eq-period-scoped",
            "on",
            "--eq-period-sequential-context-assignments-first-iter-only",
            "--eq-coefs-fmout",
            str(fmout_coefs),
        ]

    def _eq_iter_trace_args(self) -> list[str]:
        """Build the optional ``--eq-iter-trace*`` CLI flags; empty when tracing
        is disabled."""
        if not bool(self.eq_iter_trace):
            return []
        args = ["--eq-iter-trace"]
        if self.eq_iter_trace_period is not None:
            args.extend(["--eq-iter-trace-period", str(self.eq_iter_trace_period)])
        if self.eq_iter_trace_targets is not None:
            args.extend(["--eq-iter-trace-targets", str(self.eq_iter_trace_targets)])
        if self.eq_iter_trace_max_events is not None:
            args.extend(["--eq-iter-trace-max-events", str(int(self.eq_iter_trace_max_events))])
        return args

    def _eq_structural_read_cache_args(self) -> list[str]:
        """Build the ``--eq-structural-read-cache`` flag pair, validating the
        configured mode. Only ``numpy_columns`` is accepted besides off/none."""
        mode = str(self.eq_structural_read_cache or "off").strip().lower()
        if mode in {"", "off", "none"}:
            return []
        if mode not in {"numpy_columns"}:
            raise FairPyBackendError(
                f"Unknown eq_structural_read_cache: {self.eq_structural_read_cache!r}"
            )
        return ["--eq-structural-read-cache", mode]

    def _thread_env_overrides(self) -> dict[str, str]:
        """Return environment overrides pinning common BLAS/OpenMP thread-count
        variables to ``num_threads``; empty when unset or < 1."""
        raw = self.num_threads
        if raw is None:
            return {}
        threads = int(raw)
        if threads < 1:
            return {}
        value = str(threads)
        return {
            "OMP_NUM_THREADS": value,
            "OPENBLAS_NUM_THREADS": value,
            "MKL_NUM_THREADS": value,
            "NUMEXPR_NUM_THREADS": value,
            "VECLIB_MAXIMUM_THREADS": value,
        }

    def _write_eq_overlay_from_fmout(self, *, fmout_path: Path, out_path: Path) -> Path:
        """Write a standalone FP include file listing all equation numbers from fmout.

        fppy uses EQ records as the *execution deck* that decides which equation
        specs to apply during EQ backfill. fp.exe scenarios can omit EQ statements
        because the model is compiled, but fppy still needs an explicit EQ target
        list to approximate `SOLVE`.
        """
        from fppy.eq_solver import load_eq_specs_from_fmout

        specs = load_eq_specs_from_fmout(fmout_path)
        numbers: list[int] = []
        for spec in specs.values():
            number = getattr(spec, "equation_number", None)
            if isinstance(number, int):
                numbers.append(int(number))
        # De-duplicate and order so the generated overlay is deterministic.
        numbers = sorted(set(numbers))

        header = [
            "@ fp-wraptr: autogenerated EQ target overlay for fppy",
            f"@ source: {fmout_path.resolve()}",
            f"@ equation_count: {len(numbers)}",
            "",
        ]
        lines = header + [f"EQ {num} ;" for num in numbers] + [""]
        out_path.write_text("\n".join(lines), encoding="utf-8")
        return out_path

    def _write_fppy_wrapper_input(
        self,
        *,
        work_dir: Path,
        base_input: Path,
        eq_overlay: Path,
        identity_overlay: Path | None = None,
    ) -> Path:
        """Assemble the single flat input deck fppy will actually run.

        Inlines the EQ overlay, then the (possibly identity-overlay-augmented)
        base scenario deck into ``fppy_fminput.txt`` in *work_dir*. When an
        identity overlay is given, its statements are spliced in immediately
        before the first SOLVE statement (falling back to before QUIT/END, then
        to the end of the deck). Returns the wrapper file's path.
        """
        wrapper_path = work_dir / "fppy_fminput.txt"
        # Important: fppy post-command replay (PRINTVAR, SETYYTOY, etc.) uses the
        # parsed record list. If we `INPUT FILE=...` here, those statements live
        # in nested decks and are not reliably replayed. Inline both the EQ
        # overlay and the base input deck instead.
        eq_text = eq_overlay.read_text(encoding="utf-8", errors="replace").splitlines()
        base_text = base_input.read_text(encoding="utf-8", errors="replace").splitlines()
        identity_name = identity_overlay.name if identity_overlay is not None else None
        identity_text: list[str] = []
        if identity_overlay is not None:
            identity_text = identity_overlay.read_text(
                encoding="utf-8", errors="replace"
            ).splitlines()

        if identity_name:
            inserted = False
            out_lines: list[str] = []
            # Track the most recent SMPL line so we can re-assert the scenario's
            # sample window after the overlay's own SMPL statements.
            last_smpl_line: str | None = None
            for raw in base_text:
                stripped = raw.lstrip()
                if stripped.upper().startswith("SMPL"):
                    last_smpl_line = raw.rstrip("\n")
                if not inserted and raw.lstrip().upper().startswith("SOLVE"):
                    out_lines.extend(identity_text)
                    # The identity overlay may contain its own SMPL statements.
                    # Restore the scenario deck's current SMPL window so the SOLVE
                    # statement executes over the intended range.
                    if last_smpl_line is not None:
                        out_lines.append(last_smpl_line)
                    inserted = True
                out_lines.append(raw.rstrip("\n"))
            if not inserted:
                # Fall back: insert before QUIT/END, or append at the end.
                # NOTE(review): startswith(("QUIT", "END")) also matches longer
                # tokens such as ENDOGENOUS — confirm no such commands occur in
                # real decks before the insertion point.
                fallback: list[str] = []
                inserted = False
                last_smpl_line = None
                for raw in out_lines:
                    stripped = raw.lstrip()
                    if stripped.upper().startswith("SMPL"):
                        last_smpl_line = raw.rstrip("\n")
                    if not inserted and raw.lstrip().upper().startswith(("QUIT", "END")):
                        fallback.extend(identity_text)
                        if last_smpl_line is not None:
                            fallback.append(last_smpl_line)
                        inserted = True
                    fallback.append(raw)
                if not inserted:
                    fallback.extend(identity_text)
                    if last_smpl_line is not None:
                        fallback.append(last_smpl_line)
                base_text = fallback
            else:
                base_text = out_lines

        lines = [
            "@ fp-wraptr: fppy wrapper input",
            "@ - inlines EQ records (from fmout) so fppy can run EQ backfill",
            "@ - optionally injects baseline identity/GENR/IDENT definitions before SOLVE",
            "@ - then inlines the original scenario input deck",
            "",
            *eq_text,
            "",
            "@ fp-wraptr: begin base input",
            *base_text,
            "",
        ]
        wrapper_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
        return wrapper_path

    def _collect_assignment_lhs_from_input_tree(
        self, *, work_dir: Path, entry_input: Path
    ) -> set[str]:
        """Collect assignment LHS names from a staged scenario input tree.

        When the parity preset injects baseline identity/GENR statements, we must
        avoid overriding scenario-specific definitions (for example PSE adds JG
        terms to GDP/GDPR). Scan the staged deck(s) in `work_dir` to discover
        which symbols are already defined so we can skip them in the overlay.
        """
        from fp_wraptr.io.input_parser import parse_fp_input_text

        def _clean_filename(token: str) -> str:
            """Strip quotes, whitespace, and a trailing ';' from a filename token."""
            raw = str(token or "").strip().strip("\"'")
            raw = raw.rstrip(";")
            return raw

        def _extract_input_file_from_command(body: str) -> str | None:
            """Pull the FILE= target out of an INPUT command body, if any."""
            match = _INPUT_FILE_RE.search(body or "")
            if not match:
                return None
            return _clean_filename(match.group("name"))

        def _resolve_case_insensitive(root: Path, relative: str) -> Path:
            """Resolve *relative* under *root*, matching each path component
            case-insensitively (FP decks often mix filename casing)."""
            rel = Path(relative)
            cur = root
            for part in rel.parts:
                candidate = cur / part
                if candidate.exists():
                    cur = candidate
                    continue
                # Cheap case variants first, then a full directory scan.
                alt_lower = cur / part.lower()
                if alt_lower.exists():
                    cur = alt_lower
                    continue
                alt_upper = cur / part.upper()
                if alt_upper.exists():
                    cur = alt_upper
                    continue
                want = part.lower()
                found = None
                try:
                    for child in cur.iterdir():
                        if child.name.lower() == want:
                            found = child
                            break
                except OSError:
                    found = None
                if found is None:
                    # Unresolvable component: return the naive join so the
                    # caller's existence check fails gracefully.
                    return root / rel
                cur = found
            return cur

        lhs_names: set[str] = set()
        visited: set[str] = set()
        # Breadth-first walk over the entry deck and its INPUT includes.
        queue: list[str] = [str(entry_input.name)]

        while queue:
            name = _clean_filename(queue.pop(0))
            if not name:
                continue
            norm = name.lower()
            if norm in visited:
                continue
            visited.add(norm)

            path = _resolve_case_insensitive(work_dir, name)
            if not path.exists():
                # Work dirs should be staged with all includes, but keep this
                # best-effort and avoid hard failures in the backend wrapper.
                continue

            parsed = parse_fp_input_text(path.read_text(encoding="utf-8", errors="replace"))
            # Any section that defines a series counts as "already defined".
            for section in ("creates", "generated_vars", "identities", "equation_lhs"):
                for item in parsed.get(section, []) or []:
                    if not isinstance(item, dict):
                        continue
                    lhs = item.get("name")
                    if lhs is None:
                        continue
                    token = str(lhs).strip().upper()
                    if token:
                        lhs_names.add(token)

            # Follow nested INPUT FILE=... includes.
            for cmd in parsed.get("control_commands", []) or []:
                if not isinstance(cmd, dict):
                    continue
                if str(cmd.get("name", "")).upper() != "INPUT":
                    continue
                include = _extract_input_file_from_command(str(cmd.get("body", "")))
                if include:
                    queue.append(include)

        return lhs_names

    def _write_identity_overlay_from_base_deck(
        self,
        *,
        fp_home: Path,
        out_path: Path,
        exclude_lhs: set[str] | None = None,
    ) -> Path:
        """Extract baseline GENR/IDENT/LHS/CREATE statements from the base fminput deck.

        Many real scenarios (including PSE) rely on fp.exe's compiled identity/equation
        machinery and therefore omit a large number of derived-series GENR/IDENT
        statements from their scenario scripts. fppy needs these statements to exist
        in the input deck to compute the same stored series used by PRINTVAR exports.

        Statements whose assignment LHS is in *exclude_lhs* are dropped so the
        overlay never overrides a scenario-specific redefinition. Extraction
        stops at the first SOLVE statement; SMPL lines are preserved to keep the
        sample-window context of the extracted statements.
        """
        from fppy.expressions import parse_assignment

        src = fp_home / "fminput.txt"
        self._require_file(src, label="fminput.txt in fp_home (identity overlay source)")
        keep_prefixes = ("GENR", "IDENT", "LHS", "CREATE")
        exclude = {
            str(item).strip().upper() for item in (exclude_lhs or set()) if str(item).strip()
        }
        out_lines: list[str] = [
            "@ fp-wraptr: autogenerated identity overlay (from base fminput.txt)",
            f"@ source: {src.resolve()}",
            "",
        ]
        # Accumulator for a multiline statement; non-None means "inside a
        # statement awaiting its terminating ';'".
        statement_lines: list[str] | None = None
        for raw in src.read_text(encoding="utf-8", errors="replace").splitlines():
            stripped = raw.lstrip()
            upper = stripped.upper()
            if statement_lines is not None:
                # Continuation of a multiline statement.
                statement_lines.append(raw.rstrip("\n"))
                if ";" in raw:
                    # Statement complete: keep it unless its LHS is excluded.
                    keep = True
                    if exclude:
                        try:
                            assignment = parse_assignment("\n".join(statement_lines))
                        except Exception:
                            # Unparseable statements are kept (best-effort).
                            assignment = None
                        if assignment is not None:
                            lhs_key = str(assignment.lhs).strip().upper()
                            if lhs_key and lhs_key in exclude:
                                keep = False
                    if keep:
                        out_lines.extend(statement_lines)
                    statement_lines = None
                continue
            if upper.startswith("SOLVE"):
                break
            if upper.startswith("@") or not stripped:
                continue
            if upper.startswith("SMPL"):
                # Preserve SMPL window context for extracted CREATE/GENR/IDENT/LHS
                # statements. This is especially important for pre-solve trend-break
                # blocks that intentionally end at the last historical quarter.
                out_lines.append(raw.rstrip("\n"))
                continue
            if not upper.startswith(keep_prefixes):
                continue
            statement_lines = [raw.rstrip("\n")]
            if ";" in raw:
                # Single-line statement: same keep/exclude check as above.
                keep = True
                if exclude:
                    try:
                        assignment = parse_assignment("\n".join(statement_lines))
                    except Exception:
                        assignment = None
                    if assignment is not None:
                        lhs_key = str(assignment.lhs).strip().upper()
                        if lhs_key and lhs_key in exclude:
                            keep = False
                if keep:
                    out_lines.extend(statement_lines)
                statement_lines = None
        if statement_lines:
            # Deck ended mid-statement (no terminating ';'): emit what we have,
            # applying the same exclusion check.
            keep = True
            if exclude:
                try:
                    assignment = parse_assignment("\n".join(statement_lines))
                except Exception:
                    assignment = None
                if assignment is not None:
                    lhs_key = str(assignment.lhs).strip().upper()
                    if lhs_key and lhs_key in exclude:
                        keep = False
            if keep:
                out_lines.extend(statement_lines)
        out_lines.append("")
        out_path.write_text("\n".join(out_lines), encoding="utf-8")
        return out_path

    def run(
        self,
        input_file: Path | None = None,
        work_dir: Path | None = None,
        extra_env: dict[str, str] | None = None,
    ) -> RunResult:
        """Stage, launch, and monitor a full fppy mini-run.

        Args:
            input_file: Scenario input deck; its *name* is resolved inside
                *work_dir* (defaults to ``fminput.txt`` when None).
            work_dir: Required staged working directory containing fmdata.txt,
                fmage.txt, and fmexog(.override).txt alongside the input deck.
            extra_env: Extra environment variables merged into the subprocess
                environment (applied before the thread-count overrides).

        Returns:
            RunResult with captured stdout/stderr, return code, duration, and
            the preferred output file (PABEV.TXT if present, else fmout.txt).

        Raises:
            FairPyBackendError: if fppy is unavailable, required staged files
                are missing, or the subprocess exceeds ``timeout_seconds``.
        """
        if not self.check_available():
            raise FairPyBackendError("fppy is not importable (vendoring/packaging issue).")

        if work_dir is None:
            raise FairPyBackendError("work_dir is required for fair-py backend runs")
        work_dir = Path(work_dir).resolve()
        work_dir.mkdir(parents=True, exist_ok=True)

        expected_input = work_dir / (input_file.name if input_file is not None else "fminput.txt")
        self._require_file(expected_input, label="input file")
        self._require_file(work_dir / "fmdata.txt", label="fmdata.txt")
        self._require_file(work_dir / "fmage.txt", label="fmage.txt")
        fmexog_override = work_dir / "fmexog_override.txt"
        fmexog_path = fmexog_override if fmexog_override.exists() else (work_dir / "fmexog.txt")
        self._require_file(fmexog_path, label="fmexog (or fmexog_override).txt")

        # Coefficient baseline for EQ solve
        fmout_src = self.fp_home / "fmout.txt"
        self._require_file(fmout_src, label="fmout.txt (coefficient baseline) in fp_home")
        fmout_coefs = work_dir / "fmout_coefs.txt"
        shutil.copy2(fmout_src, fmout_coefs)

        # fppy needs explicit `EQ ...;` records present in the input deck to build
        # the equation backfill target list. fp.exe scenarios rely on compiled
        # model state, so wrap fppy with a small include that injects these EQ
        # statements extracted from the coefficient fmout.
        eq_overlay = self._write_eq_overlay_from_fmout(
            fmout_path=fmout_coefs, out_path=work_dir / "fppy_eq_overlay.txt"
        )
        identity_overlay: Path | None = None
        if str(self.eq_flags_preset or "").strip().lower() == "parity":
            # For parity preset, include full pre-first-SOLVE CREATE/GENR/IDENT/LHS
            # statements from baseline fminput so prerequisite derived series are
            # available to fppy (including multiline assignments).
            defined_lhs = self._collect_assignment_lhs_from_input_tree(
                work_dir=work_dir,
                entry_input=expected_input,
            )
            identity_overlay = self._write_identity_overlay_from_base_deck(
                fp_home=self.fp_home,
                out_path=work_dir / "fppy_identity_overlay.txt",
                exclude_lhs=defined_lhs,
            )
        wrapper_input = self._write_fppy_wrapper_input(
            work_dir=work_dir,
            base_input=expected_input,
            eq_overlay=eq_overlay,
            identity_overlay=identity_overlay,
        )

        config_path = self._write_model_config(
            work_dir,
            fminput_path=wrapper_input,
            fmexog_path=fmexog_path,
            fmout_path=fmout_coefs,
        )

        cmd = [
            sys.executable,
            "-m",
            "fppy.cli",
            "mini-run",
            "--config",
            str(config_path),
            "--on-error",
            "continue",
            *self._eq_args(fmout_coefs=fmout_coefs),
            *self._eq_structural_read_cache_args(),
            *self._eq_iter_trace_args(),
            "--report-json",
            str(work_dir / "fppy_report.json"),
        ]

        thread_env = self._thread_env_overrides()
        env = os.environ.copy()
        if extra_env:
            env.update(extra_env)
        # Thread overrides win over extra_env so num_threads is authoritative.
        env.update(thread_env)
        # Never write bytecode into the repo (OneDrive path).
        env.setdefault("PYTHONDONTWRITEBYTECODE", "1")
        env.setdefault("PYTHONPYCACHEPREFIX", "/tmp/fp-wraptr-pycache")

        stdout_path = work_dir / "fppy.stdout.txt"
        stderr_path = work_dir / "fppy.stderr.txt"
        runtime_path = work_dir / "fppy.runtime.json"

        def _write_runtime(status: str, **extra: object) -> None:
            """Overwrite fppy.runtime.json with the current run status so
            outside observers can monitor progress (heartbeat file)."""
            payload: dict[str, object] = {
                "status": str(status),
                "pid": int(extra.pop("pid", 0) or 0),
                "elapsed_seconds": float(extra.pop("elapsed_seconds", 0.0) or 0.0),
                "timeout_seconds": int(self.timeout_seconds),
                "heartbeat_seconds": int(max(1, int(self.heartbeat_seconds))),
                "updated_unix": float(time.time()),
                "num_threads_requested": (
                    int(self.num_threads)
                    if self.num_threads is not None and int(self.num_threads) > 0
                    else None
                ),
                "thread_env_overrides": dict(thread_env),
            }
            payload.update(extra)
            runtime_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")

        started = time.monotonic()
        heartbeat_seconds = max(1, int(self.heartbeat_seconds))
        deadline = started + float(self.timeout_seconds)
        with (
            stdout_path.open("w", encoding="utf-8") as stdout_file,
            stderr_path.open("w", encoding="utf-8") as stderr_file,
        ):
            # Stream subprocess output directly to files; polled below so we
            # can enforce the deadline and refresh the heartbeat.
            proc = subprocess.Popen(
                cmd,
                cwd=str(work_dir),
                stdout=stdout_file,
                stderr=stderr_file,
                text=True,
                env=env,
            )
            _write_runtime("running", pid=proc.pid, elapsed_seconds=0.0)
            next_heartbeat = started + float(heartbeat_seconds)
            while True:
                return_code = proc.poll()
                now = time.monotonic()
                elapsed = now - started
                if return_code is not None:
                    break
                if now >= deadline:
                    proc.kill()
                    try:
                        proc.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        # NOTE(review): terminate() after kill() is a weaker
                        # signal and likely a no-op here — confirm intent.
                        proc.terminate()
                    _write_runtime(
                        "timeout",
                        pid=proc.pid,
                        elapsed_seconds=elapsed,
                    )
                    raise FairPyBackendError(
                        f"fppy mini-run timed out after {self.timeout_seconds}s"
                    )
                if now >= next_heartbeat:
                    _write_runtime(
                        "running",
                        pid=proc.pid,
                        elapsed_seconds=elapsed,
                    )
                    next_heartbeat = now + float(heartbeat_seconds)
                time.sleep(0.25)

        duration_seconds = time.monotonic() - started
        stdout_text = stdout_path.read_text(encoding="utf-8", errors="replace")
        stderr_text = stderr_path.read_text(encoding="utf-8", errors="replace")
        # Status is "completed" even on a nonzero return code; callers inspect
        # RunResult.return_code for success/failure.
        _write_runtime(
            "completed",
            pid=proc.pid,
            elapsed_seconds=duration_seconds,
            return_code=int(proc.returncode),
        )

        # Prefer PABEV parity contract output if present; fall back to fmout.
        output_file = None
        for candidate in ("PABEV.TXT", "pabev.txt", "fmout.txt"):
            path = work_dir / candidate
            if path.exists():
                output_file = path
                break

        return RunResult(
            return_code=int(proc.returncode),
            stdout=stdout_text,
            stderr=stderr_text,
            working_dir=work_dir,
            input_file=expected_input,
            output_file=output_file,
            duration_seconds=float(duration_seconds),
        )

    def info(self) -> BackendInfo:
        """Return metadata about this backend."""
        available = self.check_available()
        version = ""
        if available:
            try:
                import fppy

                # fppy may not declare __version__; fall back to "unknown".
                version = getattr(fppy, "__version__", "unknown")
            except ImportError:
                pass
        return BackendInfo(
            name="fair-py",
            version=version,
            available=available,
            details={"fp_home": str(self.fp_home)},
        )

info()

Return metadata about this backend.

Source code in src/fp_wraptr/runtime/fairpy.py
def info(self) -> BackendInfo:
    """Return metadata about this backend."""
    available = self.check_available()
    version = ""
    if available:
        try:
            import fppy

            version = getattr(fppy, "__version__", "unknown")
        except ImportError:
            pass
    return BackendInfo(
        name="fair-py",
        version=version,
        available=available,
        details={"fp_home": str(self.fp_home)},
    )