@dataclass
@register_rule("complexity")
class ComplexityRule(ProjectRule):
    """Analyse complexity via radon (CC) and complexipy (Cognitive).

    Double constraint: a function is flagged if either radon grade is
    C+ (CC >= 11, aligned with ruff C901) or cognitive complexity > 15
    (SonarSource convention: strictly higher than 15, aligned with
    complexipy). A function exceeding both thresholds counts
    as one violation (no double penalty) but is reported with
    ``reason='cc+cog'``. Falls back gracefully to CC-only mode when
    complexipy is unavailable.
    """

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "QUALITY_COMPLEXITY"

    def check(self, project_path: Path) -> CheckResult:
        """Check project complexity with radon + complexipy.

        Args:
            project_path: Project root; the code under ``project_path / "src"``
                is analysed.

        Returns:
            The result produced by ``check_src`` when it returns one,
            otherwise the complexity result computed through the radon Python
            API when it can be imported, or through the ``radon`` executable
            as a fallback.
        """
        early = self.check_src(project_path)
        if early is not None:
            return early

        src_path = project_path / "src"
        cog_map, cog_disabled = _compute_cognitive_map(src_path)

        radon_api = _try_import_radon()
        if radon_api is not None:
            cc_visit, cc_rank = radon_api
            return self._check_via_api(
                src_path, cc_visit, cc_rank, cog_map, cog_disabled
            )
        return self._check_via_subprocess(src_path, cog_map, cog_disabled)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _check_via_api(
        self,
        src_path: Path,
        cc_visit: Callable[[str], list[object]],
        cc_rank: Callable[[int], str],
        cog_map: dict[tuple[str, str], int],
        cog_disabled: bool,
    ) -> CheckResult:
        """Analyse complexity using the radon Python API.

        Args:
            src_path: Directory scanned recursively for ``*.py`` files.
            cc_visit: Callable returning the complexity blocks of a source
                string.
            cc_rank: Callable mapping a cyclomatic complexity to its rank.
            cog_map: Cognitive complexity keyed by ``(file key, name)``;
                missing entries count as 0.
            cog_disabled: Whether the cognitive layer is unavailable;
                forwarded to the final result.

        Returns:
            The ``CheckResult`` built from every block ``_classify`` flags.
        """
        offenders: list[dict[str, str | int]] = []
        for py_file in src_path.rglob("*.py"):
            try:
                source = py_file.read_text(encoding="utf-8")
                blocks = cc_visit(source)
            except (OSError, SyntaxError, ValueError) as exc:
                # One unreadable or unparsable file must not abort the whole
                # audit. ValueError covers UnicodeDecodeError as well as the
                # "null bytes" error raised by older interpreters.
                logger.debug("Skipping %s during complexity scan: %s", py_file, exc)
                continue

            rel = _relative_key(str(py_file), src_path)
            for block in blocks:
                if not hasattr(block, "complexity"):
                    continue
                cc = int(getattr(block, "complexity", 0))
                rank = cc_rank(cc)
                classname = getattr(block, "classname", "") or ""
                block_name = getattr(block, "name", "") or ""
                # Methods are keyed as "Class.method", plain functions by name.
                name = f"{classname}.{block_name}" if classname else block_name
                cognitive = cog_map.get((rel, name), 0)
                offender = _classify(rel, name, cc, rank, cognitive)
                if offender is not None:
                    offenders.append(offender)
        return self._build_result(offenders, cog_disabled)

    def _check_via_subprocess(
        self,
        src_path: Path,
        cog_map: dict[tuple[str, str], int],
        cog_disabled: bool,
    ) -> CheckResult:
        """Analyse complexity by shelling out to ``radon cc --json``.

        Args:
            src_path: Directory handed to radon.
            cog_map: Cognitive complexity keyed by ``(file key, name)``.
            cog_disabled: Whether the cognitive layer is unavailable.

        Returns:
            A failing ``CheckResult`` (score 0) when the ``radon`` executable
            is not on ``PATH``, cannot be run, or prints invalid JSON;
            otherwise the result built from radon's JSON report.
        """
        radon_bin = shutil.which("radon")
        if radon_bin is None:
            return CheckResult(
                rule_id=self.rule_id,
                passed=False,
                message=("radon not found — complexity analysis skipped"),
                severity=Severity.ERROR,
                score=0,
                fix_hint=(
                    "Run 'uv sync' at workspace root or "
                    "'uv pip install axm-audit' to make radon available"
                ),
            )

        # radon is started from a throw-away working directory, so a relative
        # path would be resolved against that directory and nothing would be
        # analysed. Anchor it to the current working directory first.
        scan_root = src_path.absolute()
        try:
            # NOTE(review): the temporary cwd presumably keeps radon from
            # picking up configuration from the caller's directory — confirm.
            with tempfile.TemporaryDirectory() as _cwd:
                proc = subprocess.run(  # noqa: S603
                    [radon_bin, "cc", "--json", str(scan_root)],
                    capture_output=True,
                    text=True,
                    check=False,
                    cwd=_cwd,
                )
            if proc.returncode != 0:
                # Still parse whatever was printed, but leave a trace of the
                # abnormal exit instead of ignoring it.
                logger.warning(
                    "radon cc --json exited with code %s: %s",
                    proc.returncode,
                    (proc.stderr or "").strip(),
                )
            data: dict[str, list[dict[str, object]]] = (
                json.loads(proc.stdout) if proc.stdout.strip() else {}
            )
        except (json.JSONDecodeError, OSError) as exc:
            logger.warning(
                "radon cc --json failed: %s",
                exc,
                exc_info=True,
            )
            return CheckResult(
                rule_id=self.rule_id,
                passed=False,
                message="radon cc --json failed",
                severity=Severity.ERROR,
                score=0,
                fix_hint="Check radon installation",
            )
        # Use the same root radon was given so file keys are derived from
        # matching paths.
        return self._process_radon_output(data, cog_map, cog_disabled, scan_root)

    def _process_radon_output(
        self,
        data: dict[str, list[dict[str, object]]],
        cog_map: dict[tuple[str, str], int] | None = None,
        cog_disabled: bool = False,
        src_path: Path | None = None,
    ) -> CheckResult:
        """Process JSON output from radon cc.

        Args:
            data: Decoded radon report mapping each file path to its list of
                block dictionaries.
            cog_map: Cognitive complexity keyed by ``(file key, name)``;
                ``None`` is treated as an empty map.
            cog_disabled: Whether the cognitive layer is unavailable.
            src_path: Root passed to ``_relative_key`` to normalise the file
                paths found in ``data``.

        Returns:
            The ``CheckResult`` built from every block ``_classify`` flags.
        """
        if cog_map is None:
            cog_map = {}

        offenders: list[dict[str, str | int]] = []
        for file_path, blocks in data.items():
            if not isinstance(blocks, list):
                # Anything other than a list of blocks (e.g. a per-file error
                # object) carries no complexity data.
                logger.debug("No radon blocks for %s: %r", file_path, blocks)
                continue

            file_key = _relative_key(file_path, src_path)
            for block in blocks:
                if not isinstance(block, dict):
                    continue
                # "or ''" maps a JSON null to an empty string rather than the
                # literal text "None", matching the Python-API code path.
                rank = str(block.get("rank") or "")
                raw_cc = block.get("complexity", 0)
                try:
                    cc = int(raw_cc) if isinstance(raw_cc, int | float | str) else 0
                except (ValueError, OverflowError):
                    # Non-numeric text or a non-finite float: count as 0.
                    cc = 0
                raw_name = str(block.get("name") or "")
                classname = str(block.get("classname") or "")
                name = f"{classname}.{raw_name}" if classname else raw_name
                cognitive = cog_map.get((file_key, name), 0)
                offender = _classify(file_key, name, cc, rank, cognitive)
                if offender is not None:
                    offenders.append(offender)
        return self._build_result(offenders, cog_disabled)

    def _build_result(
        self,
        offenders: list[dict[str, str | int]],
        cog_disabled: bool = False,
    ) -> CheckResult:
        """Build the final ``CheckResult`` from computed metrics.

        The score starts at 100 and loses 10 points per offender, floored at
        0; the check passes when the score reaches ``PASS_THRESHOLD``.

        Args:
            offenders: Flagged functions; each entry is read for its ``file``,
                ``function``, ``cc``, ``rank``, ``cognitive`` and ``reason``
                keys.
            cog_disabled: When true, the message says so and the severity is
                raised to ``WARNING`` even if the check passes.

        Returns:
            The ``CheckResult`` with offenders sorted by the larger of their
            two complexity values, highest first.
        """
        top_offenders = sorted(
            offenders,
            key=lambda x: max(int(x["cc"]), int(x.get("cognitive", 0))),
            reverse=True,
        )
        high_complexity_count = len(top_offenders)
        score = max(0, 100 - high_complexity_count * 10)
        passed = score >= PASS_THRESHOLD

        # A missing "cognitive" key is shown as 0, the same default the sort
        # key above uses.
        text_lines = [
            (
                f"• {o['file']}:{o['function']} "
                f"cc={o['cc']} ({o['rank']}) "
                f"cog={o.get('cognitive', 0)} [{o['reason']}]"
            )
            for o in top_offenders
        ]

        message = (
            f"Complexity score: {score}/100 "
            f"({high_complexity_count} high-complexity functions)"
        )
        if cog_disabled:
            message += " — cognitive layer disabled (complexipy unavailable)"

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=message,
            severity=(
                Severity.WARNING if not passed or cog_disabled else Severity.INFO
            ),
            score=int(score),
            details={
                "high_complexity_count": high_complexity_count,
                "top_offenders": top_offenders,
                "cognitive_disabled": cog_disabled,
            },
            text="\n".join(text_lines) if text_lines else None,
            fix_hint=(
                "Refactor complex functions into smaller units"
                if high_complexity_count > 0
                else None
            ),
        )