Skip to content

Index

rules

Rules subpackage — modular project rule implementations.

BareExceptRule dataclass

Bases: ProjectRule

Detect bare except clauses (except: without type).

Source code in packages/axm-audit/src/axm_audit/core/rules/practices/bare_except.py
Python
@dataclass
@register_rule("practices")
class BareExceptRule(ProjectRule):
    """Flag ``except:`` handlers that do not name an exception type."""

    @property
    def rule_id(self) -> str:
        """Return the identifier under which this rule reports results."""
        return "PRACTICE_BARE_EXCEPT"

    def check(self, project_path: Path) -> CheckResult:
        """Scan ``<project_path>/src`` and report every bare ``except:``.

        Each finding costs 20 points (floored at 0); the rule passes only
        when no finding exists.
        """
        short_circuit = self.check_src(project_path)
        if short_circuit is not None:
            return short_circuit

        findings = self._collect_bare_excepts(project_path / "src")
        total = len(findings)
        clean = total == 0

        # One indented bullet per finding, rendered as ``file:line``.
        rendered = [
            f"     • {_short_path(str(hit['file']))}:{hit['line']}"
            for hit in findings
        ]
        hint = (
            None
            if clean
            else "Use specific exception types (e.g., except ValueError:)"
        )

        return CheckResult(
            rule_id=self.rule_id,
            passed=clean,
            message=f"{total} bare except(s) found",
            severity=Severity.INFO if clean else Severity.WARNING,
            score=int(max(0, 100 - total * 20)),
            details={
                "bare_except_count": total,
                "locations": findings,
            },
            text="\n".join(rendered) if rendered else None,
            fix_hint=hint,
        )

    def _collect_bare_excepts(
        self,
        src_path: Path,
    ) -> list[dict[str, str | int]]:
        """Return the bare-except locations found in files under *src_path*.

        Files for which no syntax tree is available are skipped.
        """
        found: list[dict[str, str | int]] = []
        for py_file in get_python_files(src_path):
            ast_cache = get_ast_cache()
            if ast_cache:
                module = ast_cache.get_or_parse(py_file)
            else:
                module = parse_file_safe(py_file)
            if module is not None:
                self._find_bare_excepts(module, py_file, src_path, found)
        return found

    def _find_bare_excepts(
        self,
        tree: ast.Module,
        path: Path,
        src_path: Path,
        bare_excepts: list[dict[str, str | int]],
    ) -> None:
        """Append one ``{"file", "line"}`` entry per untyped handler in *tree*.

        ``file`` is *path* expressed relative to *src_path*; results are
        accumulated into the caller-supplied *bare_excepts* list.
        """
        for handler in ast.walk(tree):
            if not isinstance(handler, ast.ExceptHandler) or handler.type is not None:
                continue
            location: dict[str, str | int] = {
                "file": str(path.relative_to(src_path)),
                "line": handler.lineno,
            }
            bare_excepts.append(location)
rule_id property

Unique identifier for this rule.

check(project_path)

Check for bare except clauses in the project.

Source code in packages/axm-audit/src/axm_audit/core/rules/practices/bare_except.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Scan ``<project_path>/src`` and report every bare ``except:``.

    Each finding costs 20 points (floored at 0); the rule passes only
    when no finding exists.
    """
    short_circuit = self.check_src(project_path)
    if short_circuit is not None:
        return short_circuit

    findings = self._collect_bare_excepts(project_path / "src")
    total = len(findings)
    clean = total == 0

    # One indented bullet per finding, rendered as ``file:line``.
    rendered = [
        f"     • {_short_path(str(hit['file']))}:{hit['line']}"
        for hit in findings
    ]
    hint = (
        None
        if clean
        else "Use specific exception types (e.g., except ValueError:)"
    )

    return CheckResult(
        rule_id=self.rule_id,
        passed=clean,
        message=f"{total} bare except(s) found",
        severity=Severity.INFO if clean else Severity.WARNING,
        score=int(max(0, 100 - total * 20)),
        details={
            "bare_except_count": total,
            "locations": findings,
        },
        text="\n".join(rendered) if rendered else None,
        fix_hint=hint,
    )

BlockingIORule dataclass

Bases: ProjectRule

Detect blocking I/O anti-patterns.

Source code in packages/axm-audit/src/axm_audit/core/rules/practices/blocking_io.py
Python
@dataclass
@register_rule("practices")
class BlockingIORule(ProjectRule):
    """Detect blocking I/O anti-patterns.

    Two patterns are reported: ``time.sleep()`` called inside an
    ``async def`` body, and HTTP calls made without a ``timeout=`` keyword.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "PRACTICE_BLOCKING_IO"

    def check(self, project_path: Path) -> CheckResult:
        """Check for blocking I/O patterns in the project.

        Every Python file under ``<project_path>/src`` with an available
        syntax tree is inspected. Each violation costs 15 points (floored
        at 0); the rule passes only when there are no violations.
        """
        early = self.check_src(project_path)
        if early is not None:
            return early

        src_path = project_path / "src"

        violations: list[dict[str, str | int]] = []

        for path in get_python_files(src_path):
            cache = get_ast_cache()
            tree = cache.get_or_parse(path) if cache else parse_file_safe(path)
            if tree is None:
                continue
            rel = str(path.relative_to(src_path))
            self._check_async_sleep(tree, rel, violations)
            self._check_http_no_timeout(tree, rel, violations)

        count = len(violations)
        passed = count == 0
        score = max(0, 100 - count * 15)

        text_lines = [f"• {v['file']}:{v['line']}: {v['issue']}" for v in violations]

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=f"{count} blocking-IO violation(s) found",
            severity=Severity.WARNING if not passed else Severity.INFO,
            score=int(score),
            details={"violations": violations},
            text="\n".join(text_lines) if text_lines else None,
            fix_hint=(
                "Use asyncio.sleep() instead of time.sleep() in async context; "
                "add timeout= to HTTP calls"
            )
            if not passed
            else None,
        )

    @staticmethod
    def _check_async_sleep(
        tree: ast.Module,
        rel: str,
        violations: list[dict[str, str | int]],
    ) -> None:
        """Find ``time.sleep()`` inside ``async def`` bodies.

        Only the literal ``time.sleep(...)`` attribute form is matched.
        Each call is reported once, even when it sits inside several
        nested ``async def`` functions.

        Args:
            tree: Parsed module to inspect.
            rel: File path recorded in each violation entry.
            violations: Output list; matching calls are appended to it.
        """
        # ``ast.walk(tree)`` yields nested async functions as well as their
        # enclosing ones, and walking each of them revisits the same call
        # nodes. Track node identity so a call is never reported twice.
        reported: set[int] = set()
        for node in ast.walk(tree):
            if not isinstance(node, ast.AsyncFunctionDef):
                continue
            for child in ast.walk(node):
                if not (
                    isinstance(child, ast.Call)
                    and isinstance(child.func, ast.Attribute)
                    and child.func.attr == "sleep"
                    and isinstance(child.func.value, ast.Name)
                    and child.func.value.id == "time"
                ):
                    continue
                if id(child) in reported:
                    continue
                reported.add(id(child))
                violations.append(
                    {
                        "file": rel,
                        "line": child.lineno,
                        "issue": "time.sleep in async",
                    }
                )

    @staticmethod
    def _check_http_no_timeout(
        tree: ast.Module,
        rel: str,
        violations: list[dict[str, str | int]],
    ) -> None:
        """Find HTTP calls without ``timeout=`` keyword argument.

        A call is considered when it is an attribute call whose method name
        is in ``_HTTP_METHODS`` and whose receiver satisfies
        ``_is_http_call``.

        Args:
            tree: Parsed module to inspect.
            rel: File path recorded in each violation entry.
            violations: Output list; matching calls are appended to it.
        """
        for node in ast.walk(tree):
            if not (
                isinstance(node, ast.Call)
                and isinstance(node.func, ast.Attribute)
                and node.func.attr in _HTTP_METHODS
            ):
                continue

            if not _is_http_call(node.func.value):
                continue

            # Only an explicit ``timeout=`` keyword counts; a ``**kwargs``
            # expansion has ``arg`` set to None and is not recognised here.
            has_timeout = any(kw.arg == "timeout" for kw in node.keywords)
            if not has_timeout:
                violations.append(
                    {
                        "file": rel,
                        "line": node.lineno,
                        "issue": "HTTP call without timeout",
                    }
                )
rule_id property

Unique identifier for this rule.

check(project_path)

Check for blocking I/O patterns in the project.

Source code in packages/axm-audit/src/axm_audit/core/rules/practices/blocking_io.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Report blocking-I/O violations found under ``<project_path>/src``.

    Each violation costs 15 points (floored at 0); the rule passes only
    when no violation exists.
    """
    short_circuit = self.check_src(project_path)
    if short_circuit is not None:
        return short_circuit

    src_root = project_path / "src"
    findings: list[dict[str, str | int]] = []

    for py_file in get_python_files(src_root):
        ast_cache = get_ast_cache()
        if ast_cache:
            module = ast_cache.get_or_parse(py_file)
        else:
            module = parse_file_safe(py_file)
        if module is None:
            continue
        relative = str(py_file.relative_to(src_root))
        # Both detectors append their hits to the shared list.
        self._check_async_sleep(module, relative, findings)
        self._check_http_no_timeout(module, relative, findings)

    total = len(findings)
    clean = total == 0
    rendered = [f"• {hit['file']}:{hit['line']}: {hit['issue']}" for hit in findings]

    if clean:
        hint = None
    else:
        hint = (
            "Use asyncio.sleep() instead of time.sleep() in async context; "
            "add timeout= to HTTP calls"
        )

    return CheckResult(
        rule_id=self.rule_id,
        passed=clean,
        message=f"{total} blocking-IO violation(s) found",
        severity=Severity.INFO if clean else Severity.WARNING,
        score=int(max(0, 100 - total * 15)),
        details={"violations": findings},
        text="\n".join(rendered) if rendered else None,
        fix_hint=hint,
    )

CircularImportRule dataclass

Bases: ProjectRule

Detect circular imports via import graph + Tarjan's SCC algorithm.

Source code in packages/axm-audit/src/axm_audit/core/rules/architecture/__init__.py
Python
@dataclass
@register_rule("architecture")
class CircularImportRule(ProjectRule):
    """Report import cycles found by running ``tarjan_scc`` on the import graph."""

    @property
    def rule_id(self) -> str:
        """Return the identifier under which this rule reports results."""
        return "ARCH_CIRCULAR"

    def check(self, project_path: Path) -> CheckResult:
        """Report circular imports among modules under ``<project_path>/src``.

        The rule passes only when no cycle is found; any cycle is reported
        with ERROR severity.
        """
        short_circuit = self.check_src(project_path)
        if short_circuit is not None:
            return short_circuit

        cycles, score = self._analyze_cycles(project_path / "src")
        clean = len(cycles) == 0

        # One indented bullet per cycle, members joined by an arrow.
        arrow = " \u2192 "
        rendered = [
            f"     \u2022 {arrow.join(strip_prefix(cycle))}" for cycle in cycles
        ]
        hint = "Break cycles by using lazy imports or restructuring" if cycles else None

        return CheckResult(
            rule_id=self.rule_id,
            passed=clean,
            message=f"{len(cycles)} circular import(s) found",
            severity=Severity.INFO if clean else Severity.ERROR,
            score=int(score),
            details={"cycles": cycles},
            text="\n".join(rendered) if rendered else None,
            fix_hint=hint,
        )

    def _analyze_cycles(self, src_path: Path) -> tuple[list[list[str]], int]:
        """Return the detected cycles and a score of 100 minus 20 per cycle."""
        cycles = tarjan_scc(self._build_import_graph(src_path))
        return cycles, max(0, 100 - len(cycles) * 20)

    def _build_import_graph(self, src_path: Path) -> dict[str, set[str]]:
        """Map each module under *src_path* to the set of names it imports.

        ``__init__.py`` files, files without a syntax tree, and files for
        which no module name can be derived are left out. A module with no
        imports still gets an (empty) entry.
        """
        edges: dict[str, set[str]] = defaultdict(set)

        for py_file in get_python_files(src_path):
            if py_file.name == "__init__.py":
                continue
            ast_cache = get_ast_cache()
            if ast_cache:
                module = ast_cache.get_or_parse(py_file)
            else:
                module = parse_file_safe(py_file)
            if module is None:
                continue
            dotted = _get_module_name(py_file, src_path)
            if not dotted:
                continue
            # Indexing the defaultdict creates the node even without imports.
            edges[dotted].update(extract_imports(module))

        return dict(edges)
rule_id property

Unique identifier for this rule.

check(project_path)

Check for circular imports in the project.

Source code in packages/axm-audit/src/axm_audit/core/rules/architecture/__init__.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Report circular imports among modules under ``<project_path>/src``.

    The rule passes only when no cycle is found; any cycle is reported
    with ERROR severity.
    """
    short_circuit = self.check_src(project_path)
    if short_circuit is not None:
        return short_circuit

    cycles, score = self._analyze_cycles(project_path / "src")
    clean = len(cycles) == 0

    # One indented bullet per cycle, members joined by an arrow.
    arrow = " \u2192 "
    rendered = [f"     \u2022 {arrow.join(strip_prefix(cycle))}" for cycle in cycles]
    hint = "Break cycles by using lazy imports or restructuring" if cycles else None

    return CheckResult(
        rule_id=self.rule_id,
        passed=clean,
        message=f"{len(cycles)} circular import(s) found",
        severity=Severity.INFO if clean else Severity.ERROR,
        score=int(score),
        details={"cycles": cycles},
        text="\n".join(rendered) if rendered else None,
        fix_hint=hint,
    )

ComplexityRule dataclass

Bases: ProjectRule

Analyse complexity via radon (CC) and complexipy (Cognitive).

Double constraint: a function is flagged if either radon grade is C+ (CC >= 11, aligned with ruff C901) or cognitive complexity > 15 (SonarSource convention: strictly higher than 15, aligned with complexipy). A function exceeding both thresholds counts as one violation (no double penalty) but is reported with reason='cc+cog'. Falls back gracefully to CC-only mode when complexipy is unavailable.

Source code in packages/axm-audit/src/axm_audit/core/rules/complexity.py
Python
@dataclass
@register_rule("complexity")
class ComplexityRule(ProjectRule):
    """Analyse complexity via radon (CC) and complexipy (Cognitive).

    Double constraint: a function is flagged if either radon grade is
    C+ (CC >= 11, aligned with ruff C901) or cognitive complexity > 15
    (SonarSource convention: strictly higher than 15, aligned with
    complexipy). A function exceeding both thresholds counts
    as one violation (no double penalty) but is reported with
    ``reason='cc+cog'``. Falls back gracefully to CC-only mode when
    complexipy is unavailable.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "QUALITY_COMPLEXITY"

    def check(self, project_path: Path) -> CheckResult:
        """Check project complexity with radon + complexipy.

        Uses the radon Python API when it can be imported and otherwise
        shells out to the ``radon`` executable.
        """
        early = self.check_src(project_path)
        if early is not None:
            return early

        src_path = project_path / "src"
        cog_map, cog_disabled = _compute_cognitive_map(src_path)

        radon_api = _try_import_radon()
        if radon_api is not None:
            cc_visit, cc_rank = radon_api
            return self._check_via_api(
                src_path, cc_visit, cc_rank, cog_map, cog_disabled
            )

        return self._check_via_subprocess(src_path, cog_map, cog_disabled)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _check_via_api(
        self,
        src_path: Path,
        cc_visit: Callable[[str], list[object]],
        cc_rank: Callable[[int], str],
        cog_map: dict[tuple[str, str], int],
        cog_disabled: bool,
    ) -> CheckResult:
        """Analyse complexity using the radon Python API.

        Args:
            src_path: Directory scanned recursively for ``*.py`` files.
            cc_visit: Callable returning the complexity blocks of a source string.
            cc_rank: Callable mapping a CC value to its rank.
            cog_map: Cognitive complexity keyed by ``(relative file, name)``.
            cog_disabled: Whether the cognitive layer is unavailable.

        Returns:
            The result built from every offender reported by ``_classify``.
        """
        offenders: list[dict[str, str | int]] = []
        for py_file in src_path.rglob("*.py"):
            try:
                source = py_file.read_text(encoding="utf-8")
                blocks = cc_visit(source)
            except (OSError, SyntaxError, UnicodeDecodeError):
                # Unreadable, undecodable or unparsable files are skipped so
                # a single bad file cannot abort the whole audit.
                continue
            rel = _relative_key(str(py_file), src_path)
            for block in blocks:
                if not hasattr(block, "complexity"):
                    continue
                cc = int(getattr(block, "complexity", 0))
                rank = cc_rank(cc)
                classname = getattr(block, "classname", "") or ""
                block_name = getattr(block, "name", "") or ""
                # Methods are keyed as ``Class.method``.
                name = f"{classname}.{block_name}" if classname else block_name
                cognitive = cog_map.get((rel, name), 0)
                offender = _classify(rel, name, cc, rank, cognitive)
                if offender is not None:
                    offenders.append(offender)
        return self._build_result(offenders, cog_disabled)

    def _check_via_subprocess(
        self,
        src_path: Path,
        cog_map: dict[tuple[str, str], int],
        cog_disabled: bool,
    ) -> CheckResult:
        """Analyse complexity by shelling out to ``radon cc --json``.

        Returns a failing ERROR result when the ``radon`` executable is not
        on the PATH, or when the run fails to start, times out, or prints
        output that is not valid JSON.
        """
        radon_bin = shutil.which("radon")
        if radon_bin is None:
            return CheckResult(
                rule_id=self.rule_id,
                passed=False,
                message=("radon not found — complexity analysis skipped"),
                severity=Severity.ERROR,
                score=0,
                fix_hint=(
                    "Run 'uv sync' at workspace root or "
                    "'uv pip install axm-audit' to make radon available"
                ),
            )

        try:
            # The process runs from a throwaway working directory.
            with tempfile.TemporaryDirectory() as _cwd:
                proc = subprocess.run(  # noqa: S603
                    [radon_bin, "cc", "--json", str(src_path)],
                    capture_output=True,
                    text=True,
                    check=False,
                    cwd=_cwd,
                    # Bound the run so a hung radon process cannot stall
                    # the audit indefinitely.
                    timeout=300,
                )
            data: dict[str, list[dict[str, object]]] = (
                json.loads(proc.stdout) if proc.stdout.strip() else {}
            )
        except (json.JSONDecodeError, OSError, subprocess.TimeoutExpired) as exc:
            logger.warning(
                "radon cc --json failed: %s",
                exc,
                exc_info=True,
            )
            return CheckResult(
                rule_id=self.rule_id,
                passed=False,
                message="radon cc --json failed",
                severity=Severity.ERROR,
                score=0,
                fix_hint="Check radon installation",
            )

        return self._process_radon_output(data, cog_map, cog_disabled, src_path)

    def _process_radon_output(
        self,
        data: dict[str, list[dict[str, object]]],
        cog_map: dict[tuple[str, str], int] | None = None,
        cog_disabled: bool = False,
        src_path: Path | None = None,
    ) -> CheckResult:
        """Process JSON output from radon cc.

        Args:
            data: Decoded JSON mapping each file path to its list of blocks.
            cog_map: Cognitive complexity keyed by ``(file key, name)``;
                ``None`` is treated as an empty map.
            cog_disabled: Whether the cognitive layer is unavailable.
            src_path: Base passed to ``_relative_key`` to derive file keys.
        """
        if cog_map is None:
            cog_map = {}
        offenders: list[dict[str, str | int]] = []
        for file_path, blocks in data.items():
            file_key = _relative_key(file_path, src_path)
            for block in blocks:
                # Entries that are not JSON objects are ignored.
                if not isinstance(block, dict):
                    continue
                rank = str(block.get("rank", ""))
                raw_cc = block.get("complexity", 0)
                cc = int(raw_cc) if isinstance(raw_cc, int | float | str) else 0
                raw_name = str(block.get("name", ""))
                classname = str(block.get("classname", ""))
                name = f"{classname}.{raw_name}" if classname else raw_name
                cognitive = cog_map.get((file_key, name), 0)
                offender = _classify(file_key, name, cc, rank, cognitive)
                if offender is not None:
                    offenders.append(offender)
        return self._build_result(offenders, cog_disabled)

    def _build_result(
        self,
        offenders: list[dict[str, str | int]],
        cog_disabled: bool = False,
    ) -> CheckResult:
        """Build the final ``CheckResult`` from computed metrics.

        Offenders are sorted by the larger of their CC and cognitive values,
        highest first. Each offender costs 10 points (floored at 0) and the
        rule passes when the score reaches ``PASS_THRESHOLD``. A disabled
        cognitive layer raises the severity to WARNING even on a pass.
        """
        top_offenders = sorted(
            offenders,
            key=lambda x: max(int(x["cc"]), int(x.get("cognitive", 0))),
            reverse=True,
        )
        high_complexity_count = len(top_offenders)
        score = max(0, 100 - high_complexity_count * 10)
        passed = score >= PASS_THRESHOLD

        text_lines = [
            (
                f"• {o['file']}:{o['function']} "
                f"cc={o['cc']} ({o['rank']}) cog={o['cognitive']} [{o['reason']}]"
            )
            for o in top_offenders
        ]

        message = (
            f"Complexity score: {score}/100 "
            f"({high_complexity_count} high-complexity functions)"
        )
        if cog_disabled:
            message += " — cognitive layer disabled (complexipy unavailable)"

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=message,
            severity=(
                Severity.WARNING if not passed or cog_disabled else Severity.INFO
            ),
            score=int(score),
            details={
                "high_complexity_count": high_complexity_count,
                "top_offenders": top_offenders,
                "cognitive_disabled": cog_disabled,
            },
            text="\n".join(text_lines) if text_lines else None,
            fix_hint=(
                "Refactor complex functions into smaller units"
                if high_complexity_count > 0
                else None
            ),
        )
rule_id property

Unique identifier for this rule.

check(project_path)

Check project complexity with radon + complexipy.

Source code in packages/axm-audit/src/axm_audit/core/rules/complexity.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Run the complexity analysis on ``<project_path>/src``.

    The radon Python API is used when importable; otherwise the analysis
    falls back to the ``radon`` executable.
    """
    short_circuit = self.check_src(project_path)
    if short_circuit is not None:
        return short_circuit

    src_root = project_path / "src"
    cog_map, cog_disabled = _compute_cognitive_map(src_root)

    radon_api = _try_import_radon()
    if radon_api is None:
        # No importable radon: use the command-line tool instead.
        return self._check_via_subprocess(src_root, cog_map, cog_disabled)

    cc_visit, cc_rank = radon_api
    return self._check_via_api(src_root, cc_visit, cc_rank, cog_map, cog_disabled)

CouplingMetricRule dataclass

Bases: ProjectRule

Measure module coupling via fan-in/fan-out analysis.

Scores based on the modules whose fan-out exceeds their effective threshold: score = 100 - (warnings * 3 + errors * 5), floored at 0.

Source code in packages/axm-audit/src/axm_audit/core/rules/architecture/__init__.py
Python
@dataclass
@register_rule("architecture")
class CouplingMetricRule(ProjectRule):
    """Measure module coupling via fan-in/fan-out analysis.

    Modules whose fan-out exceeds their effective threshold are classed as
    warnings or errors, and the score is
    ``100 - (n_warnings * 3 + n_errors * 5)``, floored at 0. The rule
    fails only when at least one module is classed as an error.
    """

    # NOTE(review): not read by any method shown here; the threshold used by
    # ``check`` comes from ``read_coupling_config`` — confirm whether this
    # field is still consumed elsewhere.
    fan_out_threshold: int = 10

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "ARCH_COUPLING"

    def check(self, project_path: Path) -> CheckResult:
        """Check coupling metrics for the project.

        Scans ``src/`` for import fan-out per module and compares each
        against its effective threshold (base + orchestrator bonus +
        per-module overrides).

        Returns a :class:`CheckResult` with:

        * ``text`` — one line per violation formatted as
          ``• {leaf_module} fo:{fan_out}/{threshold} {⚠|✘}``
          (``None`` when all modules pass).
        * ``details`` — full ``over_threshold`` list with FQN, fan-out,
          role, effective threshold, and severity.
        * ``fix_hint`` — human-readable remediation listing.
        """
        early = self.check_src(project_path)
        if early is not None:
            return early

        metrics = self._collect_metrics(project_path)
        n_over, over, avg = _extract_typed_coupling_fields(metrics)

        n_warnings, n_errors, severity = _resolve_coupling_severity(over)
        # Warnings weigh 3 points each, errors 5 points each.
        penalty = n_warnings * 3 + n_errors * 5
        score = max(0, 100 - penalty)
        msg = _coupling_message(n_over, penalty, metrics["max_fan_out"])

        return CheckResult(
            rule_id=self.rule_id,
            # Warnings alone lower the score but do not fail the rule.
            passed=n_errors == 0,
            message=msg,
            severity=severity,
            score=int(score),
            details={
                "max_fan_out": metrics["max_fan_out"],
                "max_fan_in": metrics["max_fan_in"],
                "avg_coupling": round(avg, 2),
                "n_over_threshold": n_over,
                "over_threshold": over,
            },
            text=_format_coupling_text(over),
            fix_hint=_format_coupling_hint(over),
        )

    def _collect_metrics(self, project_path: Path) -> dict[str, object]:
        """Read config and compute coupling metrics for ``project_path``.

        The threshold, per-module overrides, orchestrator bonus and error
        multiplier all come from ``read_coupling_config``; the metrics are
        computed over ``<project_path>/src``.
        """
        threshold, overrides, orchestrator_bonus, multiplier = read_coupling_config(
            project_path
        )
        return _compute_coupling_metrics(
            project_path / "src",
            threshold,
            overrides,
            orchestrator_bonus,
            severity_error_multiplier=multiplier,
        )
rule_id property

Unique identifier for this rule.

check(project_path)

Check coupling metrics for the project.

Scans src/ for import fan-out per module and compares each against its effective threshold (base + orchestrator bonus + per-module overrides).

Returns a CheckResult with:

  • text — one line per violation formatted as • {leaf_module} fo:{fan_out}/{threshold} {⚠|✘} (None when all modules pass).
  • details — full over_threshold list with FQN, fan-out, role, effective threshold, and severity.
  • fix_hint — human-readable remediation listing.
Source code in packages/axm-audit/src/axm_audit/core/rules/architecture/__init__.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Evaluate import coupling for the project.

    Metrics are gathered by ``_collect_metrics`` and the over-threshold
    modules are split into warnings and errors. Warnings cost 3 points
    each and errors 5 points each (score floored at 0); the rule fails
    only when at least one error exists.

    The returned :class:`CheckResult` carries:

    * ``text`` — the output of ``_format_coupling_text`` for the
      over-threshold modules.
    * ``details`` — maximum fan-out and fan-in, rounded average coupling,
      and the over-threshold count and list.
    * ``fix_hint`` — the output of ``_format_coupling_hint``.
    """
    short_circuit = self.check_src(project_path)
    if short_circuit is not None:
        return short_circuit

    metrics = self._collect_metrics(project_path)
    n_over, over, avg = _extract_typed_coupling_fields(metrics)
    n_warnings, n_errors, severity = _resolve_coupling_severity(over)

    penalty = n_warnings * 3 + n_errors * 5
    peak_fan_out = metrics["max_fan_out"]
    summary = _coupling_message(n_over, penalty, peak_fan_out)

    breakdown = {
        "max_fan_out": peak_fan_out,
        "max_fan_in": metrics["max_fan_in"],
        "avg_coupling": round(avg, 2),
        "n_over_threshold": n_over,
        "over_threshold": over,
    }

    return CheckResult(
        rule_id=self.rule_id,
        passed=n_errors == 0,
        message=summary,
        severity=severity,
        score=int(max(0, 100 - penalty)),
        details=breakdown,
        text=_format_coupling_text(over),
        fix_hint=_format_coupling_hint(over),
    )

DeadCodeRule

Bases: ProjectRule

Detect dead (unreferenced) code using axm-ast.

Gracefully skips if axm-ast is not available in the environment.

Scoring: 100 - (dead_symbols_count * 5), min 0.

Source code in packages/axm-audit/src/axm_audit/core/rules/dead_code.py
Python
@register_rule("lint")
class DeadCodeRule(ProjectRule):
    """Detect dead (unreferenced) code using axm-ast.

    Gracefully skips if axm-ast is not available in the environment.

    Scoring: 100 - (dead_symbols_count * 5), min 0.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "QUALITY_DEAD_CODE"

    def _skip(self, reason: str) -> CheckResult:
        """Return graceful skip result."""
        return CheckResult(
            rule_id=self.rule_id,
            passed=True,  # Passing so it doesn't fail the build
            message=f"Skipped: {reason}",
            severity=Severity.INFO,
            score=100,
            details={"skipped": True, "reason": reason},
        )

    def check(self, project_path: Path) -> CheckResult:
        """Check for dead code using axm-ast dead-code via subprocess.

        Returns a skip result when axm-ast is unavailable, an ERROR result
        when the tool cannot be run or its output cannot be parsed, and the
        scored result otherwise.
        """
        availability = self._check_availability(project_path)
        if availability is not None:
            return availability

        result = self._run_analysis(project_path)
        if isinstance(result, CheckResult):
            return result

        dead_symbols = self._parse_dead_symbols(result)
        if dead_symbols is None:
            return CheckResult(
                rule_id=self.rule_id,
                passed=False,
                message="Failed to parse axm-ast output",
                severity=Severity.ERROR,
                score=0,
                # Keep the raw output for troubleshooting.
                details={
                    "stdout": result.stdout,
                    "stderr": result.stderr,
                },
            )
        return self._build_result(dead_symbols)

    def _check_availability(self, project_path: Path) -> CheckResult | None:
        """Return a skip result if axm-ast is not on the PATH, else None."""
        if shutil.which("axm-ast") is None:
            return self._skip("axm-ast is not available in the environment")
        return None

    def _run_analysis(
        self,
        project_path: Path,
    ) -> subprocess.CompletedProcess[str] | CheckResult:
        """Run axm-ast dead-code directly (resolved from PATH, not target venv).

        Returns the completed process, or a failing ERROR result when the
        executable cannot be found or the run exceeds 300 seconds.
        """
        try:
            return subprocess.run(  # noqa: S603
                ["axm-ast", "dead-code", str(project_path), "--json"],  # noqa: S607
                capture_output=True,
                text=True,
                check=False,
                timeout=300,
            )
        except (FileNotFoundError, subprocess.TimeoutExpired):
            return CheckResult(
                rule_id=self.rule_id,
                passed=False,
                message="Failed to execute axm-ast",
                severity=Severity.ERROR,
                score=0,
                fix_hint="Ensure axm-ast is installed in the audit environment",
            )

    def _parse_dead_symbols(
        self,
        result: subprocess.CompletedProcess[str],
    ) -> list[dict[str, str]] | None:
        """Parse JSON output from axm-ast, returning the dead symbols list.

        Empty output is treated as an empty list. A JSON object yields its
        ``dead_symbols`` entry, a JSON array is returned as is, and any
        other JSON value yields an empty list.

        Returns ``None`` when the output is not valid JSON.
        """
        try:
            out = result.stdout or "[]"
            data = json.loads(out)
        except json.JSONDecodeError:
            return None

        if isinstance(data, dict):
            symbols: list[dict[str, str]] = data.get("dead_symbols", [])
            return symbols
        return data if isinstance(data, list) else []

    def _build_result(self, dead_symbols: list[dict[str, str]]) -> CheckResult:
        """Build a CheckResult from the dead symbols list.

        Each dead symbol costs 5 points (floored at 0) and the rule passes
        only when the list is empty. At most ``_MAX_TOP_OFFENDERS`` symbols
        are rendered in ``text``; the remainder is summarised as a count.
        The remediation hint is emitted only when dead code was found.
        """
        dead_count = len(dead_symbols)
        score = max(0.0, 100.0 - (dead_count * 5.0))
        passed = dead_count == 0

        message = (
            "No dead code detected."
            if passed
            else f"Found {dead_count} dead (unreferenced) symbol(s)."
        )

        details: dict[str, object] = {
            "dead_count": dead_count,
            "symbols": dead_symbols,
        }

        shown = dead_symbols[:_MAX_TOP_OFFENDERS]
        if dead_symbols:
            details["top_offenders"] = shown

        text_lines: list[str] = []
        for sym in shown:
            # Prefer ``file`` and fall back to ``module_path``; a leading
            # ``src/`` is dropped from the displayed path.
            path = sym.get("file", sym.get("module_path", "")).removeprefix("src/")
            text_lines.append(f"\u2022 {sym['name']} {path}:{sym.get('line', '')}")
        if dead_count > _MAX_TOP_OFFENDERS:
            text_lines.append(f"\u2022 +{dead_count - _MAX_TOP_OFFENDERS} more")

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=message,
            severity=Severity.WARNING if dead_count > 0 else Severity.INFO,
            score=int(score),
            details=details,
            # A passing result has nothing to remediate, so it carries no hint.
            fix_hint=(
                None
                if passed
                else "Remove dead code or mark exported in __all__ if public API"
            ),
            text="\n".join(text_lines) if text_lines else None,
        )
rule_id property

Unique identifier for this rule.

check(project_path)

Check for dead code using axm-ast dead-code via subprocess.

Source code in packages/axm-audit/src/axm_audit/core/rules/dead_code.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Check for dead code using axm-ast dead-code via subprocess.

    Each stage can short-circuit with its own ``CheckResult``: the
    availability check, the analysis run, and finally the output parser
    (an unparseable output yields an ERROR result carrying the raw
    stdout/stderr).
    """
    unavailable = self._check_availability(project_path)
    if unavailable is not None:
        return unavailable

    completed = self._run_analysis(project_path)
    if isinstance(completed, CheckResult):
        # The runner already produced a final result; pass it through.
        return completed

    symbols = self._parse_dead_symbols(completed)
    if symbols is not None:
        return self._build_result(symbols)

    raw_output = {
        "stdout": completed.stdout,
        "stderr": completed.stderr,
    }
    return CheckResult(
        rule_id=self.rule_id,
        passed=False,
        message="Failed to parse axm-ast output",
        severity=Severity.ERROR,
        score=0,
        details=raw_output,
    )

DependencyAuditRule dataclass

Bases: ProjectRule

Scan dependencies for known vulnerabilities via pip-audit.

Scoring: 100 - (vuln_count * 15), min 0.

Vulnerabilities reported against environment tools (pip, setuptools, wheel, uv, pip-audit) are excluded from the count. These tools are injected into the audit venv by uv run --with rather than declared as project dependencies, so their CVEs are not actionable from the project being audited. Matching is case-insensitive.

Source code in packages/axm-audit/src/axm_audit/core/rules/dependencies.py
Python
@dataclass
@register_rule("deps")
class DependencyAuditRule(ProjectRule):
    """Scan dependencies for known vulnerabilities via pip-audit.

    Scoring: 100 - (vuln_count * 15), min 0.

    Vulnerabilities reported against environment tools (``pip``, ``setuptools``,
    ``wheel``, ``uv``, ``pip-audit``) are excluded from the count. These tools
    are injected into the audit venv by ``uv run --with`` rather than declared
    as project dependencies, so their CVEs are not actionable from the project
    being audited. Matching is case-insensitive.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "DEPS_AUDIT"

    def _pip_audit_failure(self, message: str, fix_hint: str) -> CheckResult:
        """Build the ERROR result returned when pip-audit could not be run."""
        return CheckResult(
            rule_id=self.rule_id,
            passed=False,
            message=message,
            severity=Severity.ERROR,
            score=0,
            details={"vuln_count": 0},
            fix_hint=fix_hint,
        )

    def check(self, project_path: Path) -> CheckResult:
        """Check dependencies for known CVEs.

        A missing pip-audit executable or a runtime failure is reported as
        an ERROR result with score 0.  Otherwise the score is derived from
        the number of parsed vulnerabilities and at most five of them are
        summarised in ``details`` and ``text``.
        """
        try:
            audit_data = _run_pip_audit(project_path)
        except FileNotFoundError:
            return self._pip_audit_failure(
                "pip-audit not available",
                "Install with: uv add --dev pip-audit",
            )
        except RuntimeError as exc:
            return self._pip_audit_failure(
                str(exc),
                "Check pip-audit installation: uv run pip-audit --version",
            )

        findings = _parse_vulns(audit_data)
        vuln_count = len(findings)
        score = max(0, 100 - vuln_count * 15)
        meets_threshold = score >= PASS_THRESHOLD

        top_vulns = [_summarize_vuln(finding) for finding in findings[:5]]
        text_lines = [_format_vuln_line(summary) for summary in top_vulns]

        if vuln_count == 0:
            message = "No known vulnerabilities"
            fix_hint = None
        else:
            message = f"{vuln_count} vulnerable package(s) found"
            fix_hint = "Run: pip-audit --fix to remediate"

        return CheckResult(
            rule_id=self.rule_id,
            passed=meets_threshold,
            message=message,
            severity=Severity.INFO if meets_threshold else Severity.WARNING,
            score=int(score),
            details={
                "vuln_count": vuln_count,
                "top_vulns": top_vulns,
            },
            text="\n".join(text_lines) if text_lines else None,
            fix_hint=fix_hint,
        )
rule_id property

Unique identifier for this rule.

check(project_path)

Check dependencies for known CVEs.

Source code in packages/axm-audit/src/axm_audit/core/rules/dependencies.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Check dependencies for known CVEs.

    A missing pip-audit executable or a runtime failure is reported as an
    ERROR result with score 0.  Otherwise the score is
    ``100 - vuln_count * 15`` (floored at 0) and at most five
    vulnerabilities are summarised.
    """
    try:
        audit_data = _run_pip_audit(project_path)
    except (FileNotFoundError, RuntimeError) as exc:
        # Both failure modes share one result shape; only the wording differs.
        if isinstance(exc, FileNotFoundError):
            failure_message = "pip-audit not available"
            failure_hint = "Install with: uv add --dev pip-audit"
        else:
            failure_message = str(exc)
            failure_hint = "Check pip-audit installation: uv run pip-audit --version"
        return CheckResult(
            rule_id=self.rule_id,
            passed=False,
            message=failure_message,
            severity=Severity.ERROR,
            score=0,
            details={"vuln_count": 0},
            fix_hint=failure_hint,
        )

    findings = _parse_vulns(audit_data)
    vuln_count = len(findings)
    score = max(0, 100 - vuln_count * 15)
    meets_threshold = score >= PASS_THRESHOLD

    top_vulns = [_summarize_vuln(finding) for finding in findings[:5]]
    text_lines = [_format_vuln_line(summary) for summary in top_vulns]

    if vuln_count == 0:
        message = "No known vulnerabilities"
        fix_hint = None
    else:
        message = f"{vuln_count} vulnerable package(s) found"
        fix_hint = "Run: pip-audit --fix to remediate"

    return CheckResult(
        rule_id=self.rule_id,
        passed=meets_threshold,
        message=message,
        severity=Severity.INFO if meets_threshold else Severity.WARNING,
        score=int(score),
        details={
            "vuln_count": vuln_count,
            "top_vulns": top_vulns,
        },
        text="\n".join(text_lines) if text_lines else None,
        fix_hint=fix_hint,
    )

DependencyHygieneRule dataclass

Bases: ProjectRule

Check for unused/missing/transitive dependencies via deptry.

Scoring: 100 - (issue_count * 10), min 0.

Source code in packages/axm-audit/src/axm_audit/core/rules/dependencies.py
Python
@dataclass
@register_rule("deps")
class DependencyHygieneRule(ProjectRule):
    """Check for unused/missing/transitive dependencies via deptry.

    Scoring: 100 - (issue_count * 10), min 0.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "DEPS_HYGIENE"

    def check(self, project_path: Path) -> CheckResult:
        """Check dependency hygiene with deptry.

        Workspace projects are audited member by member and aggregated;
        anything else is audited as a single package.
        """
        members = resolve_workspace_members(project_path)
        if members is None:
            outcome = self._check_single(project_path)
            # Without a member name ``_check_single`` always yields a result.
            assert isinstance(outcome, CheckResult)
            return outcome
        return self._check_workspace(project_path, members)

    def _deptry_failure(self, message: str, fix_hint: str) -> CheckResult:
        """Build the ERROR result returned when deptry could not be used."""
        return CheckResult(
            rule_id=self.rule_id,
            passed=False,
            message=message,
            severity=Severity.ERROR,
            score=0,
            details={"issue_count": 0},
            fix_hint=fix_hint,
        )

    def _run_deptry_safely(
        self, project_path: Path, *, member_name: str = ""
    ) -> tuple[list[dict[str, object]] | None, CheckResult | None]:
        """Run ``run_deptry`` and turn its failures into a ``CheckResult``.

        Returns ``(issues, None)`` when deptry ran and ``(None, error_result)``
        otherwise.  Runtime and JSON-decoding failures are additionally
        logged when *member_name* is given; a missing executable is not.
        """
        try:
            issues = run_deptry(project_path)
        except FileNotFoundError:
            return None, self._deptry_failure(
                "deptry not available",
                "Install with: uv add --dev deptry",
            )
        except (RuntimeError, json.JSONDecodeError) as exc:
            if member_name:
                logger.warning("deptry failed for %s: %s", member_name, exc)
            if isinstance(exc, RuntimeError):
                message = f"deptry failed: {exc}"
            else:
                message = "deptry output parse error"
            return None, self._deptry_failure(
                message,
                "Check deptry installation: uv run deptry --version",
            )
        return issues, None

    def _build_single_check_result(
        self, issues: list[dict[str, object]]
    ) -> CheckResult:
        """Score and format the issues of a single package.

        Only the first five issues are formatted into ``details`` and
        ``text``; the score and message use the full count.
        """
        issue_count = len(issues)
        score = max(0, 100 - issue_count * 10)
        meets_threshold = score >= PASS_THRESHOLD

        formatted = [_format_issue(issue) for issue in issues[:5]]
        text_lines: list[str] = []
        for entry in formatted:
            code, module = entry["code"], entry["module"]
            label = _DEPTRY_LABELS.get(code, entry["message"])
            text_lines.append(f"\u2022 {code} {module}: {label}")

        if issue_count == 0:
            message = "Clean dependencies (0 issues)"
            fix_hint = None
        else:
            message = f"{issue_count} dependency issue(s) found"
            fix_hint = "Run: deptry . to see details"

        return CheckResult(
            rule_id=self.rule_id,
            passed=meets_threshold,
            message=message,
            severity=Severity.INFO if meets_threshold else Severity.WARNING,
            score=int(score),
            details={
                "issue_count": issue_count,
                "top_issues": formatted,
            },
            text="\n".join(text_lines) if text_lines else None,
            fix_hint=fix_hint,
        )

    def _check_single(
        self, project_path: Path, *, member_name: str = ""
    ) -> CheckResult | list[dict[str, object]]:
        """Run deptry on one package.

        With *member_name* set (workspace aggregation) the filtered issue
        list is returned, and any deptry failure becomes an empty list.
        Without it a full ``CheckResult`` is returned, including the error
        result on failure.
        """
        issues, error = self._run_deptry_safely(project_path, member_name=member_name)
        for_workspace = bool(member_name)
        if error is not None:
            # NOTE(review): in workspace mode every failure, including deptry
            # being unavailable, collapses to "no issues", so a workspace
            # where deptry cannot run is reported as clean.  Confirm intent.
            return [] if for_workspace else error

        filtered = _filter_false_positives(issues or [], project_path)
        return filtered if for_workspace else self._build_single_check_result(filtered)

    def _collect_member_issues(
        self, members: list[Path]
    ) -> list[tuple[str, dict[str, object]]]:
        """Run deptry on every member and tag each issue with the member name."""
        tagged: list[tuple[str, dict[str, object]]] = []
        for member in members:
            label = member.name
            found = self._check_single(member, member_name=label)
            if isinstance(found, list):
                tagged.extend((label, issue) for issue in found)
        return tagged

    def _build_workspace_result(
        self, all_issues: list[tuple[str, dict[str, object]]]
    ) -> CheckResult:
        """Score and format issues aggregated across workspace members.

        Only the first five issues are formatted; each text line is
        suffixed with the member name when the formatted issue carries one.
        """
        issue_count = len(all_issues)
        score = max(0, 100 - issue_count * 10)
        meets_threshold = score >= PASS_THRESHOLD

        formatted = [
            _format_issue(issue, member=member_name)
            for member_name, issue in all_issues[:5]
        ]
        text_lines: list[str] = []
        for entry in formatted:
            code, module = entry["code"], entry["module"]
            label = _DEPTRY_LABELS.get(code, entry["message"])
            suffix = f" ({entry['member']})" if entry.get("member") else ""
            text_lines.append(f"\u2022 {code} {module}: {label}{suffix}")

        if issue_count == 0:
            message = "Clean dependencies (0 issues)"
            fix_hint = None
        else:
            message = f"{issue_count} dependency issue(s) found"
            fix_hint = "Run: deptry . to see details"

        return CheckResult(
            rule_id=self.rule_id,
            passed=meets_threshold,
            message=message,
            severity=Severity.INFO if meets_threshold else Severity.WARNING,
            score=int(score),
            details={
                "issue_count": issue_count,
                "top_issues": formatted,
            },
            text="\n".join(text_lines) if text_lines else None,
            fix_hint=fix_hint,
        )

    def _check_workspace(self, project_path: Path, members: list[Path]) -> CheckResult:
        """Aggregate deptry results across workspace members."""
        return self._build_workspace_result(self._collect_member_issues(members))
rule_id property

Unique identifier for this rule.

check(project_path)

Check dependency hygiene with deptry.

Source code in packages/axm-audit/src/axm_audit/core/rules/dependencies.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Check dependency hygiene with deptry.

    Workspace projects are audited member by member and aggregated;
    anything else is audited as a single package.
    """
    members = resolve_workspace_members(project_path)
    if members is None:
        outcome = self._check_single(project_path)
        # Without a member name ``_check_single`` always yields a result.
        assert isinstance(outcome, CheckResult)
        return outcome
    return self._check_workspace(project_path, members)

DiffSizeRule dataclass

Bases: ProjectRule

Warn when uncommitted changes are too large.

Encourages smaller, focused commits/PRs.

Scoring: 100 if at most ideal lines are changed, degrading linearly to 0 at max lines. Defaults: ideal=400, max=1200. Both can be overridden via [tool.axm-audit] in pyproject.toml.

Gracefully skips if not in a git repository or git is not installed.

Source code in packages/axm-audit/src/axm_audit/core/rules/quality.py
Python
@dataclass
@register_rule("lint")
class DiffSizeRule(ProjectRule):
    """Warn when uncommitted changes are too large.

    Encourages smaller, focused commits/PRs.

    Scoring: 100 if ≤ *ideal* lines changed, degrading linearly to 0 at
    *max* lines.  Defaults: ideal=400, max=1200.  Overridable via
    ``[tool.axm-audit]`` in ``pyproject.toml``.

    Gracefully skips (passing with score 100) if git is not installed, the
    project is not in a git repository, or the git command fails.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "QUALITY_DIFF_SIZE"

    def check(self, project_path: Path) -> CheckResult:
        """Check uncommitted diff size.

        Skips when git is unavailable or *project_path* is not inside a
        repository; otherwise delegates to :meth:`_measure_diff`.
        """
        project_path = Path(project_path)
        if shutil.which("git") is None:
            return self._skip("git not installed")

        if not self._is_git_repo(project_path):
            return self._skip("not a git repo")

        return self._measure_diff(project_path)

    # -- private helpers -------------------------------------------------------

    def _skip(self, reason: str) -> CheckResult:
        """Return a passing INFO result explaining why the check was skipped."""
        return CheckResult(
            rule_id=self.rule_id,
            passed=True,
            message=f"{reason} — diff size check skipped",
            severity=Severity.INFO,
            score=100,
            details={"lines_changed": 0},
        )

    @staticmethod
    def _is_git_repo(project_path: Path) -> bool:
        """Check whether *project_path* is inside a git repository.

        Returns ``False`` when ``git rev-parse --git-dir`` exits non-zero or
        the process cannot be started.
        """
        try:
            result = subprocess.run(
                ["git", "rev-parse", "--git-dir"],
                cwd=str(project_path),
                capture_output=True,
                text=True,
                check=False,
            )
            return result.returncode == 0
        except OSError:
            return False

    def _measure_diff(self, project_path: Path) -> CheckResult:
        """Run ``git diff --stat HEAD`` and score the result.

        Counts uncommitted changed lines via ``git diff --stat``, computes a
        score against configurable *ideal* / *max* thresholds, and returns a
        :class:`CheckResult`.  When the check passes (score ≥
        ``PASS_THRESHOLD``) or there are no changes, ``text`` is ``None``.

        The check is skipped when git cannot be started or exits with a
        non-zero status, so a failed command is never mistaken for a clean
        working tree.
        """
        try:
            result = subprocess.run(
                ["git", "diff", "--stat", "HEAD"],
                cwd=str(project_path),
                capture_output=True,
                text=True,
                check=False,
            )
        except OSError:
            return self._skip("git command failed")

        if result.returncode != 0:
            # A failed diff (e.g. HEAD cannot be resolved) prints nothing to
            # stdout; without this guard it would read as "no changes".
            return self._skip("git diff failed")

        stdout = result.stdout.strip()
        if not stdout:
            # No uncommitted changes
            return CheckResult(
                rule_id=self.rule_id,
                passed=True,
                message="No uncommitted changes",
                severity=Severity.INFO,
                score=100,
                details={"lines_changed": 0},
            )

        ideal, max_lines = read_diff_config(project_path)
        lines_changed = self._parse_stat(stdout)
        score = self.compute_score(lines_changed, ideal, max_lines)
        passed = score >= PASS_THRESHOLD

        text = (
            f"\u2022 {lines_changed} lines \u0394 (ideal < {ideal})"
            if not passed and lines_changed > 0
            else None
        )

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=f"Diff size: {lines_changed} lines changed (score {score}/100)",
            severity=Severity.WARNING if not passed else Severity.INFO,
            score=int(score),
            details={"lines_changed": lines_changed},
            text=text,
            fix_hint=(
                f"Consider splitting into smaller commits (< {ideal} lines ideal)"
                if not passed
                else None
            ),
        )

    @staticmethod
    def _parse_stat(stdout: str) -> int:
        """Extract total lines changed from ``git diff --stat`` output.

        Only the last line (the summary) is inspected.  Returns the sum of
        the insertion and deletion counts, or 0 when the summary does not
        match ``_DIFF_STAT_RE``.
        """
        last_line = stdout.strip().split("\n")[-1]
        match = _DIFF_STAT_RE.search(last_line)
        if not match:
            return 0
        # Either count may be absent from the summary; treat it as zero.
        insertions = int(match.group(2) or 0)
        deletions = int(match.group(3) or 0)
        return insertions + deletions

    @staticmethod
    def compute_score(
        lines_changed: int,
        ideal: int = _DIFF_IDEAL,
        max_lines: int = _DIFF_MAX,
    ) -> int:
        """Compute score from lines changed: 100→0 over [ideal, max_lines].

        Returns 100 at or below *ideal*, 0 at or above *max_lines*, and a
        linearly interpolated value (truncated to an int) in between.
        """
        if lines_changed <= ideal:
            return 100
        if lines_changed >= max_lines:
            return 0
        return int(100 - (lines_changed - ideal) * 100 / (max_lines - ideal))
rule_id property

Unique identifier for this rule.

check(project_path)

Check uncommitted diff size.

Source code in packages/axm-audit/src/axm_audit/core/rules/quality.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Check uncommitted diff size.

    Skips when git is not installed or the project is not inside a git
    repository; otherwise measures the diff.
    """
    root = Path(project_path)

    skip_reason: str | None = None
    if shutil.which("git") is None:
        skip_reason = "git not installed"
    elif not self._is_git_repo(root):
        skip_reason = "not a git repo"

    if skip_reason is not None:
        return self._skip(skip_reason)
    return self._measure_diff(root)
compute_score(lines_changed, ideal=_DIFF_IDEAL, max_lines=_DIFF_MAX) staticmethod

Compute score from lines changed: 100→0 over [ideal, max_lines].

Source code in packages/axm-audit/src/axm_audit/core/rules/quality.py
Python
@staticmethod
def compute_score(
    lines_changed: int,
    ideal: int = _DIFF_IDEAL,
    max_lines: int = _DIFF_MAX,
) -> int:
    """Map a changed-line count onto a 0-100 score.

    Returns 100 at or below *ideal*, 0 at or above *max_lines*, and a
    linearly interpolated value (truncated to an int) in between.
    """
    if lines_changed <= ideal:
        return 100
    if lines_changed >= max_lines:
        return 0
    overshoot = lines_changed - ideal
    span = max_lines - ideal
    return int(100 - overshoot * 100 / span)

DocstringCoverageRule dataclass

Bases: ProjectRule

Calculate docstring coverage for public functions.

Public functions are those not starting with underscore.

Source code in packages/axm-audit/src/axm_audit/core/rules/practices/docstring_coverage.py
Python
@dataclass
@register_rule("practices")
class DocstringCoverageRule(ProjectRule):
    """Calculate docstring coverage for public functions.

    Public functions are those not starting with underscore.  Property
    setters/deleters, ``@abstractmethod`` stubs, and overrides of a
    documented ``@abstractmethod`` are left out of the count entirely.
    """

    # Minimum documented/total ratio required for the rule to pass.
    min_coverage: float = 0.80

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "PRACTICE_DOCSTRING"

    def check(self, project_path: Path) -> CheckResult:
        """Check docstring coverage in the project.

        Returns the result of ``check_src`` when it is not ``None``;
        otherwise analyses every Python file under ``<project_path>/src``.
        """
        early = self.check_src(project_path)
        if early is not None:
            return early

        src_path = project_path / "src"
        documented, missing = self._analyze_docstrings(src_path)
        return self._build_result(documented, missing)

    def _build_result(
        self,
        documented: int,
        missing: list[str],
    ) -> CheckResult:
        """Build CheckResult from docstring analysis.

        Args:
            documented: Number of counted functions that have a docstring.
            missing: ``"<relative path>:<function name>"`` entries for the
                counted functions without a docstring.

        Returns:
            A result whose score is the coverage percentage (truncated) and
            which passes when coverage reaches ``min_coverage``.  An empty
            project counts as fully covered.
        """
        total = documented + len(missing)
        coverage = documented / total if total > 0 else 1.0
        passed = coverage >= self.min_coverage
        score = int(coverage * 100)

        text: str | None = None
        if missing:
            # Group undocumented function names by file for a compact listing.
            groups: dict[str, list[str]] = {}
            for item in missing:
                # Split on the last colon: everything before it is the path.
                file_part, _, func_name = item.rpartition(":")
                groups.setdefault(file_part, []).append(func_name)
            text_lines = [
                f"     • {path}: {', '.join(funcs)}" for path, funcs in groups.items()
            ]
            text = "\n".join(text_lines)

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=f"Docstring coverage: {coverage:.0%} ({documented}/{total})",
            severity=Severity.WARNING if not passed else Severity.INFO,
            score=int(score),
            details={
                "coverage": round(coverage, 2),
                "total": total,
                "documented": documented,
                "missing": missing,
            },
            text=text,
            fix_hint="Add docstrings to public functions" if missing else None,
        )

    def _analyze_docstrings(self, src_path: Path) -> tuple[int, list[str]]:
        """Analyze docstring coverage in source files.

        Works in three passes: parse every file (skipping those that yield
        no tree), index all class definitions by name across files, then
        count documented and undocumented functions per file.

        Returns:
            ``(documented_count, missing_entries)`` aggregated over all files.
        """
        documented = 0
        missing: list[str] = []

        # Pass 1: parse each file, through the AST cache when one is available.
        file_trees: dict[Path, ast.Module] = {}
        for path in get_python_files(src_path):
            cache = get_ast_cache()
            tree = cache.get_or_parse(path) if cache else parse_file_safe(path)
            if tree is not None:
                file_trees[path] = tree

        # Pass 2: project-wide class index, used to resolve base classes
        # defined in another file.  A name may map to several definitions.
        global_classes: dict[str, list[ast.ClassDef]] = {}
        for tree in file_trees.values():
            for node in ast.walk(tree):
                if isinstance(node, ast.ClassDef):
                    global_classes.setdefault(node.name, []).append(node)

        # Pass 3: per-file counting, reporting paths relative to src_path.
        for path, tree in file_trees.items():
            rel_path = path.relative_to(src_path)
            class_map = self._build_class_map(tree)
            doc, mis = self._check_file_docstrings(
                tree,
                rel_path,
                class_map,
                global_classes,
            )
            documented += doc
            missing.extend(mis)

        return documented, missing

    def _check_file_docstrings(
        self,
        tree: ast.Module,
        rel_path: Path,
        class_map: dict[str, ast.ClassDef],
        global_classes: dict[str, list[ast.ClassDef]],
    ) -> tuple[int, list[str]]:
        """Check docstring coverage for public functions in a single file.

        Every function definition reached by ``ast.walk`` is considered, so
        methods and nested functions are included.  Functions matching any
        of the skip conditions below are neither documented nor missing.

        Returns:
            ``(documented_count, missing_entries)`` for this file, where each
            entry is ``"<rel_path>:<function name>"``.
        """
        documented = 0
        missing: list[str] = []
        for node in ast.walk(tree):
            if not isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                continue
            # Private and dunder names are not part of the public surface.
            if node.name.startswith("_"):
                continue
            if self._is_setter_or_deleter(node):
                continue
            if self.is_abstract_stub(node):
                continue
            # An override of a documented abstract method is exempt.
            if self._is_abstract_override(node, class_map, global_classes):
                continue

            if self._has_docstring(node):
                documented += 1
            else:
                missing.append(f"{rel_path}:{node.name}")
        return documented, missing

    @staticmethod
    def has_abstractmethod_decorator(
        node: ast.FunctionDef | ast.AsyncFunctionDef,
    ) -> bool:
        """Return *True* if *node* has an ``@abstractmethod`` decorator.

        Matches both the bare name (``@abstractmethod``) and any attribute
        access ending in it (e.g. ``@abc.abstractmethod``).
        """
        return any(
            (isinstance(d, ast.Name) and d.id == "abstractmethod")
            or (isinstance(d, ast.Attribute) and d.attr == "abstractmethod")
            for d in node.decorator_list
        )

    @staticmethod
    def is_stub_body(
        node: ast.FunctionDef | ast.AsyncFunctionDef,
    ) -> bool:
        """Return *True* if *node*'s body is just ``...`` or ``pass``.

        The body must consist of exactly one statement, so a docstring
        followed by ``...`` does not qualify.
        """
        if len(node.body) != 1:
            return False
        stmt = node.body[0]
        if isinstance(stmt, ast.Pass):
            return True
        return (
            isinstance(stmt, ast.Expr)
            and isinstance(stmt.value, ast.Constant)
            and stmt.value.value is ...
        )

    @staticmethod
    def is_abstract_stub(
        node: ast.FunctionDef | ast.AsyncFunctionDef,
    ) -> bool:
        """Check if node is an abstract method stub (body is ``...`` or ``pass``)."""
        return DocstringCoverageRule.has_abstractmethod_decorator(
            node
        ) and DocstringCoverageRule.is_stub_body(node)

    @staticmethod
    def _is_setter_or_deleter(
        node: ast.FunctionDef | ast.AsyncFunctionDef,
    ) -> bool:
        """Check if node is a property setter or deleter.

        Detected by a decorator of the form ``@<something>.setter`` or
        ``@<something>.deleter``.
        """
        for dec in node.decorator_list:
            if isinstance(dec, ast.Attribute) and dec.attr in ("setter", "deleter"):
                return True
        return False

    @staticmethod
    def _build_class_map(
        tree: ast.Module,
    ) -> dict[str, ast.ClassDef]:
        """Build a name -> ClassDef map for all classes in the module.

        Nested classes are included.  When several classes in the module
        share a name, the one visited last by ``ast.walk`` wins.
        """
        return {
            node.name: node for node in ast.walk(tree) if isinstance(node, ast.ClassDef)
        }

    def _find_enclosing_class(
        self,
        node: ast.FunctionDef | ast.AsyncFunctionDef,
        class_map: dict[str, ast.ClassDef],
    ) -> ast.ClassDef | None:
        """Return the class whose body contains *node*, or None.

        Only direct members of a class body are matched, and the comparison
        is by object identity, so *node* must come from the same tree that
        *class_map* was built from.
        """
        for cls in class_map.values():
            for item in cls.body:
                if item is node:
                    return cls
        return None

    def _resolve_base_class(
        self,
        base_name: str,
        class_map: dict[str, ast.ClassDef],
        global_classes: dict[str, list[ast.ClassDef]] | None,
    ) -> ast.ClassDef | None:
        """Resolve *base_name* to a ClassDef via same-file or cross-file lookup.

        The same-file definition takes precedence.  A cross-file match is
        used only when the name is defined exactly once project-wide;
        ambiguous names resolve to ``None``.
        """
        if base_name in class_map:
            return class_map[base_name]
        if global_classes and base_name in global_classes:
            definitions = global_classes[base_name]
            if len(definitions) == 1:
                return definitions[0]
        return None

    def _is_abstract_override(
        self,
        node: ast.FunctionDef | ast.AsyncFunctionDef,
        class_map: dict[str, ast.ClassDef],
        global_classes: dict[str, list[ast.ClassDef]] | None = None,
    ) -> bool:
        """Check if node overrides a documented abstractmethod.

        Only the direct bases of the enclosing class are inspected, and
        only bases written as a plain name; dotted or subscripted bases
        (e.g. ``module.Base``) are ignored.
        """
        enclosing = self._find_enclosing_class(node, class_map)
        if enclosing is None:
            return False

        for base in enclosing.bases:
            base_name = base.id if isinstance(base, ast.Name) else None
            if base_name is None:
                continue
            base_cls = self._resolve_base_class(base_name, class_map, global_classes)
            if base_cls is not None and self._check_abstract_parent(
                base_cls, node.name
            ):
                return True

        return False

    def _check_abstract_parent(
        self,
        base_cls: ast.ClassDef,
        method_name: str,
    ) -> bool:
        """Check if base_cls has a documented @abstractmethod named *method_name*.

        Only methods defined directly in the body of *base_cls* are examined.
        """
        for item in base_cls.body:
            if not isinstance(item, ast.FunctionDef | ast.AsyncFunctionDef):
                continue
            if item.name != method_name:
                continue
            if self.has_abstractmethod_decorator(item) and self._has_docstring(item):
                return True
        return False

    def _has_docstring(self, node: ast.FunctionDef | ast.AsyncFunctionDef) -> bool:
        """Check if a function node has a docstring.

        A docstring is a string constant expression as the first statement.
        """
        if not node.body:
            return False
        first = node.body[0]
        return (
            isinstance(first, ast.Expr)
            and isinstance(first.value, ast.Constant)
            and isinstance(first.value.value, str)
        )
rule_id property

Unique identifier for this rule.

check(project_path)

Check docstring coverage in the project.

Source code in packages/axm-audit/src/axm_audit/core/rules/practices/docstring_coverage.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Check docstring coverage in the project.

    Returns the result of ``check_src`` when it is not ``None``; otherwise
    analyses the files under ``<project_path>/src``.
    """
    early_exit = self.check_src(project_path)
    if early_exit is None:
        counted, undocumented = self._analyze_docstrings(project_path / "src")
        return self._build_result(counted, undocumented)
    return early_exit
has_abstractmethod_decorator(node) staticmethod

Return True if node has an @abstractmethod decorator.

Source code in packages/axm-audit/src/axm_audit/core/rules/practices/docstring_coverage.py
Python
@staticmethod
def has_abstractmethod_decorator(
    node: ast.FunctionDef | ast.AsyncFunctionDef,
) -> bool:
    """Return *True* if *node* has an ``@abstractmethod`` decorator."""
    return any(
        (isinstance(d, ast.Name) and d.id == "abstractmethod")
        or (isinstance(d, ast.Attribute) and d.attr == "abstractmethod")
        for d in node.decorator_list
    )
is_abstract_stub(node) staticmethod

Check if node is an abstract method stub (body is ... or pass).

Source code in packages/axm-audit/src/axm_audit/core/rules/practices/docstring_coverage.py
Python
@staticmethod
def is_abstract_stub(
    node: ast.FunctionDef | ast.AsyncFunctionDef,
) -> bool:
    """Return *True* if *node* is an ``@abstractmethod`` with a stub body.

    A stub body is a single ``...`` or ``pass`` statement.
    """
    if not DocstringCoverageRule.has_abstractmethod_decorator(node):
        return False
    return DocstringCoverageRule.is_stub_body(node)
is_stub_body(node) staticmethod

Return True if node's body is just ... or pass.

Source code in packages/axm-audit/src/axm_audit/core/rules/practices/docstring_coverage.py
Python
@staticmethod
def is_stub_body(
    node: ast.FunctionDef | ast.AsyncFunctionDef,
) -> bool:
    """Return *True* if *node*'s body is just ``...`` or ``pass``."""
    if len(node.body) != 1:
        return False
    stmt = node.body[0]
    if isinstance(stmt, ast.Pass):
        return True
    return (
        isinstance(stmt, ast.Expr)
        and isinstance(stmt.value, ast.Constant)
        and stmt.value.value is ...
    )

DuplicationRule dataclass

Bases: ProjectRule

Detect copy-pasted code via AST structure hashing.

Extracts function and method bodies, normalises them by stripping variable names and locations, hashes each body, and reports groups whose hash appears more than once.

Scoring: 100 - (dup_groups * 10), min 0.

Source code in packages/axm-audit/src/axm_audit/core/rules/duplication.py
Python
@dataclass
@register_rule("architecture")
class DuplicationRule(ProjectRule):
    """Detect copy-pasted code via AST structure hashing.

    Extracts function and method bodies, normalises them by stripping
    variable names and locations, hashes each body, and reports
    groups whose hash appears more than once.

    Scoring: ``100 - (dup_count * 10)``, min 0, where ``dup_count`` is the
    number of (first occurrence, later copy) pairs — a body that appears
    three times contributes two duplicates.
    """

    # Functions spanning fewer source lines than this are never hashed.
    min_lines: int = _MIN_DUP_LINES

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "ARCH_DUPLICATION"

    def check(self, project_path: Path) -> CheckResult:
        """Check for code duplication in the project.

        Args:
            project_path: Project root; sources are read from its ``src/``
                sub-directory.

        Returns:
            The early result from ``check_src`` when it yields one, otherwise
            a result that passes only when no duplicate is found. ``details``
            and ``text`` list at most the first 20 clone pairs, while the
            message and score use the full count.
        """
        early = self.check_src(project_path)
        if early is not None:
            return early

        src_path = project_path / "src"

        clones = self._find_duplicates(src_path)
        dup_count = len(clones)
        score = max(0, 100 - dup_count * 10)
        passed = dup_count == 0

        capped = clones[:20]
        text_lines = []
        for clone in capped:
            # Entries are "file:name:line". Split from the right so a ':'
            # inside the relative path stays part of the file component.
            src_file, src_name, src_line = clone["source"].rsplit(":", 2)
            dst_file, _dst_name, dst_line = clone["target"].rsplit(":", 2)
            text_lines.append(
                f"     \u2022 {src_name} {src_file}:{src_line}"
                f"\u2192{dst_file}:{dst_line}"
            )

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=f"{dup_count} duplicate block(s) found",
            severity=Severity.WARNING if not passed else Severity.INFO,
            score=int(score),
            details={"dup_count": dup_count, "clones": capped},
            text="\n".join(text_lines) if text_lines else None,
            fix_hint=(
                "Extract duplicated code into shared functions" if not passed else None
            ),
        )

    def _find_duplicates(self, src_path: Path) -> list[dict[str, str]]:
        """Hash function bodies and find duplicates.

        Returns one ``{"source": ..., "target": ...}`` entry per extra copy,
        pairing the first function seen for a hash with each later one.
        Both values use the ``"file:name:line"`` layout.
        """
        seen = self._collect_function_hashes(src_path)

        clones: list[dict[str, str]] = []
        for entries in seen.values():
            if len(entries) < _MIN_CLONE_GROUP:
                continue
            first = entries[0]
            for other in entries[1:]:
                clones.append(
                    {
                        "source": f"{first[0]}:{first[1]}:{first[2]}",
                        "target": f"{other[0]}:{other[1]}:{other[2]}",
                    }
                )
        return clones

    def _collect_function_hashes(
        self,
        src_path: Path,
    ) -> dict[str, list[tuple[str, str, int]]]:
        """Scan source files and hash each function body.

        Returns a mapping from body hash to the ``(relative file, function
        name, first line)`` tuples that produced it. Files that yield no
        tree are skipped.
        """
        seen: dict[str, list[tuple[str, str, int]]] = defaultdict(list)

        for path in _get_python_files(src_path):
            _cache = _get_ast_cache()
            tree = _cache.get_or_parse(path) if _cache else _parse_file_safe(path)
            if tree is None:
                continue
            rel = str(path.relative_to(src_path))
            self._hash_functions_in_tree(tree, rel, seen)

        return seen

    def _hash_functions_in_tree(
        self,
        tree: ast.Module,
        rel: str,
        seen: dict[str, list[tuple[str, str, int]]],
    ) -> None:
        """Hash each function body in a single AST and add to *seen*.

        Functions shorter than ``min_lines`` source lines are ignored.
        """
        for node in ast.walk(tree):
            if not isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                continue
            # Without an end line the function is treated as one line long.
            end = getattr(node, "end_lineno", None) or node.lineno
            if end - node.lineno + 1 < self.min_lines:
                continue
            body_str = _normalize_ast(node)
            # MD5 is only a fingerprint here, hence usedforsecurity=False.
            h = hashlib.md5(body_str.encode(), usedforsecurity=False).hexdigest()
            seen[h].append((rel, node.name, node.lineno))
rule_id property

Unique identifier for this rule.

check(project_path)

Check for code duplication in the project.

Source code in packages/axm-audit/src/axm_audit/core/rules/duplication.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Check for code duplication in the project.

    Args:
        project_path: Project root; sources are read from its ``src/``
            sub-directory.

    Returns:
        The early result from ``check_src`` when it yields one, otherwise a
        result that passes only when no duplicate is found. ``details`` and
        ``text`` list at most the first 20 clone pairs, while the message
        and score use the full count.
    """
    early = self.check_src(project_path)
    if early is not None:
        return early

    src_path = project_path / "src"

    clones = self._find_duplicates(src_path)
    dup_count = len(clones)
    score = max(0, 100 - dup_count * 10)
    passed = dup_count == 0

    capped = clones[:20]
    text_lines = []
    for clone in capped:
        # Entries are "file:name:line". Split from the right so a ':'
        # inside the relative path stays part of the file component.
        src_file, src_name, src_line = clone["source"].rsplit(":", 2)
        dst_file, _dst_name, dst_line = clone["target"].rsplit(":", 2)
        text_lines.append(
            f"     \u2022 {src_name} {src_file}:{src_line}"
            f"\u2192{dst_file}:{dst_line}"
        )

    return CheckResult(
        rule_id=self.rule_id,
        passed=passed,
        message=f"{dup_count} duplicate block(s) found",
        severity=Severity.WARNING if not passed else Severity.INFO,
        score=int(score),
        details={"dup_count": dup_count, "clones": capped},
        text="\n".join(text_lines) if text_lines else None,
        fix_hint=(
            "Extract duplicated code into shared functions" if not passed else None
        ),
    )

FormattingRule dataclass

Bases: ProjectRule

Run ruff format --check and score based on unformatted file count.

Scoring: 100 - (unformatted_count * 5), min 0.

Source code in packages/axm-audit/src/axm_audit/core/rules/quality.py
Python
@dataclass
@register_rule("lint")
class FormattingRule(ProjectRule):
    """Run ``ruff format --check`` and score based on unformatted file count.

    Scoring: 100 - (unformatted_count * 5), min 0.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "QUALITY_FORMAT"

    def check(self, project_path: Path) -> CheckResult:
        """Check project formatting with ruff format --check.

        Args:
            project_path: Project root in which ruff is executed.

        Returns:
            The early result from ``check_src`` when it yields one, otherwise
            a result whose score drops 5 points per unformatted file and
            which passes when the score reaches ``PASS_THRESHOLD``. At most
            20 entries are listed in ``details`` and ``text``.
        """
        early = self.check_src(project_path)
        if early is not None:
            return early

        targets, checked = _get_audit_targets(project_path)

        result = run_in_project(
            ["ruff", "format", "--check", *targets],
            project_path,
            with_packages=["ruff"],
            capture_output=True,
            text=True,
            check=False,
        )

        unformatted_files = self._parse_unformatted_files(result)
        unformatted_count = len(unformatted_files)

        score = max(0, 100 - unformatted_count * 5)
        passed = score >= PASS_THRESHOLD

        text_lines = unformatted_files[:20]

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=f"Format score: {score}/100 ({unformatted_count} unformatted)",
            severity=Severity.WARNING if not passed else Severity.INFO,
            score=int(score),
            details={
                "unformatted_count": unformatted_count,
                "unformatted_files": unformatted_files[:20],
                "checked": checked,
            },
            text="\n".join(text_lines) if text_lines else None,
            fix_hint=(f"Run: ruff format {checked}" if unformatted_count > 0 else None),
        )

    @staticmethod
    def _is_summary_line(line: str) -> bool:
        """Return True for a trailing ``<N> file(s) ...`` summary line."""
        count, _, rest = line.partition(" ")
        return count.isdigit() and rest.startswith(("file ", "files "))

    @staticmethod
    def _parse_unformatted_files(
        result: subprocess.CompletedProcess[str],
    ) -> list[str]:
        """Extract the per-file report lines from ruff format --check output.

        Returns an empty list when the process exited with code 0. Otherwise
        every non-blank stdout line is kept (stripped), except lines that
        start with ``error`` or ``warning`` and the closing summary line
        (e.g. ``1 file would be reformatted``), which is not a file and must
        not count towards the score.
        """
        if result.returncode == 0:
            return []
        return [
            line.strip()
            for line in result.stdout.strip().split("\n")
            if line.strip()
            and not line.startswith(("error", "warning"))
            and not FormattingRule._is_summary_line(line.strip())
        ]
rule_id property

Unique identifier for this rule.

check(project_path)

Check project formatting with ruff format --check.

Source code in packages/axm-audit/src/axm_audit/core/rules/quality.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Score the project's formatting using ``ruff format --check``.

    Returns the early result from ``check_src`` when there is one. Otherwise
    each unformatted file costs 5 points (floor 0) and the rule passes when
    the score reaches ``PASS_THRESHOLD``. At most 20 files are listed.
    """
    if (early := self.check_src(project_path)) is not None:
        return early

    targets, checked = _get_audit_targets(project_path)
    command = ["ruff", "format", "--check", *targets]
    completed = run_in_project(
        command,
        project_path,
        with_packages=["ruff"],
        capture_output=True,
        text=True,
        check=False,
    )

    unformatted = self._parse_unformatted_files(completed)
    total = len(unformatted)
    score = max(0, 100 - 5 * total)
    passed = score >= PASS_THRESHOLD

    # Only the first 20 entries are surfaced to keep the report compact.
    shown = unformatted[:20]
    severity = Severity.INFO if passed else Severity.WARNING
    hint = f"Run: ruff format {checked}" if total > 0 else None

    return CheckResult(
        rule_id=self.rule_id,
        passed=passed,
        message=f"Format score: {score}/100 ({total} unformatted)",
        severity=severity,
        score=int(score),
        details={
            "unformatted_count": total,
            "unformatted_files": list(shown),
            "checked": checked,
        },
        text="\n".join(shown) if shown else None,
        fix_hint=hint,
    )

GodClassRule dataclass

Bases: ProjectRule

Detect god classes (too many lines or methods).

Source code in packages/axm-audit/src/axm_audit/core/rules/architecture/__init__.py
Python
@dataclass
@register_rule("architecture")
class GodClassRule(ProjectRule):
    """Flag classes that span too many lines or define too many methods."""

    max_lines: int = 500
    max_methods: int = 15

    @property
    def rule_id(self) -> str:
        """Return the stable identifier of this rule."""
        return "ARCH_GOD_CLASS"

    def check(self, project_path: Path) -> CheckResult:
        """Report every class under ``src/`` that exceeds the size limits.

        A class is flagged when it is longer than :attr:`max_lines` or has
        more than :attr:`max_methods` methods. Each offender costs 15
        points (floor 0) and the rule passes only when there is none.

        ``text`` holds one ``• {basename}:{ClassName} {lines}L/{methods}M``
        line per offender, or ``None`` when nothing was flagged.
        """
        if (early := self.check_src(project_path)) is not None:
            return early

        offenders = self._find_god_classes(project_path / "src")
        found = len(offenders)
        clean = found == 0

        bullets = []
        for entry in offenders:
            basename = Path(str(entry["file"])).name
            bullets.append(
                f"\u2022 {basename}:{entry['name']}"
                f" {entry['lines']}L/{entry['methods']}M"
            )

        hint = None
        if offenders:
            hint = "Split large classes into smaller, focused classes"

        return CheckResult(
            rule_id=self.rule_id,
            passed=clean,
            message=f"{found} god class(es) found",
            severity=Severity.INFO if clean else Severity.WARNING,
            score=int(max(0, 100 - found * 15)),
            details={"god_classes": offenders},
            text="\n".join(bullets) if bullets else None,
            fix_hint=hint,
        )

    def _find_god_classes(self, src_path: Path) -> list[dict[str, str | int]]:
        """Collect an entry for each oversized class found under *src_path*."""
        offenders: list[dict[str, str | int]] = []

        for py_file in get_python_files(src_path):
            cache = get_ast_cache()
            if cache:
                tree = cache.get_or_parse(py_file)
            else:
                tree = parse_file_safe(py_file)
            if tree is None:
                continue

            class_nodes = (n for n in ast.walk(tree) if isinstance(n, ast.ClassDef))
            for class_node in class_nodes:
                self._check_class_node(class_node, py_file, src_path, offenders)

        return offenders

    def _check_class_node(
        self,
        node: ast.ClassDef,
        file_path: Path,
        src_root: Path,
        results: list[dict[str, str | int]],
    ) -> None:
        """Measure one class and append it to *results* if it is oversized.

        The appended entry records the class name, its file relative to
        *src_root*, its line span and its direct method count.
        """
        # A missing or empty end line means the span is unknown: count 0 lines.
        last_line = getattr(node, "end_lineno", None)
        lines = last_line - node.lineno + 1 if last_line else 0

        # Only functions defined directly in the class body count as methods.
        method_types = (ast.FunctionDef, ast.AsyncFunctionDef)
        methods = len([child for child in node.body if isinstance(child, method_types)])

        if lines <= self.max_lines and methods <= self.max_methods:
            return

        results.append(
            {
                "name": node.name,
                "file": str(file_path.relative_to(src_root)),
                "lines": lines,
                "methods": methods,
            }
        )
rule_id property

Unique identifier for this rule.

check(project_path)

Check for god classes in the project.

Scans all classes under src/ and flags those exceeding :attr:max_lines or :attr:max_methods.

The text field uses the compact format • {basename}:{ClassName} {lines}L/{methods}M (one line per violation), or None when no god classes are found.

Source code in packages/axm-audit/src/axm_audit/core/rules/architecture/__init__.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Report every class under ``src/`` that exceeds the size limits.

    A class is flagged when it is longer than :attr:`max_lines` or has more
    than :attr:`max_methods` methods. Each offender costs 15 points
    (floor 0) and the rule passes only when there is none.

    ``text`` holds one ``• {basename}:{ClassName} {lines}L/{methods}M``
    line per offender, or ``None`` when nothing was flagged.
    """
    if (early := self.check_src(project_path)) is not None:
        return early

    offenders = self._find_god_classes(project_path / "src")
    found = len(offenders)
    clean = found == 0

    bullets = []
    for entry in offenders:
        basename = Path(str(entry["file"])).name
        bullets.append(
            f"\u2022 {basename}:{entry['name']}"
            f" {entry['lines']}L/{entry['methods']}M"
        )

    hint = None
    if offenders:
        hint = "Split large classes into smaller, focused classes"

    return CheckResult(
        rule_id=self.rule_id,
        passed=clean,
        message=f"{found} god class(es) found",
        severity=Severity.INFO if clean else Severity.WARNING,
        score=int(max(0, 100 - found * 15)),
        details={"god_classes": offenders},
        text="\n".join(bullets) if bullets else None,
        fix_hint=hint,
    )

LintingRule dataclass

Bases: ProjectRule

Run ruff and score based on issue count.

Scoring: 100 - (issue_count * 2), min 0.

Source code in packages/axm-audit/src/axm_audit/core/rules/quality.py
Python
@dataclass
@register_rule("lint")
class LintingRule(ProjectRule):
    """Run ruff and score based on issue count.

    Scoring: 100 - (issue_count * 2), min 0.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "QUALITY_LINT"

    def check(self, project_path: Path) -> CheckResult:
        """Check project linting with ruff on src/ and tests/.

        Args:
            project_path: Project root in which ruff is executed.

        Returns:
            The early result from ``check_src`` when it yields one, otherwise
            a result whose score drops 2 points per reported issue and which
            passes when the score reaches ``LINT_PASS_THRESHOLD``. Output
            that is empty, not valid JSON, or not a JSON list is treated as
            "no issues". At most 20 issues are listed.
        """
        early = self.check_src(project_path)
        if early is not None:
            return early
        targets, checked = _get_audit_targets(project_path)

        result = run_in_project(
            ["ruff", "check", "--output-format=json", *targets],
            project_path,
            with_packages=["ruff"],
            capture_output=True,
            text=True,
            check=False,
        )

        try:
            issues = json.loads(result.stdout) if result.stdout.strip() else []
        except json.JSONDecodeError:
            issues = []
        # Anything but a list cannot be counted or sliced below.
        if not isinstance(issues, list):
            issues = []

        issue_count = len(issues)
        score = max(0, 100 - issue_count * 2)
        passed = score >= LINT_PASS_THRESHOLD

        # Store individual violations (capped at 20) for agent mode.
        # ``or`` fallbacks are used because a JSON ``null`` (e.g. the code of
        # a syntax error) is a present key, so ``dict.get`` defaults would
        # not apply and ``None`` would leak into the report.
        formatted_issues: list[dict[str, str | int]] = [
            {
                "file": i.get("filename") or "",
                "line": (i.get("location") or {}).get("row", 0),
                "code": i.get("code") or "",
                "message": i.get("message") or "",
            }
            for i in issues[:20]
        ]

        text_lines = [
            f"\u2022 {i['code']} {_short_path(str(i['file']), project_path)}"
            f":{i['line']} {i['message']}"
            for i in formatted_issues
        ]

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=f"Lint score: {score}/100 ({issue_count} issues)",
            severity=Severity.WARNING if not passed else Severity.INFO,
            score=int(score),
            details={
                "issue_count": issue_count,
                "checked": checked,
                "issues": formatted_issues,
            },
            text="\n".join(text_lines) if text_lines else None,
            fix_hint=f"Run: ruff check --fix {checked}" if issue_count > 0 else None,
        )
rule_id property

Unique identifier for this rule.

check(project_path)

Check project linting with ruff on src/ and tests/.

Source code in packages/axm-audit/src/axm_audit/core/rules/quality.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Check project linting with ruff on src/ and tests/.

    Args:
        project_path: Project root in which ruff is executed.

    Returns:
        The early result from ``check_src`` when it yields one, otherwise a
        result whose score drops 2 points per reported issue and which
        passes when the score reaches ``LINT_PASS_THRESHOLD``. Output that
        is empty, not valid JSON, or not a JSON list is treated as
        "no issues". At most 20 issues are listed.
    """
    early = self.check_src(project_path)
    if early is not None:
        return early
    targets, checked = _get_audit_targets(project_path)

    result = run_in_project(
        ["ruff", "check", "--output-format=json", *targets],
        project_path,
        with_packages=["ruff"],
        capture_output=True,
        text=True,
        check=False,
    )

    try:
        issues = json.loads(result.stdout) if result.stdout.strip() else []
    except json.JSONDecodeError:
        issues = []
    # Anything but a list cannot be counted or sliced below.
    if not isinstance(issues, list):
        issues = []

    issue_count = len(issues)
    score = max(0, 100 - issue_count * 2)
    passed = score >= LINT_PASS_THRESHOLD

    # Store individual violations (capped at 20) for agent mode.
    # ``or`` fallbacks are used because a JSON ``null`` (e.g. the code of a
    # syntax error) is a present key, so ``dict.get`` defaults would not
    # apply and ``None`` would leak into the report.
    formatted_issues: list[dict[str, str | int]] = [
        {
            "file": i.get("filename") or "",
            "line": (i.get("location") or {}).get("row", 0),
            "code": i.get("code") or "",
            "message": i.get("message") or "",
        }
        for i in issues[:20]
    ]

    text_lines = [
        f"\u2022 {i['code']} {_short_path(str(i['file']), project_path)}"
        f":{i['line']} {i['message']}"
        for i in formatted_issues
    ]

    return CheckResult(
        rule_id=self.rule_id,
        passed=passed,
        message=f"Lint score: {score}/100 ({issue_count} issues)",
        severity=Severity.WARNING if not passed else Severity.INFO,
        score=int(score),
        details={
            "issue_count": issue_count,
            "checked": checked,
            "issues": formatted_issues,
        },
        text="\n".join(text_lines) if text_lines else None,
        fix_hint=f"Run: ruff check --fix {checked}" if issue_count > 0 else None,
    )

MirrorRule dataclass

Bases: ProjectRule

Check the bidirectional 1:1 mapping between source modules and unit tests.

Forward direction — for each src/<pkg>/foo.py, looks for a test_foo.py under tests/ (or tests/unit/ when present).

Reverse direction (orphan check) — for each tests/unit/<rel>/test_<name>.py, requires that some package exposes src/<pkg>/<rel>/<name>.py. Tests at the wrong nesting level or pointing to nonexistent source modules are flagged as orphans. The reverse check only walks tests/unit/ — tests/integration/ and tests/e2e/ are scenario-named and never flagged.

Private modules (leading underscores) are matched with the prefix stripped: _facade.py matches test_facade.py or test__facade.py.

Exempt filenames (no test required): __init__.py, __main__.py, _version.py, conftest.py, py.typed.

Forward-mirror exemptions can additionally be declared in pyproject.toml (path globs anchored at src/<top_pkg>/)::

Text Only
[tool.axm-audit.mirror]
exempt_paths = ["commands/*.py", "schemas/*.py", "**/_facade.py"]

Exempted modules do not need a matching test and surface in details["exempt"]. Reverse (orphan) checks ignore exemptions.

Scoring: 100 - (len(missing) + len(orphan)) * 15, min 0.

Source code in packages/axm-audit/src/axm_audit/core/rules/practices/mirror.py
Python
@dataclass
@register_rule("practices")
class MirrorRule(ProjectRule):
    """Check the bidirectional 1:1 mapping between source modules and unit tests.

    Forward direction — for each ``src/<pkg>/foo.py``, looks for a
    ``test_foo.py`` under ``tests/`` (or ``tests/unit/`` when present).

    Reverse direction (orphan check) — for each ``tests/unit/<rel>/test_<name>.py``,
    requires that some package exposes ``src/<pkg>/<rel>/<name>.py``. Tests at the
    wrong nesting level or pointing to nonexistent source modules are flagged
    as orphans. Reverse check only walks ``tests/unit/`` — ``tests/integration/``
    and ``tests/e2e/`` are scenario-named and never flagged.

    Private modules (leading underscores) are matched with the prefix
    stripped: ``_facade.py`` matches ``test_facade.py`` or
    ``test__facade.py``.

    Exempt filenames (no test required): ``__init__.py``, ``__main__.py``,
    ``_version.py``, ``conftest.py``, ``py.typed``.

    Forward-mirror exemptions can additionally be declared in
    ``pyproject.toml`` (path globs anchored at ``src/<top_pkg>/``)::

        [tool.axm-audit.mirror]
        exempt_paths = ["commands/*.py", "schemas/*.py", "**/_facade.py"]

    Exempted modules do not need a matching test and surface in
    ``details["exempt"]``. Reverse (orphan) checks ignore exemptions.

    Scoring: ``100 - (len(missing) + len(orphan)) * 15``, min 0.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule (bidirectional mirror check)."""
        return "PRACTICE_TEST_MIRROR"

    def check(self, project_path: Path) -> CheckResult:
        """Check forward + reverse test/source mapping.

        Returns the early result from ``check_src`` when it yields one, a
        failing result with score 0 when the mirror config reports an error,
        and otherwise a result built from the missing-test and orphan lists.
        """
        early = self.check_src(project_path)
        if early is not None:
            return early

        config = _load_mirror_config(project_path)
        if config.error is not None:
            return CheckResult(
                rule_id=self.rule_id,
                passed=False,
                message="Invalid mirror config",
                severity=Severity.WARNING,
                score=0,
                details={"missing": [], "orphan": [], "exempt": []},
                fix_hint=config.error,
            )

        src_path = project_path / "src"
        tests_path = project_path / "tests"
        missing, exempt = self._find_untested_modules(
            src_path, tests_path, config.exempt_paths
        )
        orphan = self._collect_orphan_tests(src_path, tests_path)

        if not missing and not orphan:
            return CheckResult(
                rule_id=self.rule_id,
                passed=True,
                message="All source modules have test files",
                severity=Severity.INFO,
                score=100,
                details={"missing": [], "orphan": [], "exempt": exempt},
            )

        violations = len(missing) + len(orphan)
        score = max(0, 100 - violations * 15)
        passed = score >= 90  # noqa: PLR2004

        hint = self._build_fix_hint(src_path, missing, orphan)
        text = self._build_text(missing, orphan)
        message = self._build_message(missing, orphan)

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=message,
            severity=Severity.WARNING if not passed else Severity.INFO,
            score=int(score),
            details={"missing": missing, "orphan": orphan, "exempt": exempt},
            fix_hint=hint,
            text=text,
        )

    @staticmethod
    def _build_message(missing: list[str], orphan: list[str]) -> str:
        """Build the summary message line."""
        parts = []
        if missing:
            parts.append(f"{len(missing)} source module(s) without tests")
        if orphan:
            parts.append(f"{len(orphan)} orphan test(s)")
        return "; ".join(parts) if parts else "All source modules have test files"

    @staticmethod
    def _build_text(missing: list[str], orphan: list[str]) -> str | None:
        """Build the multi-line text block (untested + orphan bullets).

        Each bullet shows at most five names followed by a ``(+N more)``
        tail; ``None`` is returned when both lists are empty.
        """
        lines: list[str] = []
        if missing:
            shown = missing[:5]
            tail = f" (+{len(missing) - 5} more)" if len(missing) > 5 else ""  # noqa: PLR2004
            lines.append("• untested: " + " ".join(shown) + tail)
        if orphan:
            shown_o = orphan[:5]
            tail_o = f" (+{len(orphan) - 5} more)" if len(orphan) > 5 else ""  # noqa: PLR2004
            lines.append("• orphan: " + " ".join(shown_o) + tail_o)
        return "\n".join(lines) if lines else None

    @classmethod
    def _build_fix_hint(
        cls,
        src_path: Path,
        missing: list[str],
        orphan: list[str],
    ) -> str | None:
        """Build the fix hint covering both missing tests and orphan suggestions.

        For each of the first five orphans, the closest source-module stem
        (``difflib`` similarity of at least 0.6) is proposed as a rename
        target; without a close match the hint suggests deleting or renaming.
        """
        parts: list[str] = []
        if missing:
            files = ", ".join(f"tests/test_{m}" for m in missing[:5])
            if len(missing) > 5:  # noqa: PLR2004
                files += f" (+{len(missing) - 5} more)"
            parts.append(f"Create test files: {files}")
        if orphan:
            source_stems = sorted(
                {
                    Path(n).stem.lstrip("_")
                    for n in cls._collect_source_modules(src_path)
                }
            )
            for orphan_path in orphan[:5]:
                stem = Path(orphan_path).stem[len("test_") :]
                close = difflib.get_close_matches(stem, source_stems, n=1, cutoff=0.6)
                if close:
                    suggested = f"test_{close[0]}.py"
                    parts.append(
                        f"{orphan_path} → rename to {suggested} or merge into "
                        f"tests/unit/{suggested}"
                    )
                else:
                    parts.append(
                        f"{orphan_path} → delete or rename to a real source module"
                    )
            if len(orphan) > 5:  # noqa: PLR2004
                parts.append(f"(+{len(orphan) - 5} more orphans)")
        return "; ".join(parts) if parts else None

    @staticmethod
    def _collect_source_modules(src_path: Path) -> list[str]:
        """Collect non-exempt Python module basenames from ``src/``."""
        pkg_dirs = [
            d for d in src_path.iterdir() if d.is_dir() and d.name != "__pycache__"
        ]
        modules: list[str] = []
        for pkg_dir in pkg_dirs:
            for py_file in pkg_dir.rglob("*.py"):
                if py_file.name not in _TEST_MIRROR_EXEMPT:
                    modules.append(py_file.name)
        return modules

    @staticmethod
    def _collect_test_basenames(tests_path: Path) -> set[str]:
        """Collect ``test_*.py`` basenames eligible to satisfy the mirror rule.

        Mirror naming (1:1 with source modules) is a unit-level convention.
        Integration and e2e tests are scenario-named, so they must not count
        toward mirror coverage. If ``tests/unit/`` exists and contains test
        files, search there; otherwise fall back to the whole ``tests/`` tree
        (legacy flat layout) while still excluding ``integration/`` and
        ``e2e/`` subdirs.
        """
        if not tests_path.exists():
            return set()
        unit_path = tests_path / "unit"
        if unit_path.exists():
            unit_tests = {f.name for f in unit_path.rglob("test_*.py")}
            if unit_tests:
                return unit_tests
        # Compare path components relative to ``tests/`` only: a project that
        # lives under a directory named ``integration`` or ``e2e`` must not
        # have every one of its tests excluded.
        scenario_dirs = {"integration", "e2e"}
        return {
            f.name
            for f in tests_path.rglob("test_*.py")
            if scenario_dirs.isdisjoint(f.relative_to(tests_path).parts)
        }

    @staticmethod
    def _collect_unit_test_index(tests_path: Path) -> dict[str, set[str]]:
        """Index unit tests by rel directory; the ``test_`` prefix is stripped.

        Each stem is stored both as-is and with leading underscores removed;
        an empty dict is returned when ``tests/unit/`` is not a directory.
        """
        unit_path = tests_path / "unit"
        if not unit_path.is_dir():
            return {}
        index: dict[str, set[str]] = {}
        for test_file in unit_path.rglob("test_*.py"):
            if not test_file.is_file():
                continue
            rel_dir = test_file.parent.relative_to(unit_path).as_posix()
            rel_dir = "" if rel_dir == "." else rel_dir
            stem = test_file.stem[len("test_") :]
            bucket = index.setdefault(rel_dir, set())
            bucket.add(stem)
            bucket.add(stem.lstrip("_"))
        return index

    @staticmethod
    def _is_exempt_path(rel_posix: str, exempt_paths: list[str]) -> bool:
        """Return True if ``rel_posix`` matches any glob in ``exempt_paths``.

        Patterns are anchored at ``src/<pkg>/`` and matched segment-by-segment
        with ``fnmatch.fnmatchcase`` (so ``*`` and ``?`` never cross ``/``).
        A literal ``**`` segment matches zero or more path segments.
        """
        rel_parts = rel_posix.split("/")
        return any(
            _glob_segments_match(pattern.split("/"), rel_parts)
            for pattern in exempt_paths
        )

    @classmethod
    def _find_untested_modules(
        cls,
        src_path: Path,
        tests_path: Path,
        exempt_paths: list[str] | None = None,
    ) -> tuple[list[str], list[str]]:
        """Find source modules without corresponding test files.

        When ``tests/unit/`` is populated, the mirror is arborescence-aware:
        ``src/<pkg>/<rel>/<name>.py`` requires ``tests/unit/<rel>/test_<name>.py``.
        Otherwise (legacy flat layout, or empty ``tests/unit/``), basename
        matching is used.

        Returns ``(missing, exempt)`` where ``exempt`` is the list of
        basenames matched by ``exempt_paths`` globs (relative to
        ``src/<pkg>/``) — they are excluded from ``missing``.
        """
        if not src_path.is_dir():
            return [], []
        exempt_paths = exempt_paths or []
        unit_index = cls._collect_unit_test_index(tests_path)
        # Basenames are only needed for the flat-layout fallback.
        test_basenames = None if unit_index else cls._collect_test_basenames(tests_path)
        missing: list[str] = []
        exempt: list[str] = []
        for pkg_dir, py_file in cls._iter_source_modules(src_path):
            label = cls._classify_py_file(
                py_file, pkg_dir, unit_index, test_basenames, exempt_paths
            )
            if label == "exempt":
                exempt.append(py_file.name)
            elif label == "missing":
                missing.append(py_file.name)
        return sorted(set(missing)), sorted(set(exempt))

    @classmethod
    def _iter_source_modules(cls, src_path: Path) -> Iterator[tuple[Path, Path]]:
        """Yield ``(pkg_dir, py_file)`` for each non-exempt source module.

        Skips ``__pycache__``, files in ``_TEST_MIRROR_EXEMPT``, and
        deduplicates by basename across package directories.
        """
        seen: set[str] = set()
        for pkg_dir in sorted(src_path.iterdir()):
            if not pkg_dir.is_dir() or pkg_dir.name == "__pycache__":
                continue
            for py_file in sorted(pkg_dir.rglob("*.py")):
                if py_file.name in _TEST_MIRROR_EXEMPT or py_file.name in seen:
                    continue
                seen.add(py_file.name)
                yield pkg_dir, py_file

    @classmethod
    def _classify_py_file(
        cls,
        py_file: Path,
        pkg_dir: Path,
        unit_index: dict[str, set[str]] | None,
        test_basenames: set[str] | None,
        exempt_paths: list[str],
    ) -> str:
        """Return ``"exempt"``, ``"missing"``, or ``"covered"`` for ``py_file``."""
        rel_to_pkg = py_file.relative_to(pkg_dir).as_posix()
        # Exemption wins over coverage: an exempt module is never "missing".
        if exempt_paths and cls._is_exempt_path(rel_to_pkg, exempt_paths):
            return "exempt"
        if not cls._has_matching_test(py_file, pkg_dir, unit_index, test_basenames):
            return "missing"
        return "covered"

    @staticmethod
    def _has_matching_test(
        py_file: Path,
        pkg_dir: Path,
        unit_index: dict[str, set[str]] | None,
        test_basenames: set[str] | None,
    ) -> bool:
        """Return True iff ``py_file`` has a matching test file.

        With a non-empty ``unit_index`` the test must sit in the same
        relative directory; otherwise any test basename in
        ``test_basenames`` may match.
        """
        stem = py_file.stem
        if unit_index:
            rel_dir = py_file.parent.relative_to(pkg_dir).as_posix()
            rel_dir = "" if rel_dir == "." else rel_dir
            available = unit_index.get(rel_dir, set())
            return bool({stem, stem.lstrip("_")} & available)
        candidates = {f"test_{stem.lstrip('_')}.py", f"test_{stem}.py"}
        return bool(candidates & (test_basenames or set()))

    @staticmethod
    def _collect_source_index(src_path: Path) -> dict[str, set[str]]:
        """Index source modules as ``{rel_dir: {stem variants}}``.

        Each non-exempt ``src/<pkg>/<rel>/<basename>.py`` contributes one entry
        per package: key = ``<rel>`` (POSIX, empty for package root), value
        contains both the original stem and the underscore-stripped stem so
        that ``_facade.py`` matches a ``test_facade.py`` test.
        """
        if not src_path.is_dir():
            return {}
        index: dict[str, set[str]] = {}
        for pkg_dir in src_path.iterdir():
            if not pkg_dir.is_dir() or pkg_dir.name == "__pycache__":
                continue
            for py_file in pkg_dir.rglob("*.py"):
                if py_file.name in _TEST_MIRROR_EXEMPT:
                    continue
                rel_dir = py_file.parent.relative_to(pkg_dir).as_posix()
                rel_dir = "" if rel_dir == "." else rel_dir
                stem = py_file.stem
                bucket = index.setdefault(rel_dir, set())
                bucket.add(stem)
                bucket.add(stem.lstrip("_"))
        return index

    @classmethod
    def _collect_orphan_tests(
        cls,
        src_path: Path,
        tests_path: Path,
    ) -> list[str]:
        """List ``tests/unit/`` test files with no matching source module.

        Walks ``tests/unit/**/test_*.py`` and flags each whose
        ``(rel_dir, stem)`` does not correspond to any source module under
        ``src/<pkg>/<rel_dir>/<stem>.py`` (with optional leading underscores).
        Returns POSIX ``tests/unit``-rooted paths sorted for determinism.
        Always returns ``[]`` when ``tests/unit/`` is absent.
        """
        unit_path = tests_path / "unit"
        if not unit_path.is_dir():
            return []
        src_index = cls._collect_source_index(src_path)
        orphans: list[str] = []
        for test_file in unit_path.rglob("test_*.py"):
            if not test_file.is_file():
                continue
            rel_dir = test_file.parent.relative_to(unit_path).as_posix()
            rel_dir = "" if rel_dir == "." else rel_dir
            test_stem = test_file.stem[len("test_") :]
            candidates = {test_stem, test_stem.lstrip("_")}
            available = src_index.get(rel_dir, set())
            if not candidates & available:
                # Relative to ``tests/``, so the reported path keeps ``unit/``.
                rel_to_unit = test_file.relative_to(tests_path).as_posix()
                orphans.append(f"tests/{rel_to_unit}")
        return sorted(orphans)
rule_id property

Unique identifier for this rule (bidirectional mirror check).

check(project_path)

Check forward + reverse test/source mapping.

Source code in packages/axm-audit/src/axm_audit/core/rules/practices/mirror.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Audit the source/test mirror in both directions.

    Forward: source modules without a test file (``missing``).
    Reverse: unit-test files without a source module (``orphan``).

    Args:
        project_path: Root directory of the project to check.

    Returns:
        The early result from ``check_src`` when it yields one; a failing
        result with score 0 when the mirror config reports an error; a
        passing result with score 100 when nothing is missing or orphaned;
        otherwise a result whose score drops 15 points per violation
        (floored at 0).
    """
    skipped = self.check_src(project_path)
    if skipped is not None:
        return skipped

    mirror_cfg = _load_mirror_config(project_path)
    if mirror_cfg.error is not None:
        # The config error text is surfaced to the user as the fix hint.
        return CheckResult(
            rule_id=self.rule_id,
            passed=False,
            message="Invalid mirror config",
            severity=Severity.WARNING,
            score=0,
            details={"missing": [], "orphan": [], "exempt": []},
            fix_hint=mirror_cfg.error,
        )

    src_root = project_path / "src"
    tests_root = project_path / "tests"
    missing, exempt = self._find_untested_modules(
        src_root, tests_root, mirror_cfg.exempt_paths
    )
    orphan = self._collect_orphan_tests(src_root, tests_root)

    if missing or orphan:
        penalty = 15 * (len(missing) + len(orphan))
        score = max(0, 100 - penalty)
        passed = score >= 90  # noqa: PLR2004

        hint = self._build_fix_hint(src_root, missing, orphan)
        text = self._build_text(missing, orphan)
        message = self._build_message(missing, orphan)

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=message,
            severity=Severity.INFO if passed else Severity.WARNING,
            score=int(score),
            details={"missing": missing, "orphan": orphan, "exempt": exempt},
            fix_hint=hint,
            text=text,
        )

    # Clean mirror: every module is covered and no test is orphaned.
    return CheckResult(
        rule_id=self.rule_id,
        passed=True,
        message="All source modules have test files",
        severity=Severity.INFO,
        score=100,
        details={"missing": [], "orphan": [], "exempt": exempt},
    )

ProjectRule

Bases: ABC

Base class for project invariants.

Each rule defines a single check that a project must satisfy.

Source code in packages/axm-audit/src/axm_audit/core/rules/base.py
Python
class ProjectRule(ABC):
    """Abstract base for a single project invariant.

    A concrete rule supplies a ``rule_id`` and a ``check`` implementation;
    this base adds the category lookup, the shared ``src/`` guard and the
    default instantiation hook.
    """

    @property
    @abstractmethod
    def rule_id(self) -> str:
        """Unique identifier for this rule."""

    @property
    def category(self) -> str:
        """Scoring category, auto-injected by ``@register_rule``.

        Valid values: ``lint``, ``type``, ``complexity``, ``security``,
        ``deps``, ``testing``, ``architecture``, ``practices``,
        ``structure``, ``tooling``.

        Returns:
            The ``_registered_category`` attribute when present, otherwise
            the empty string.
        """
        registered = getattr(self, "_registered_category", "")
        return registered

    @abstractmethod
    def check(self, project_path: Path) -> CheckResult:
        """Execute the check against a project.

        Args:
            project_path: Root directory of the project to check.

        Returns:
            CheckResult with pass/fail status and message.
        """

    def check_src(self, project_path: Path) -> CheckResult | None:
        """Short-circuit a rule when the project has no ``src/`` layout.

        Intended as the first statement of ``check()``::

            early = self.check_src(project_path)
            if early is not None:
                return early

        Args:
            project_path: Root directory of the project to check.

        Returns:
            ``None`` if ``src/`` exists — single-package layout (``src/``)
            or multi-package workspace (``packages/*/src/``) — meaning the
            rule should continue.
            A passing ``CheckResult`` if neither layout is present.
        """
        if not iter_src_dirs(project_path):
            # Nothing to audit: report a pass so the rule costs no points.
            return CheckResult(
                rule_id=self.rule_id,
                passed=True,
                message="src/ directory not found",
                severity=Severity.INFO,
                score=100,
            )
        return None

    @classmethod
    def get_instances(cls) -> list[ProjectRule]:
        """Build the rule instances to run for this class.

        Subclasses whose constructor needs arguments
        (e.g. ``ToolAvailabilityRule``) override this hook.

        Returns:
            List of rule instances — ``[cls()]`` by default.
        """
        default_instance = cls()
        return [default_instance]
category property

Scoring category, auto-injected by @register_rule.

Valid values: lint, type, complexity, security, deps, testing, architecture, practices, structure, tooling.

rule_id abstractmethod property

Unique identifier for this rule.

check(project_path) abstractmethod

Execute the check against a project.

Parameters:

Name Type Description Default
project_path Path

Root directory of the project to check.

required

Returns:

Type Description
CheckResult

CheckResult with pass/fail status and message.

Source code in packages/axm-audit/src/axm_audit/core/rules/base.py
Python
@abstractmethod
def check(self, project_path: Path) -> CheckResult:
    """Execute the check against a project.

    Declared abstract: this method has no implementation here, so every
    concrete rule must override it.

    Args:
        project_path: Root directory of the project to check.

    Returns:
        CheckResult with pass/fail status and message.
    """
check_src(project_path)

Return an early CheckResult if src/ does not exist.

Call this at the top of check() to eliminate boilerplate:

Text Only
early = self.check_src(project_path)
if early is not None:
    return early

Returns:

Type Description
CheckResult | None

None if src/ exists — single-package layout (src/) or multi-package workspace (packages/*/src/). The rule should continue.

CheckResult | None

A passing CheckResult if neither layout is present.

Source code in packages/axm-audit/src/axm_audit/core/rules/base.py
Python
def check_src(self, project_path: Path) -> CheckResult | None:
    """Short-circuit a rule when the project has no ``src/`` layout.

    Intended as the first statement of ``check()``::

        early = self.check_src(project_path)
        if early is not None:
            return early

    Args:
        project_path: Root directory of the project to check.

    Returns:
        ``None`` if ``src/`` exists — single-package layout (``src/``)
        or multi-package workspace (``packages/*/src/``) — meaning the
        rule should continue.
        A passing ``CheckResult`` if neither layout is present.
    """
    if not iter_src_dirs(project_path):
        # Nothing to audit: report a pass so the rule costs no points.
        return CheckResult(
            rule_id=self.rule_id,
            passed=True,
            message="src/ directory not found",
            severity=Severity.INFO,
            score=100,
        )
    return None
get_instances() classmethod

Instantiate this rule.

Override in subclasses that require constructor parameters (e.g. ToolAvailabilityRule).

Returns:

Type Description
list[ProjectRule]

List of rule instances — [cls()] by default.

Source code in packages/axm-audit/src/axm_audit/core/rules/base.py
Python
@classmethod
def get_instances(cls) -> list[ProjectRule]:
    """Instantiate this rule.

    The default builds a single instance by calling ``cls()`` with no
    arguments.  Override in subclasses that require constructor
    parameters (e.g. ``ToolAvailabilityRule``).

    Returns:
        List of rule instances — ``[cls()]`` by default.
    """
    return [cls()]

PyprojectCompletenessRule dataclass

Bases: ProjectRule

Validate PEP 621 field completeness in pyproject.toml.

Checks 9 fields: name, version/dynamic, description, requires-python, license, authors, classifiers, urls, readme. Scoring: (fields_present / 9) x 100.

Source code in packages/axm-audit/src/axm_audit/core/rules/structure.py
Python
@dataclass
@register_rule("structure")
class PyprojectCompletenessRule(ProjectRule):
    """Validate PEP 621 field completeness in pyproject.toml.

    Checks 9 fields: name, version/dynamic, description, requires-python,
    license, authors, classifiers, urls, readme.
    Scoring: (fields_present / 9) x 100.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "STRUCTURE_PYPROJECT"

    def _failure(self, message: str, fix_hint: str) -> CheckResult:
        """Build the zero-score ERROR result used when the file is unusable."""
        return CheckResult(
            rule_id=self.rule_id,
            passed=False,
            message=message,
            severity=Severity.ERROR,
            score=0,
            # Same constant as the success path so the totals always agree.
            details={"fields_present": 0, "total_fields": _TOTAL_FIELDS},
            fix_hint=fix_hint,
        )

    def check(self, project_path: Path) -> CheckResult:
        """Check pyproject.toml completeness.

        Args:
            project_path: Root directory of the project to check.

        Returns:
            An ERROR result with score 0 when ``pyproject.toml`` is missing,
            unreadable, not valid UTF-8 or not valid TOML; otherwise a result
            scored by the share of expected ``[project]`` fields present.
        """
        pyproject_path = project_path / "pyproject.toml"
        if not pyproject_path.exists():
            return self._failure(
                "pyproject.toml not found",
                "Create pyproject.toml with PEP 621 metadata",
            )

        try:
            # TOML is UTF-8 by specification; never rely on the locale default.
            data = tomllib.loads(pyproject_path.read_text(encoding="utf-8"))
        except (tomllib.TOMLDecodeError, UnicodeDecodeError, OSError):
            # UnicodeDecodeError is a ValueError, not an OSError, so it must
            # be listed explicitly or an undecodable file aborts the audit.
            return self._failure(
                "pyproject.toml parse error",
                "Fix pyproject.toml syntax",
            )

        present, missing = check_fields(data.get("project", {}))
        score = int((present / _TOTAL_FIELDS) * 100)
        below_threshold = score < PASS_THRESHOLD

        return CheckResult(
            rule_id=self.rule_id,
            passed=score >= PASS_THRESHOLD,
            message=f"pyproject.toml completeness: {present}/{_TOTAL_FIELDS} fields",
            severity=Severity.WARNING if below_threshold else Severity.INFO,
            text=f"\u2022 missing: {', '.join(missing)}" if missing else None,
            score=score,
            details={
                "fields_present": present,
                "total_fields": _TOTAL_FIELDS,
                "missing": missing,
            },
            fix_hint=(
                "Add missing PEP 621 fields to [project]"
                if below_threshold
                else None
            ),
        )
rule_id property

Unique identifier for this rule.

check(project_path)

Check pyproject.toml completeness.

Source code in packages/axm-audit/src/axm_audit/core/rules/structure.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Check pyproject.toml completeness.

    Args:
        project_path: Root directory of the project to check.

    Returns:
        An ERROR result with score 0 when ``pyproject.toml`` is missing,
        unreadable, not valid UTF-8 or not valid TOML; otherwise a result
        scored by the share of expected ``[project]`` fields present.
    """
    pyproject_path = project_path / "pyproject.toml"
    if not pyproject_path.exists():
        return CheckResult(
            rule_id=self.rule_id,
            passed=False,
            message="pyproject.toml not found",
            severity=Severity.ERROR,
            score=0,
            # Same constant as the success path so the totals always agree.
            details={"fields_present": 0, "total_fields": _TOTAL_FIELDS},
            fix_hint="Create pyproject.toml with PEP 621 metadata",
        )

    try:
        # TOML is UTF-8 by specification; never rely on the locale default.
        data = tomllib.loads(pyproject_path.read_text(encoding="utf-8"))
    except (tomllib.TOMLDecodeError, UnicodeDecodeError, OSError):
        # UnicodeDecodeError is a ValueError, not an OSError, so it must be
        # listed explicitly or an undecodable file aborts the audit.
        return CheckResult(
            rule_id=self.rule_id,
            passed=False,
            message="pyproject.toml parse error",
            severity=Severity.ERROR,
            score=0,
            details={"fields_present": 0, "total_fields": _TOTAL_FIELDS},
            fix_hint="Fix pyproject.toml syntax",
        )

    present, missing = check_fields(data.get("project", {}))
    score = int((present / _TOTAL_FIELDS) * 100)
    below_threshold = score < PASS_THRESHOLD

    return CheckResult(
        rule_id=self.rule_id,
        passed=score >= PASS_THRESHOLD,
        message=f"pyproject.toml completeness: {present}/{_TOTAL_FIELDS} fields",
        severity=Severity.WARNING if below_threshold else Severity.INFO,
        text=f"\u2022 missing: {', '.join(missing)}" if missing else None,
        score=score,
        details={
            "fields_present": present,
            "total_fields": _TOTAL_FIELDS,
            "missing": missing,
        },
        fix_hint=(
            "Add missing PEP 621 fields to [project]"
            if below_threshold
            else None
        ),
    )

SecurityPatternRule dataclass

Bases: ProjectRule

Detect hardcoded secrets via regex patterns.

Source code in packages/axm-audit/src/axm_audit/core/rules/security.py
Python
@dataclass
@register_rule("security")
class SecurityPatternRule(ProjectRule):
    """Detect hardcoded secrets via regex patterns.

    Each pattern is matched case-insensitively against the full text of
    every Python file under ``src/``.  Scoring: 100 - 25 per match, min 0.
    """

    # Assignment-shaped patterns: ``<keyword> = "<non-empty literal>"``.
    patterns: list[str] = field(
        default_factory=lambda: [
            r"password\s*=\s*[\"'][^\"']+[\"']",
            r"secret\s*=\s*[\"'][^\"']+[\"']",
            r"api_key\s*=\s*[\"'][^\"']+[\"']",
            r"token\s*=\s*[\"'][^\"']+[\"']",
        ]
    )

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "PRACTICE_SECURITY"

    def _scan_file_for_secrets(
        self, path: Path, src_path: Path
    ) -> list[dict[str, str | int]]:
        """Return one record per pattern match found in *path*.

        Args:
            path: File to scan.
            src_path: Root that reported file names are made relative to.

        Returns:
            Records with ``file`` (relative path), ``line`` (1-based) and
            ``pattern`` (the keyword part of the regex).  Empty when the
            file cannot be read or decoded.
        """
        try:
            # Python source defaults to UTF-8; decoding with the locale
            # encoding can fail on valid files and silently skip them.
            content = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            return []

        relative_name = str(path.relative_to(src_path))
        found: list[dict[str, str | int]] = []
        for pattern in self.patterns:
            # The text before the first literal ``\s*`` is the keyword.
            keyword = pattern.split(r"\s*")[0]
            for match in re.finditer(pattern, content, re.IGNORECASE):
                # Count newlines up to the match without copying the prefix.
                line_num = content.count("\n", 0, match.start()) + 1
                found.append(
                    {
                        "file": relative_name,
                        "line": line_num,
                        "pattern": keyword,
                    }
                )
        return found

    def _build_secret_result(self, matches: list[dict[str, str | int]]) -> CheckResult:
        """Turn the collected match records into the rule's ``CheckResult``."""
        count = len(matches)
        passed = count == 0
        score = max(0, 100 - count * 25)
        text_lines = [f"• {m['file']}:{m['line']} {m['pattern']}" for m in matches]

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=f"{count} potential secret(s) found",
            severity=Severity.ERROR if not passed else Severity.INFO,
            score=score,
            details={"secret_count": count, "matches": matches},
            text="\n".join(text_lines) if text_lines else None,
            fix_hint="Use environment variables or secret managers"
            if not passed
            else None,
        )

    def check(self, project_path: Path) -> CheckResult:
        """Check for hardcoded secrets in the project.

        Args:
            project_path: Root directory of the project to check.

        Returns:
            The early result from ``check_src`` when it yields one,
            otherwise the result built from all matches under ``src/``.
        """
        early = self.check_src(project_path)
        if early is not None:
            return early

        src_path = project_path / "src"
        matches: list[dict[str, str | int]] = []
        for path in get_python_files(src_path):
            matches.extend(self._scan_file_for_secrets(path, src_path))

        return self._build_secret_result(matches)
rule_id property

Unique identifier for this rule.

check(project_path)

Check for hardcoded secrets in the project.

Source code in packages/axm-audit/src/axm_audit/core/rules/security.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Scan every Python file under ``src/`` for hardcoded secrets.

    Args:
        project_path: Root directory of the project to check.

    Returns:
        The early result from ``check_src`` when it yields one, otherwise
        the result built from all collected matches.
    """
    skipped = self.check_src(project_path)
    if skipped is not None:
        return skipped

    src_root = project_path / "src"
    # Flatten the per-file match lists in file iteration order.
    hits: list[dict[str, str | int]] = [
        hit
        for source_file in get_python_files(src_root)
        for hit in self._scan_file_for_secrets(source_file, src_root)
    ]
    return self._build_secret_result(hits)

SecurityRule dataclass

Bases: ProjectRule

Run Bandit and score based on vulnerability severity.

Scoring: 100 - (high_count * 15 + medium_count * 5), min 0.

Source code in packages/axm-audit/src/axm_audit/core/rules/security.py
Python
@dataclass
@register_rule("security")
class SecurityRule(ProjectRule):
    """Run Bandit and score based on vulnerability severity.

    Scoring: 100 - (high_count * 15 + medium_count * 5), min 0.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "QUALITY_SECURITY"

    def check(self, project_path: Path) -> CheckResult:
        """Run Bandit over ``src/`` and convert its findings to a result.

        Args:
            project_path: Root directory of the project to check.

        Returns:
            The early result from ``check_src`` when it yields one; an ERROR
            result with score 0 when ``run_bandit`` raises
            ``FileNotFoundError`` or ``RuntimeError``; otherwise the result
            built by ``_build_security_result``.
        """
        skipped = self.check_src(project_path)
        if skipped is not None:
            return skipped

        def _tool_failure(message: str, hint: str) -> CheckResult:
            """Build the zero-score result for a Bandit run that failed."""
            return CheckResult(
                rule_id=self.rule_id,
                passed=False,
                message=message,
                severity=Severity.ERROR,
                score=0,
                details={"high_count": 0, "medium_count": 0},
                fix_hint=hint,
            )

        try:
            data = run_bandit(project_path / "src", project_path)
        except FileNotFoundError:
            return _tool_failure(
                "bandit not available", "Install with: uv add --dev bandit"
            )
        except RuntimeError as exc:
            return _tool_failure(
                str(exc), "Check bandit installation: uv run bandit --version"
            )

        # Keep only well-formed entries: "results" must be a list of dicts.
        reported = data.get("results", [])
        findings: list[dict[str, object]] = []
        if isinstance(reported, list):
            findings = [item for item in reported if isinstance(item, dict)]
        return _build_security_result(self.rule_id, findings)
rule_id property

Unique identifier for this rule.

check(project_path)

Check project security with Bandit.

Source code in packages/axm-audit/src/axm_audit/core/rules/security.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Run Bandit over ``src/`` and convert its findings to a result.

    Args:
        project_path: Root directory of the project to check.

    Returns:
        The early result from ``check_src`` when it yields one; an ERROR
        result with score 0 when ``run_bandit`` raises
        ``FileNotFoundError`` or ``RuntimeError``; otherwise the result
        built by ``_build_security_result``.
    """
    skipped = self.check_src(project_path)
    if skipped is not None:
        return skipped

    def _tool_failure(message: str, hint: str) -> CheckResult:
        """Build the zero-score result for a Bandit run that failed."""
        return CheckResult(
            rule_id=self.rule_id,
            passed=False,
            message=message,
            severity=Severity.ERROR,
            score=0,
            details={"high_count": 0, "medium_count": 0},
            fix_hint=hint,
        )

    try:
        data = run_bandit(project_path / "src", project_path)
    except FileNotFoundError:
        return _tool_failure(
            "bandit not available", "Install with: uv add --dev bandit"
        )
    except RuntimeError as exc:
        return _tool_failure(
            str(exc), "Check bandit installation: uv run bandit --version"
        )

    # Keep only well-formed entries: "results" must be a list of dicts.
    reported = data.get("results", [])
    findings: list[dict[str, object]] = []
    if isinstance(reported, list):
        findings = [item for item in reported if isinstance(item, dict)]
    return _build_security_result(self.rule_id, findings)

TestCoverageRule dataclass

Bases: ProjectRule

Check test coverage via pytest-cov.

Scoring: coverage percentage directly (e.g., 90% → score 90). Pass threshold: 90%.

Source code in packages/axm-audit/src/axm_audit/core/rules/coverage.py
Python
@dataclass
@register_rule("testing")
class TestCoverageRule(ProjectRule):
    """Check test coverage via pytest-cov.

    Scoring: coverage percentage directly (e.g., 90% → score 90).
    Pass threshold: 90%.
    """

    # Minimum coverage percentage required for the rule to pass.
    min_coverage: float = 90.0

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "QUALITY_COVERAGE"

    def check(self, project_path: Path) -> CheckResult:
        """Run the test suite and report coverage plus failures.

        Calls ``run_tests`` from the shared test runner with
        ``mode="compact"`` and ``stop_on_first=False``, then converts the
        returned report into a ``CheckResult``.

        Args:
            project_path: Root directory of the project to check.
        """
        from axm_audit.core.test_runner import run_tests

        return self._report_to_result(
            run_tests(project_path, mode="compact", stop_on_first=False)
        )

    def _no_coverage_result(self, failures: list[dict[str, str]]) -> CheckResult:
        """Build the failing result used when the report has no coverage."""
        return CheckResult(
            rule_id=self.rule_id,
            passed=False,
            message="No coverage data (pytest-cov not configured)",
            severity=Severity.WARNING,
            score=0,
            details={"coverage": 0.0, "failures": failures},
            fix_hint="Add pytest-cov: uv add --dev pytest-cov",
        )

    @staticmethod
    def _build_text_parts(
        coverage_pct: float, failures: list[dict[str, str]]
    ) -> list[str]:
        """Return the bullet lines: one coverage gap line, then failures.

        The gap line appears only below full coverage; at most the first
        10 failures are listed, each by the last ``::`` segment of its id.
        """
        gap_line = (
            [f"\u2022 cov {coverage_pct:.0f}% \u2192 {_FULL_COVERAGE}%"]
            if coverage_pct < _FULL_COVERAGE
            else []
        )
        short_names = (item["test"].rsplit("::", 1)[-1] for item in failures[:10])
        return gap_line + [f"\u2022 FAIL {name}" for name in short_names]

    @staticmethod
    def _build_message(
        coverage_pct: float, score: int, has_failures: bool, total_fails: int
    ) -> str:
        """Format the one-line summary; failures replace the score suffix."""
        suffix = f"{total_fails} test(s) failed" if has_failures else f"{score}/100"
        return f"Test coverage: {coverage_pct:.0f}% ({suffix})"

    def _report_to_result(self, report: TestReport) -> CheckResult:
        """Convert a ``TestReport`` to a ``CheckResult``.

        The text summary holds a coverage gap bullet (``• cov N% → 100%``)
        and up to 10 short failure names (``• FAIL test_name``); it is
        ``None`` when there is nothing to list.  A report without coverage
        data yields the dedicated "no coverage" result instead.
        """
        measured = report.coverage
        coverage_pct = 0.0 if measured is None else measured
        has_failures = report.failed > 0 or report.errors > 0
        total_fails = report.failed + report.errors

        failures: list[dict[str, str]] = [
            {"test": entry.test, "traceback": entry.message}
            for entry in report.failures or []
        ]

        if measured is None:
            return self._no_coverage_result(failures)

        # The score is the coverage percentage truncated to an integer.
        score = int(coverage_pct)
        passed = coverage_pct >= self.min_coverage and not has_failures
        message = self._build_message(coverage_pct, score, has_failures, total_fails)
        bullet_lines = self._build_text_parts(coverage_pct, failures)

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=message,
            severity=Severity.INFO if passed else Severity.WARNING,
            score=int(score),
            details={
                "coverage": coverage_pct,
                "failures": failures,
            },
            text="\n".join(bullet_lines) if bullet_lines else None,
            fix_hint=self._generate_fix_hints(has_failures, coverage_pct),
        )

    def _generate_fix_hints(
        self, has_failures: bool, coverage_pct: float
    ) -> str | None:
        """Join the applicable remediation hints with ``"; "``, or ``None``."""
        candidates = (
            (has_failures, "Fix failing tests"),
            (
                coverage_pct < self.min_coverage,
                f"Increase test coverage to >= {self.min_coverage:.0f}%",
            ),
        )
        applicable = [hint for applies, hint in candidates if applies]
        return "; ".join(applicable) if applicable else None
rule_id property

Unique identifier for this rule.

check(project_path)

Check test coverage and capture failures with pytest-cov.

Delegates to run_tests(mode='compact') from the shared test runner for structured output, then converts the result to a CheckResult.

The per-file gap list is derived from parse_coverage's filtered per_file map, which excludes __main__.py entries by default (aligned with coverage.py's exclude_also convention).

Source code in packages/axm-audit/src/axm_audit/core/rules/coverage.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Run the test suite and report coverage plus failures.

    Calls ``run_tests`` from the shared test runner with
    ``mode="compact"`` and ``stop_on_first=False``, then converts the
    returned report into a ``CheckResult`` via ``_report_to_result``.

    Args:
        project_path: Root directory of the project to check.
    """
    from axm_audit.core.test_runner import run_tests

    return self._report_to_result(
        run_tests(project_path, mode="compact", stop_on_first=False)
    )

ToolAvailabilityRule dataclass

Bases: ProjectRule

Check if a required CLI tool is available on PATH.

Source code in packages/axm-audit/src/axm_audit/core/rules/tooling.py
Python
@dataclass
@register_rule("tooling")
class ToolAvailabilityRule(ProjectRule):
    """Check if a required CLI tool is available on PATH."""

    # Executable name looked up with ``shutil.which``.
    tool_name: str = ""
    # A missing tool is reported as ERROR when True, WARNING otherwise.
    critical: bool = True

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule: ``TOOL_<TOOL_NAME upper-cased>``."""
        return f"TOOL_{self.tool_name.upper()}"

    @classmethod
    def get_instances(cls) -> list[ProjectRule]:
        """Return one instance per entry of ``_REQUIRED_TOOLS``."""
        return [cls(tool_name=required) for required in _REQUIRED_TOOLS]

    def check(self, project_path: Path) -> CheckResult:
        """Report whether ``tool_name`` resolves on the system PATH.

        Args:
            project_path: Accepted for interface compatibility; unused.
        """
        del project_path  # Not used for tool availability checks

        if shutil.which(self.tool_name) is None:
            return CheckResult(
                rule_id=self.rule_id,
                passed=False,
                message=f"{self.tool_name} not found",
                severity=Severity.ERROR if self.critical else Severity.WARNING,
                fix_hint=f"Install with: uv tool install {self.tool_name}",
            )

        return CheckResult(
            rule_id=self.rule_id,
            passed=True,
            message=f"{self.tool_name} found",
            severity=Severity.INFO,
        )
rule_id property

Unique identifier for this rule.

check(project_path)

Check if the tool is available on the system PATH.

Source code in packages/axm-audit/src/axm_audit/core/rules/tooling.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Report whether ``tool_name`` resolves on the system PATH.

    Args:
        project_path: Accepted for interface compatibility; unused.

    Returns:
        A passing INFO result when ``shutil.which`` finds the tool;
        otherwise a failing result whose severity is ERROR for a critical
        tool and WARNING for a non-critical one.
    """
    del project_path  # Not used for tool availability checks

    if shutil.which(self.tool_name) is None:
        return CheckResult(
            rule_id=self.rule_id,
            passed=False,
            message=f"{self.tool_name} not found",
            severity=Severity.ERROR if self.critical else Severity.WARNING,
            fix_hint=f"Install with: uv tool install {self.tool_name}",
        )

    return CheckResult(
        rule_id=self.rule_id,
        passed=True,
        message=f"{self.tool_name} found",
        severity=Severity.INFO,
    )
get_instances() classmethod

Return one instance per required tool.

Source code in packages/axm-audit/src/axm_audit/core/rules/tooling.py
Python
@classmethod
def get_instances(cls) -> list[ProjectRule]:
    """Return one instance per required tool.

    Builds ``cls(tool_name=t)`` for every entry of ``_REQUIRED_TOOLS``,
    in that collection's iteration order.
    """
    return [cls(tool_name=t) for t in _REQUIRED_TOOLS]

TypeCheckRule dataclass

Bases: ProjectRule

Run mypy with zero-tolerance for errors.

Scoring: 100 - (error_count * 5), min 0. Pass/fail: any error means failure (matches pre-commit mypy hook).

Source code in packages/axm-audit/src/axm_audit/core/rules/quality.py
Python
@dataclass
@register_rule("type")
class TypeCheckRule(ProjectRule):
    """Run mypy with zero-tolerance for errors.

    Scoring: 100 - (error_count * 5), min 0.
    Pass/fail: any error means failure (matches pre-commit mypy hook).
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "QUALITY_TYPE"

    def check(self, project_path: Path) -> CheckResult:
        """Run mypy on the audit targets and score the reported errors.

        Args:
            project_path: Root directory of the project to check.

        Returns:
            The early result from ``check_src`` when it yields one,
            otherwise a result that passes only with zero mypy errors.
        """
        skipped = self.check_src(project_path)
        if skipped is not None:
            return skipped

        targets, checked = _get_audit_targets(project_path)
        command = ["mypy", "--no-error-summary", "--output", "json", *targets]
        # check=False: the exit status is ignored; only stdout is parsed.
        completed = run_in_project(
            command,
            project_path,
            capture_output=True,
            text=True,
            check=False,
        )

        error_count, errors = self.parse_mypy_errors(completed.stdout)
        clean = error_count == 0
        score = max(0, 100 - error_count * 5)

        bullet_lines = []
        for err in errors:
            shown_file = str(err["file"]).removeprefix("src/")
            bullet_lines.append(
                f"• [{err['code']}] {shown_file}:{err['line']}: {err['message']}"
            )

        return CheckResult(
            rule_id=self.rule_id,
            passed=clean,
            message=f"Type score: {score}/100 ({error_count} errors)",
            severity=Severity.INFO if clean else Severity.WARNING,
            score=int(score),
            details={
                "error_count": error_count,
                "checked": checked,
                "errors": errors,
            },
            text="\n".join(bullet_lines) if bullet_lines else None,
            fix_hint=(
                None if clean else "Add type hints to functions and fix type errors"
            ),
        )

    @staticmethod
    def parse_mypy_errors(
        stdout: str,
    ) -> tuple[int, list[dict[str, str | int]]]:
        """Extract the error records from mypy's line-delimited JSON output.

        Blank lines, lines that are not valid JSON, JSON values that are
        not objects, and objects whose ``severity`` is not ``"error"`` are
        all skipped.

        Args:
            stdout: Raw standard output of ``mypy --output json``.

        Returns:
            ``(count, records)`` where each record has ``file``, ``line``,
            ``message`` and ``code`` (missing keys default to ``""`` or
            ``0`` for ``line``) and ``count == len(records)``.
        """
        collected: list[dict[str, str | int]] = []
        for raw_line in stdout.strip().split("\n"):
            if not raw_line.strip():
                continue
            try:
                record = json.loads(raw_line)
            except json.JSONDecodeError:
                continue
            if isinstance(record, dict) and record.get("severity") == "error":
                collected.append(
                    {
                        "file": record.get("file", ""),
                        "line": record.get("line", 0),
                        "message": record.get("message", ""),
                        "code": record.get("code", ""),
                    }
                )
        return len(collected), collected
rule_id property

Unique identifier for this rule.

check(project_path)

Check project type hints with mypy on src/ and tests/.

Source code in packages/axm-audit/src/axm_audit/core/rules/quality.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Run mypy on the audit targets and score the reported errors.

    Args:
        project_path: Root directory of the project to check.

    Returns:
        The early result from ``check_src`` when it yields one, otherwise
        a result that passes only with zero mypy errors and whose score is
        ``100 - 5 * error_count`` floored at 0.
    """
    skipped = self.check_src(project_path)
    if skipped is not None:
        return skipped

    targets, checked = _get_audit_targets(project_path)
    command = ["mypy", "--no-error-summary", "--output", "json", *targets]
    # check=False: the exit status is ignored; only stdout is parsed.
    completed = run_in_project(
        command,
        project_path,
        capture_output=True,
        text=True,
        check=False,
    )

    error_count, errors = self.parse_mypy_errors(completed.stdout)
    clean = error_count == 0
    score = max(0, 100 - error_count * 5)

    bullet_lines = []
    for err in errors:
        shown_file = str(err["file"]).removeprefix("src/")
        bullet_lines.append(
            f"• [{err['code']}] {shown_file}:{err['line']}: {err['message']}"
        )

    return CheckResult(
        rule_id=self.rule_id,
        passed=clean,
        message=f"Type score: {score}/100 ({error_count} errors)",
        severity=Severity.INFO if clean else Severity.WARNING,
        score=int(score),
        details={
            "error_count": error_count,
            "checked": checked,
            "errors": errors,
        },
        text="\n".join(bullet_lines) if bullet_lines else None,
        fix_hint=(
            None if clean else "Add type hints to functions and fix type errors"
        ),
    )
parse_mypy_errors(stdout) staticmethod

Parse mypy JSON output and extract errors.

Non-dict JSON lines (strings, arrays, ints, nulls) emitted by mypy --output json are silently skipped.

Source code in packages/axm-audit/src/axm_audit/core/rules/quality.py
Python
@staticmethod
def parse_mypy_errors(
    stdout: str,
) -> tuple[int, list[dict[str, str | int]]]:
    """Parse mypy JSON output and extract errors.

    Non-dict JSON lines (strings, arrays, ints, nulls) emitted by
    ``mypy --output json`` are silently skipped.
    """
    error_count = 0
    errors: list[dict[str, str | int]] = []
    if not stdout.strip():
        return error_count, errors

    for line in stdout.strip().split("\n"):
        if not line.strip():
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(entry, dict):
            continue
        if entry.get("severity") != "error":
            continue
        error_count += 1
        errors.append(
            {
                "file": entry.get("file", ""),
                "line": entry.get("line", 0),
                "message": entry.get("message", ""),
                "code": entry.get("code", ""),
            }
        )
    return error_count, errors

get_registry()

Return the current rule registry (read-only view).

Callers must ensure that rule modules have been imported before calling this function so that @register_rule decorators have fired.

Source code in packages/axm-audit/src/axm_audit/core/rules/base.py
Python
def get_registry() -> dict[str, list[type[ProjectRule]]]:
    """Return the current rule registry (read-only view).

    Callers must ensure that rule modules have been imported before
    calling this function so that ``@register_rule`` decorators have
    fired.

    Note:
        The ``_RULE_REGISTRY`` object itself is returned, not a copy or a
        proxy, so "read-only" is a convention the caller must honour:
        mutating the returned dict (or its lists) mutates the registry.

    Returns:
        Mapping from category name to the list of rule classes registered
        under that category.
    """
    return _RULE_REGISTRY

register_rule(category)

Class decorator that registers a rule in the auto-discovery registry.

Also injects _registered_category on the class so that ProjectRule.category resolves automatically.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| category | str | Unified category (e.g. "lint", "security"). | required |

Returns:

| Type | Description |
| --- | --- |
| Callable[[type[ProjectRule]], type[ProjectRule]] | The unmodified class — the decorator only appends to the registry and sets the _registered_category attribute. |

Source code in packages/axm-audit/src/axm_audit/core/rules/base.py
Python
def register_rule(category: str) -> Callable[[type[ProjectRule]], type[ProjectRule]]:
    """Build a class decorator that records a rule under *category*.

    The decorator stores ``_registered_category`` on the decorated class
    (so that ``ProjectRule.category`` resolves automatically) and adds the
    class to the auto-discovery registry.

    Args:
        category: Unified category (e.g. ``"lint"``, ``"security"``).

    Returns:
        A decorator that hands back the very class it received; its only
        effects are setting ``_registered_category`` and appending the
        class to the registry list for *category*.
    """

    def _register(rule_cls: type[ProjectRule]) -> type[ProjectRule]:
        setattr(rule_cls, "_registered_category", category)
        registered = _RULE_REGISTRY.setdefault(category, [])
        # Skip the append when the class is already listed, so decorating
        # the same class twice does not produce duplicate entries.
        if rule_cls in registered:
            return rule_cls
        registered.append(rule_cls)
        return rule_cls

    return _register