Skip to content

Auditor

auditor

Project Auditor — verifies projects against standards.

This module provides the public API for checking project compliance. Rules execute in parallel via ThreadPoolExecutor for faster audits.

Rule discovery is automatic: every rule decorated with @register_rule is picked up via get_registry().

audit_project(project_path, category=None, quick=False)

Audit a project against Python 2026 standards.

Rules execute in parallel via ThreadPoolExecutor for speed. Each rule is isolated — one failure does not prevent others. An ASTCache is shared across rules to avoid redundant parsing.

Parameters:

Name Type Description Default
project_path Path

Root directory of the project to audit.

required
category str | None

Optional category filter.

None
quick bool

If True, run only lint + type checks.

False

Returns:

Type Description
AuditResult

AuditResult containing all check results.

Raises:

Type Description
FileNotFoundError

If project_path does not exist.

Source code in packages/axm-audit/src/axm_audit/core/auditor.py
Python
def audit_project(
    project_path: Path,
    category: str | None = None,
    quick: bool = False,
) -> AuditResult:
    """Run the audit rules against a project and collect their results.

    When ``iter_workspace_packages`` reports packages for ``project_path``,
    the whole audit is delegated to ``_audit_workspace``. Otherwise the
    rules selected by ``get_rules_for_category`` are submitted to a
    ``ThreadPoolExecutor`` and their results are gathered in submission
    order. Each rule is isolated — one failure does not prevent others.
    A single ``ASTCache`` is installed for the duration of the run so the
    rules can share parsed sources, and it is always reset afterwards.

    Args:
        project_path: Root directory of the project to audit.
        category: Optional category filter.
        quick: If True, run only lint + type checks.

    Returns:
        AuditResult containing all check results.

    Raises:
        FileNotFoundError: If project_path does not exist.
    """
    if not project_path.exists():
        raise FileNotFoundError(f"Project path does not exist: {project_path}")

    # Workspaces are audited by the dedicated workspace path.
    packages = iter_workspace_packages(project_path)
    if packages:
        return _audit_workspace(
            project_path, packages, category=category, quick=quick
        )

    selected_rules = get_rules_for_category(category, quick)

    cache_token = set_ast_cache(ASTCache())
    try:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            pending = []
            for rule in selected_rules:
                # Copy the context after the cache is installed so the
                # callable runs with the same context variables.
                rule_context = contextvars.copy_context()
                pending.append(
                    executor.submit(
                        rule_context.run, _safe_check, rule, project_path
                    )
                )
            # Collect in submission order, not completion order.
            results = [future.result() for future in pending]
    finally:
        reset_ast_cache(cache_token)

    return AuditResult(project_path=str(project_path), checks=results)

get_rules_for_category(category, quick=False)

Get rules for a specific category or all rules.

Parameters:

Name Type Description Default
category str | None

Filter to specific category, or None for all.

required
quick bool

If True, return only the lint and type-check rules (category is then ignored).

False

Returns:

Type Description
list[ProjectRule]

List of rule instances to run.

Raises:

Type Description
ValueError

If category is not valid.

Source code in packages/axm-audit/src/axm_audit/core/auditor.py
Python
def get_rules_for_category(
    category: str | None, quick: bool = False
) -> list[ProjectRule]:
    """Select the rule instances that an audit should run.

    ``quick`` takes precedence: when it is true the function returns the
    linting and type-check rules immediately, and ``category`` is neither
    validated nor used.

    Args:
        category: Filter to specific category, or None for all.
        quick: If True, only lint + type checks.

    Returns:
        List of rule instances to run.

    Raises:
        ValueError: If category is not valid.
    """
    _ensure_registry_loaded()

    if quick:
        from axm_audit.core.rules.quality import LintingRule, TypeCheckRule

        return [LintingRule(), TypeCheckRule()]

    # Reject unknown categories; None means "no filter" and is always fine.
    if not (category is None or category in VALID_CATEGORIES):
        known = ", ".join(sorted(VALID_CATEGORIES))
        raise ValueError(f"Invalid category: {category}. Valid categories: {known}")

    if not category:
        return _build_all_rules()

    # Expand every rule class registered for the category into its instances.
    return [
        instance
        for rule_cls in get_registry().get(category, [])
        for instance in rule_cls.get_instances()
    ]

merge_check(existing, incoming, pkg_name)

Merge two CheckResults for the same rule_id (worst-of-N policy).

passed is AND'd, severity takes the max, score takes the worst (min) of set values, metadata is deep-merged via merge_metadata(), and findings are concatenated so workspace aggregation preserves per-package diagnostics instead of dropping them.

Source code in packages/axm-audit/src/axm_audit/core/auditor.py
Python
def merge_check(
    existing: CheckResult, incoming: CheckResult, pkg_name: str
) -> CheckResult:
    """Fold ``incoming`` into ``existing`` for one rule_id (worst-of-N policy).

    ``incoming`` is first run through ``_prefix_check`` with ``pkg_name``.
    The result is a copy of ``existing`` in which ``passed`` is AND'd,
    ``severity`` takes the max, ``score`` takes the worst (min) of set
    values, ``text`` and ``details`` are merged, ``message`` is kept from
    ``existing``, ``metadata`` is deep-merged via :func:`merge_metadata`,
    and ``findings`` are concatenated (existing first) so workspace
    aggregation preserves per-package diagnostics instead of dropping them.
    """
    tagged = _prefix_check(incoming, pkg_name)

    # A missing or empty ``findings`` attribute counts as "no findings".
    prior_findings = getattr(existing, "findings", []) or []
    new_findings = getattr(tagged, "findings", []) or []

    changes = {
        "passed": existing.passed and tagged.passed,
        "text": _merge_text(existing.text, tagged.text),
        "details": _merge_details(existing.details, tagged.details),
        "severity": _max_severity(existing.severity, tagged.severity),
        "message": existing.message,
        "score": _merge_score(existing.score, tagged.score),
        "metadata": merge_metadata(existing.metadata, tagged.metadata),
        "findings": [*prior_findings, *new_findings],
    }
    return existing.model_copy(update=changes)

merge_metadata(a, b)

Deep-merge two metadata dicts (existing first, incoming second).

Rules: None is treated as {}; for each shared key, list+list concatenates (existing then incoming), dict+dict recurses, and any other combination keeps b (incoming wins).

Source code in packages/axm-audit/src/axm_audit/core/auditor.py
Python
def merge_metadata(
    a: dict[str, object] | None, b: dict[str, object] | None
) -> dict[str, object]:
    """Deep-merge two metadata dicts (existing first, incoming second).

    Rules: ``None`` is treated as ``{}``; for each shared key,
    list+list concatenates (existing then incoming), dict+dict recurses,
    and any other combination keeps ``b`` (incoming wins).
    """
    if not a and not b:
        return {}
    if not a:
        return dict(b or {})
    if not b:
        return dict(a)
    merged: dict[str, object] = dict(a)
    for key, b_val in b.items():
        if key in merged:
            merged[key] = _combine_metadata_values(merged[key], b_val)
        else:
            merged[key] = b_val
    return merged