Skip to content

Duplicate tests

duplicate_tests

Duplicate-tests rule — cluster likely-duplicate test functions.

The rule combines three clustering signals (S1/S2/S3) with four rescue anti-signals (P1-P4), all ported from the detect_duplicates.py prototype.

DuplicateTestsCheckResult

Bases: CheckResult

A CheckResult subclass that adds cluster metadata and a scoring field.

Source code in packages/axm-audit/src/axm_audit/core/rules/test_quality/duplicate_tests.py
Python
class DuplicateTestsCheckResult(CheckResult):
    """Result type returned by the duplicate-tests rule.

    Extends :class:`CheckResult` with two fields: a free-form ``metadata``
    mapping (the rule stores its ``"clusters"`` and ``"buckets"`` there) and
    an integer ``score``.
    """

    # Each instance gets its own fresh dict when no metadata is supplied.
    metadata: dict[str, Any] = Field(default_factory=dict)
    # Defaults to 100 when the caller does not provide a score.
    score: int = Field(default=100)

    # Model configuration: extra (undeclared) fields are forbidden.
    model_config = {"extra": "forbid"}

DuplicateTestsRule dataclass

Bases: ProjectRule

Cluster likely-duplicate test functions via structural signals.

Source code in packages/axm-audit/src/axm_audit/core/rules/test_quality/duplicate_tests.py
Python
@register_rule("test_quality")
@dataclass
class DuplicateTestsRule(ProjectRule):
    """Project rule that groups test functions which look like duplicates.

    Attributes:
        ast_similarity_threshold: Threshold forwarded to ``_cluster`` when
            grouping the collected tests (default ``0.8``).
    """

    ast_similarity_threshold: float = 0.8

    @property
    def rule_id(self) -> str:
        """Return the fixed identifier this rule reports under."""
        return "TEST_QUALITY_DUPLICATE_TESTS"

    def check(self, project_path: Path) -> DuplicateTestsCheckResult:
        """Look for duplicate-test clusters under ``project_path``.

        Args:
            project_path: Location handed to ``_collect_tests`` to gather
                the tests to analyse.

        Returns:
            A :class:`DuplicateTestsCheckResult` whose metadata holds the
            ``"clusters"`` and ``"buckets"`` computed for the project. The
            result passes only when no non-ambiguous cluster contributes a
            pair; the score starts at 100 and loses ``_SCORE_PENALTY`` per
            such pair, never dropping below 0.
        """
        collected = _collect_tests(project_path)

        # Nothing to analyse: report a passing, informational result with
        # empty cluster and bucket listings.
        if not collected:
            return DuplicateTestsCheckResult(
                rule_id=self.rule_id,
                passed=True,
                message="no tests found",
                severity=Severity.INFO,
                score=100,
                metadata={
                    "clusters": [],
                    "buckets": {"CLUSTERED": [], "AMBIGUOUS": [], "UNIQUE": []},
                },
            )

        found = _cluster(collected, self.ast_similarity_threshold)
        bucketed = _buckets(collected, found)

        # Only clusters whose signal is not flagged "ambiguous_" count
        # towards the pair total that drives the score and the verdict.
        pair_total = 0
        for cluster in found:
            if cluster["signal"].startswith("ambiguous_"):
                continue
            pair_total += _pairs_in_cluster(len(cluster["tests"]))

        penalty = pair_total * _SCORE_PENALTY

        if found:
            summary = f"{len(found)} cluster(s), {pair_total} clustered pair(s)"
        else:
            summary = "no duplicate-test clusters found"

        return DuplicateTestsCheckResult(
            rule_id=self.rule_id,
            passed=(pair_total == 0),
            message=summary,
            severity=Severity.WARNING,
            score=max(0, 100 - penalty),
            metadata={"clusters": found, "buckets": bucketed},
        )
rule_id property

Stable identifier for this rule.

check(project_path)

Cluster duplicate tests in project_path and return verdicts.

Source code in packages/axm-audit/src/axm_audit/core/rules/test_quality/duplicate_tests.py
Python
def check(self, project_path: Path) -> DuplicateTestsCheckResult:
    """Evaluate ``project_path`` for clusters of likely-duplicate tests.

    Args:
        project_path: Location passed to ``_collect_tests``.

    Returns:
        A :class:`DuplicateTestsCheckResult`. When no tests are collected
        the result passes with severity ``INFO`` and empty metadata
        listings. Otherwise the result carries severity ``WARNING``, passes
        only if no non-ambiguous cluster yields a pair, and has a score of
        100 minus ``_SCORE_PENALTY`` per such pair, floored at 0.
    """
    gathered = _collect_tests(project_path)

    if gathered:
        groups = _cluster(gathered, self.ast_similarity_threshold)
        by_bucket = _buckets(gathered, groups)

        # Clusters whose signal begins with "ambiguous_" are excluded from
        # the pair count; they still appear in the reported metadata.
        pair_count = 0
        for group in groups:
            if not group["signal"].startswith("ambiguous_"):
                pair_count += _pairs_in_cluster(len(group["tests"]))

        if groups:
            text = f"{len(groups)} cluster(s), {pair_count} clustered pair(s)"
        else:
            text = "no duplicate-test clusters found"

        return DuplicateTestsCheckResult(
            rule_id=self.rule_id,
            passed=pair_count == 0,
            message=text,
            severity=Severity.WARNING,
            score=max(0, 100 - pair_count * _SCORE_PENALTY),
            metadata={"clusters": groups, "buckets": by_bucket},
        )

    # No tests were collected: nothing can be duplicated.
    return DuplicateTestsCheckResult(
        rule_id=self.rule_id,
        passed=True,
        message="no tests found",
        severity=Severity.INFO,
        score=100,
        metadata={
            "clusters": [],
            "buckets": {"CLUSTERED": [], "AMBIGUOUS": [], "UNIQUE": []},
        },
    )