Skip to content

Duplication

duplication

Duplication rule — AST-based copy-paste detection.

DuplicationRule dataclass

Bases: ProjectRule

Detect copy-pasted code via AST structure hashing.

Extracts function and method bodies, normalises them by stripping variable names and locations, hashes each body, and reports groups whose hash appears more than once.

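Scoring: 100 - (dup_count * 10), min 0, where dup_count is the number of reported clone pairs (every function after the first in a group of identical hashes counts once).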

Source code in packages/axm-audit/src/axm_audit/core/rules/duplication.py
@dataclass
@register_rule("architecture")
class DuplicationRule(ProjectRule):
    """Detect copy-pasted code via AST structure hashing.

    Extracts function and method bodies, normalises them by stripping
    variable names and locations, hashes each body, and reports
    functions whose hash matches that of an earlier function.

    Scoring: ``100 - (dup_count * 10)``, min 0, where ``dup_count`` is the
    number of reported clone pairs. Every function after the first in a
    group of identical hashes contributes one pair, so a group of three
    identical functions counts as two.

    Attributes:
        min_lines: Functions spanning fewer source lines than this are
            skipped when hashing.
    """

    # Minimum number of source lines a function must span to be hashed.
    min_lines: int = _MIN_DUP_LINES

    @property
    def rule_id(self) -> str:
        """Unique identifier for this rule."""
        return "ARCH_DUPLICATION"

    def check(self, project_path: Path) -> CheckResult:
        """Check for code duplication in the project.

        Args:
            project_path: Project root; the ``src`` directory beneath it
                is the one that gets scanned.

        Returns:
            Whatever ``check_src`` returns when that is not ``None``;
            otherwise a ``CheckResult`` that passes only when no clone
            pair was found. ``details`` carries the pair count, at most
            the first 20 pairs, and the score.
        """
        early = self.check_src(project_path)
        if early is not None:
            return early

        src_path = project_path / "src"

        clones = self._find_duplicates(src_path)
        dup_count = len(clones)
        # Ten points per clone pair, floored at zero.
        score = max(0, 100 - dup_count * 10)
        passed = dup_count == 0

        return CheckResult(
            rule_id=self.rule_id,
            passed=passed,
            message=f"{dup_count} duplicate block(s) found",
            severity=Severity.WARNING if not passed else Severity.INFO,
            # Only the first 20 pairs are included to keep the payload bounded.
            details={"dup_count": dup_count, "clones": clones[:20], "score": score},
            fix_hint=(
                "Extract duplicated code into shared functions" if not passed else None
            ),
        )

    def _find_duplicates(self, src_path: Path) -> list[dict[str, str]]:
        """Hash function bodies and find duplicates.

        Args:
            src_path: Directory whose Python files are scanned.

        Returns:
            One ``{"source": ..., "target": ...}`` dict per clone pair.
            Both values have the form ``"<relative path>:<name>:<line>"``;
            ``source`` is always the first function recorded for the hash.
        """
        seen = self._collect_function_hashes(src_path)

        clones: list[dict[str, str]] = []
        for entries in seen.values():
            if len(entries) < _MIN_CLONE_GROUP:
                continue
            # The first occurrence is the reference every other one is paired with.
            src_file, src_name, src_line = entries[0]
            source = f"{src_file}:{src_name}:{src_line}"
            clones.extend(
                {"source": source, "target": f"{file}:{name}:{line}"}
                for file, name, line in entries[1:]
            )
        return clones

    def _collect_function_hashes(
        self,
        src_path: Path,
    ) -> dict[str, list[tuple[str, str, int]]]:
        """Scan source files and hash each function body.

        Args:
            src_path: Directory whose Python files are scanned.

        Returns:
            Mapping from body hash to the ``(relative path, function name,
            line number)`` of every function that produced that hash.
        """
        seen: dict[str, list[tuple[str, str, int]]] = defaultdict(list)

        # The cache lookup does not depend on the file being processed, so it
        # is resolved once rather than on every iteration (assumes the active
        # cache does not change while the scan is running).
        cache = _get_ast_cache()

        for path in _get_python_files(src_path):
            tree = cache.get_or_parse(path) if cache else _parse_file_safe(path)
            if tree is None:
                # No tree available for this file; skip it.
                continue
            rel = str(path.relative_to(src_path))
            self._hash_functions_in_tree(tree, rel, seen)

        return seen

    def _hash_functions_in_tree(
        self,
        tree: ast.Module,
        rel: str,
        seen: dict[str, list[tuple[str, str, int]]],
    ) -> None:
        """Hash each function body in a single AST and add to *seen*.

        Args:
            tree: Parsed module to walk.
            rel: Path recorded for every function found in ``tree``.
            seen: Hash-to-occurrences mapping, updated in place.
        """
        for node in ast.walk(tree):
            if not isinstance(node, ast.FunctionDef | ast.AsyncFunctionDef):
                continue
            # Fall back to the start line when no end line is recorded, which
            # makes the function count as a single line.
            end = getattr(node, "end_lineno", None) or node.lineno
            if end - node.lineno + 1 < self.min_lines:
                continue
            body_str = _normalize_ast(node)
            # MD5 is used purely as a fingerprint here, not for security.
            h = hashlib.md5(body_str.encode(), usedforsecurity=False).hexdigest()
            seen[h].append((rel, node.name, node.lineno))
rule_id property

Unique identifier for this rule.

check(project_path)

Check for code duplication in the project.

Source code in packages/axm-audit/src/axm_audit/core/rules/duplication.py
def check(self, project_path: Path) -> CheckResult:
    """Run the duplication check on the ``src`` directory of a project.

    Args:
        project_path: Project root; its ``src`` sub-directory is scanned.

    Returns:
        The result of ``check_src`` when that is not ``None``; otherwise
        a ``CheckResult`` that passes only if no duplicate was found.
        Its ``details`` hold the duplicate count, at most the first 20
        clones, and a score of ``100 - 10 * count`` floored at zero.
    """
    precheck = self.check_src(project_path)
    if precheck is not None:
        return precheck

    found = self._find_duplicates(project_path / "src")
    dup_count = len(found)
    has_duplicates = dup_count != 0

    # Severity and the remediation hint both depend only on the outcome.
    if has_duplicates:
        level = Severity.WARNING
        hint = "Extract duplicated code into shared functions"
    else:
        level = Severity.INFO
        hint = None

    return CheckResult(
        rule_id=self.rule_id,
        passed=not has_duplicates,
        message=f"{dup_count} duplicate block(s) found",
        severity=level,
        details={
            "dup_count": dup_count,
            "clones": found[:20],
            "score": max(0, 100 - dup_count * 10),
        },
        fix_hint=hint,
    )