Skip to content

Complexity

complexity

Complexity rule — cyclomatic + cognitive complexity analysis.

ComplexityRule dataclass

Bases: ProjectRule

Analyse complexity via radon (CC) and complexipy (Cognitive).

Double constraint: a function is flagged if either its radon grade is C or worse (CC >= 11, aligned with ruff C901) or its cognitive complexity exceeds 15 (SonarSource convention: strictly higher than 15, aligned with complexipy). A function exceeding both thresholds counts as one violation (no double penalty) but is reported with reason='cc+cog'. Falls back gracefully to CC-only mode when complexipy is unavailable.

Source code in packages/axm-audit/src/axm_audit/core/rules/complexity.py
Python
@dataclass
@register_rule("complexity")
class ComplexityRule(ProjectRule):
    """Analyse complexity via radon (CC) and complexipy (Cognitive).

    Double constraint: a function is flagged if either its radon grade is
    C or worse (CC >= 11, aligned with ruff C901) or its cognitive
    complexity exceeds 15 (SonarSource convention: strictly higher than
    15, aligned with complexipy). A function exceeding both thresholds
    counts as one violation (no double penalty) but is reported with
    ``reason='cc+cog'``. Falls back gracefully to CC-only mode when
    complexipy is unavailable.

    The per-function decision itself is delegated to the module-level
    ``_classify`` helper; this class gathers the metrics and builds the
    final ``CheckResult``.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "QUALITY_COMPLEXITY"

    def check(self, project_path: Path) -> CheckResult:
        """Check project complexity with radon + complexipy.

        Args:
            project_path: Project root; sources are analysed under
                ``project_path / "src"``.

        Returns:
            The result of ``check_src`` when it is not ``None``;
            otherwise the complexity result, computed through the radon
            Python API when it can be imported and through the ``radon``
            executable when it cannot.
        """
        early = self.check_src(project_path)
        if early is not None:
            return early

        src_path = project_path / "src"
        cog_map, cog_disabled = _compute_cognitive_map(src_path)

        radon_api = _try_import_radon()
        if radon_api is not None:
            cc_visit, cc_rank = radon_api
            return self._check_via_api(
                src_path, cc_visit, cc_rank, cog_map, cog_disabled
            )

        return self._check_via_subprocess(src_path, cog_map, cog_disabled)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _qualified_name(classname: object, name: object) -> str:
        """Return ``Class.name`` for methods, or the bare ``name`` otherwise.

        Missing, ``None`` and empty parts are all treated as absent, so a
        JSON ``null`` never ends up as the literal string ``"None"``.
        """
        owner = str(classname) if classname else ""
        short = str(name) if name else ""
        return f"{owner}.{short}" if owner else short

    @staticmethod
    def _coerce_cc(raw: object) -> int:
        """Convert a raw complexity value to ``int``; 0 when not convertible."""
        if not isinstance(raw, int | float | str):
            return 0
        try:
            return int(raw)
        except (ValueError, OverflowError):
            # Non-numeric string, NaN or infinity: treat as "no data".
            return 0

    def _radon_failure(self) -> CheckResult:
        """Build the failing result used when the radon CLI run is unusable."""
        return CheckResult(
            rule_id=self.rule_id,
            passed=False,
            message="radon cc --json failed",
            severity=Severity.ERROR,
            score=0,
            fix_hint="Check radon installation",
        )

    def _check_via_api(
        self,
        src_path: Path,
        cc_visit: Callable[[str], list[object]],
        cc_rank: Callable[[int], str],
        cog_map: dict[tuple[str, str], int],
        cog_disabled: bool,
    ) -> CheckResult:
        """Analyse complexity using the radon Python API.

        Args:
            src_path: Directory searched recursively for ``*.py`` files.
            cc_visit: Callable returning the complexity blocks of a source
                string.
            cc_rank: Callable mapping a complexity value to its rank.
            cog_map: Cognitive complexity keyed by ``(relative file,
                qualified name)``; missing entries count as 0.
            cog_disabled: Forwarded to ``_build_result``.

        Returns:
            The ``CheckResult`` built from every offender found.
        """
        offenders: list[dict[str, str | int]] = []
        # Sorted so that offenders with equal scores are reported in a
        # reproducible order, independent of filesystem enumeration order.
        for py_file in sorted(src_path.rglob("*.py")):
            try:
                source = py_file.read_text(encoding="utf-8")
                blocks = cc_visit(source)
            except (SyntaxError, ValueError, OSError):
                # One bad file must not abort the whole audit:
                # - ValueError includes UnicodeDecodeError (and, on some
                #   interpreter versions, sources containing null bytes);
                # - OSError covers unreadable paths, e.g. a directory
                #   named ``*.py`` or a permission problem.
                continue
            rel = _relative_key(str(py_file), src_path)
            for block in blocks:
                if not hasattr(block, "complexity"):
                    continue
                cc = int(getattr(block, "complexity", 0))
                rank = cc_rank(cc)
                name = self._qualified_name(
                    getattr(block, "classname", None),
                    getattr(block, "name", None),
                )
                cognitive = cog_map.get((rel, name), 0)
                offender = _classify(rel, name, cc, rank, cognitive)
                if offender is not None:
                    offenders.append(offender)
        return self._build_result(offenders, cog_disabled)

    def _check_via_subprocess(
        self,
        src_path: Path,
        cog_map: dict[tuple[str, str], int],
        cog_disabled: bool,
    ) -> CheckResult:
        """Analyse complexity by shelling out to ``radon cc --json``.

        Args:
            src_path: Directory handed to radon.
            cog_map: Cognitive complexity keyed by ``(relative file,
                qualified name)``.
            cog_disabled: Forwarded to ``_build_result``.

        Returns:
            A failing ``CheckResult`` (severity ``ERROR``, score 0) when
            the ``radon`` executable is not on ``PATH``, cannot be run,
            exits non-zero without output, or prints something that is
            not a JSON object; otherwise the result of
            ``_process_radon_output``.
        """
        radon_bin = shutil.which("radon")
        if radon_bin is None:
            return CheckResult(
                rule_id=self.rule_id,
                passed=False,
                message=("radon not found — complexity analysis skipped"),
                severity=Severity.ERROR,
                score=0,
                fix_hint=(
                    "Run 'uv sync' at workspace root or "
                    "'uv pip install axm-audit' to make radon available"
                ),
            )

        try:
            # radon is run from a throwaway working directory.
            with tempfile.TemporaryDirectory() as _cwd:
                proc = subprocess.run(  # noqa: S603
                    [radon_bin, "cc", "--json", str(src_path)],
                    capture_output=True,
                    text=True,
                    check=False,
                    cwd=_cwd,
                )
            raw_output = proc.stdout.strip()
            parsed: object = json.loads(proc.stdout) if raw_output else {}
        except (json.JSONDecodeError, OSError) as exc:
            logger.warning(
                "radon cc --json failed: %s",
                exc,
                exc_info=True,
            )
            return self._radon_failure()

        if not raw_output and proc.returncode != 0:
            # A crashed radon prints nothing on stdout; without this guard
            # the empty output would be scored as a perfect 100/100.
            logger.warning(
                "radon cc --json failed: %s",
                f"exit code {proc.returncode}: {proc.stderr.strip()}",
            )
            return self._radon_failure()

        if not isinstance(parsed, dict):
            logger.warning(
                "radon cc --json failed: %s",
                "unexpected JSON payload (not an object)",
            )
            return self._radon_failure()

        data: dict[str, list[dict[str, object]]] = parsed
        return self._process_radon_output(data, cog_map, cog_disabled, src_path)

    def _process_radon_output(
        self,
        data: dict[str, list[dict[str, object]]],
        cog_map: dict[tuple[str, str], int] | None = None,
        cog_disabled: bool = False,
        src_path: Path | None = None,
    ) -> CheckResult:
        """Process JSON output from radon cc.

        Args:
            data: Mapping of file path to that file's list of block
                dictionaries (``rank``, ``complexity``, ``name`` and
                optionally ``classname`` are read).
            cog_map: Cognitive complexity keyed by ``(relative file,
                qualified name)``; ``None`` means no cognitive data.
            cog_disabled: Forwarded to ``_build_result``.
            src_path: Passed to ``_relative_key`` to derive the file key.

        Returns:
            The ``CheckResult`` built from every offender found.
        """
        if cog_map is None:
            cog_map = {}
        offenders: list[dict[str, str | int]] = []
        for file_path, blocks in data.items():
            if not isinstance(blocks, list):
                # Not a block list (radon appears to report a file it could
                # not analyse as an ``{"error": ...}`` object): nothing to
                # score for this file.
                continue
            file_key = _relative_key(file_path, src_path)
            for block in blocks:
                if not isinstance(block, dict):
                    continue
                rank = str(block.get("rank", ""))
                cc = self._coerce_cc(block.get("complexity", 0))
                name = self._qualified_name(
                    block.get("classname"), block.get("name")
                )
                cognitive = cog_map.get((file_key, name), 0)
                offender = _classify(file_key, name, cc, rank, cognitive)
                if offender is not None:
                    offenders.append(offender)
        return self._build_result(offenders, cog_disabled)

    def _build_result(
        self,
        offenders: list[dict[str, str | int]],
        cog_disabled: bool = False,
    ) -> CheckResult:
        """Build the final ``CheckResult`` from computed metrics.

        The score starts at 100 and loses 10 points per offender, with a
        floor of 0; the check passes when the score reaches
        ``PASS_THRESHOLD``.

        Args:
            offenders: Offender records; ``file``, ``function``, ``cc``,
                ``rank``, ``cognitive`` and ``reason`` are read.
            cog_disabled: When true, the message notes that the cognitive
                layer is disabled and the severity is at least ``WARNING``.

        Returns:
            The populated ``CheckResult``; ``text`` lists one bullet per
            offender (``None`` when there are none).
        """
        # Worst offender first, ranked by the larger of its two metrics.
        top_offenders = sorted(
            offenders,
            key=lambda x: max(int(x["cc"]), int(x.get("cognitive", 0))),
            reverse=True,
        )
        high_complexity_count = len(top_offenders)
        score = max(0, 100 - high_complexity_count * 10)
        passed = score >= PASS_THRESHOLD

        text_lines = [
            (
                f"• {o['file']}:{o['function']} "
                f"cc={o['cc']} ({o['rank']}) "
                f"cog={o.get('cognitive', 0)} [{o['reason']}]"
            )
            for o in top_offenders
        ]

        message = (
            f"Complexity score: {score}/100 "
            f"({high_complexity_count} high-complexity functions)"
        )
        if cog_disabled:
            message += " — cognitive layer disabled (complexipy unavailable)"

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=message,
            severity=(
                Severity.WARNING if not passed or cog_disabled else Severity.INFO
            ),
            score=score,
            details={
                "high_complexity_count": high_complexity_count,
                "top_offenders": top_offenders,
                "cognitive_disabled": cog_disabled,
            },
            text="\n".join(text_lines) if text_lines else None,
            fix_hint=(
                "Refactor complex functions into smaller units"
                if high_complexity_count > 0
                else None
            ),
        )
rule_id property

Unique identifier for this rule.

check(project_path)

Check project complexity with radon + complexipy.

Source code in packages/axm-audit/src/axm_audit/core/rules/complexity.py
Python
def check(self, project_path: Path) -> CheckResult:
    """Run the complexity audit (radon + complexipy) on a project.

    Args:
        project_path: Project root; sources are analysed under
            ``project_path / "src"``.

    Returns:
        Whatever ``check_src`` returns when that is not ``None``;
        otherwise the complexity result, obtained through the radon
        Python API when it is importable and through the ``radon``
        executable when it is not.
    """
    # Guard clause: the source-layout pre-check may already decide the outcome.
    if (short_circuit := self.check_src(project_path)) is not None:
        return short_circuit

    source_root = project_path / "src"
    cognitive_scores, cognitive_off = _compute_cognitive_map(source_root)

    api = _try_import_radon()
    if api is None:
        # No importable radon: fall back to the command-line tool.
        return self._check_via_subprocess(
            source_root, cognitive_scores, cognitive_off
        )

    visit, rank = api
    return self._check_via_api(
        source_root, visit, rank, cognitive_scores, cognitive_off
    )