Skip to content

Index

hooks

Git hook actions for AXM lifecycle hooks.

Provides BranchDeleteHook, CreateBranchHook, CommitPhaseHook, MergeSquashHook, PreflightHook, WorktreeAddHook, WorktreeRemoveHook, PushHook, CreatePRHook, and AwaitMergeHook, auto-discovered by HookRegistry via the axm.hooks entry-point group.

AwaitMergeHook dataclass

Poll a PR until merged or timeout.

Reads pr_number (or pr_url) from context and polls gh pr view --json state every 30 seconds. Times out after 10 minutes by default.

Source code in packages/axm-git/src/axm_git/hooks/await_merge.py
Python
@dataclass
class AwaitMergeHook:
    """Poll a PR until merged or timeout.

    Reads ``pr_number`` (or ``pr_url``) from *context* and polls
    ``gh pr view --json state`` every 30 seconds.  Times out after
    10 minutes by default.
    """

    def execute(self, context: dict[str, object], **params: object) -> HookResult:
        """Execute the hook action.

        Args:
            context: Session context dictionary (must contain
                ``pr_number`` or ``pr_url``).
            **params: Optional ``enabled``, ``timeout`` (seconds),
                ``interval`` (seconds).  An ``interval`` below one
                second is raised to one second so the wait is always
                bounded by ``timeout``.

        Returns:
            HookResult with ``merged=True`` on success.  A skipped
            result is returned when the hook is disabled or ``gh`` is
            unavailable; a failure is returned when no PR reference is
            found, the state query fails, the PR is closed, or the
            timeout elapses.
        """
        if not params.get("enabled", True):
            return HookResult.ok(skipped=True, reason="git disabled")

        if not gh_available():
            return HookResult.ok(skipped=True, reason="gh not available")

        working_dir = resolve_working_dir(params, context)

        pr_ref = resolve_pr_ref(params, context)
        if not pr_ref:
            return HookResult.fail("no pr_number or pr_url in params or context")

        timeout = int(cast("int", params.get("timeout", _DEFAULT_TIMEOUT)))
        # The loop only advances by ``interval``: a zero or negative value
        # would never reach ``timeout`` and would poll forever, so clamp it.
        interval = max(
            1, int(cast("int", params.get("interval", _DEFAULT_INTERVAL)))
        )

        elapsed = 0
        while elapsed < timeout:
            state = _poll_pr_state(str(pr_ref), working_dir)
            if state is None:
                return HookResult.fail(f"failed to query PR {pr_ref} state")
            if state == "MERGED":
                return HookResult.ok(merged=True, pr_ref=str(pr_ref))
            if state == "CLOSED":
                return HookResult.fail(f"PR {pr_ref} was closed without merging")

            # Any other state means "still pending": wait and poll again.
            time.sleep(interval)
            elapsed += interval

        return HookResult.fail(f"PR {pr_ref} not merged after {timeout}s timeout")
execute(context, **params)

Execute the hook action.

Parameters:

Name Type Description Default
context dict[str, object]

Session context dictionary (must contain pr_number or pr_url).

required
**params object

Optional enabled, timeout (seconds), interval (seconds).

{}

Returns:

Type Description
HookResult

HookResult with merged=True on success.

Source code in packages/axm-git/src/axm_git/hooks/await_merge.py
Python
def execute(self, context: dict[str, object], **params: object) -> HookResult:
    """Execute the hook action.

    Args:
        context: Session context dictionary (must contain
            ``pr_number`` or ``pr_url``).
        **params: Optional ``enabled``, ``timeout`` (seconds),
            ``interval`` (seconds).  An ``interval`` below one second
            is raised to one second so the wait is always bounded by
            ``timeout``.

    Returns:
        HookResult with ``merged=True`` on success.  A skipped result
        is returned when the hook is disabled or ``gh`` is unavailable;
        a failure is returned when no PR reference is found, the state
        query fails, the PR is closed, or the timeout elapses.
    """
    if not params.get("enabled", True):
        return HookResult.ok(skipped=True, reason="git disabled")

    if not gh_available():
        return HookResult.ok(skipped=True, reason="gh not available")

    working_dir = resolve_working_dir(params, context)

    pr_ref = resolve_pr_ref(params, context)
    if not pr_ref:
        return HookResult.fail("no pr_number or pr_url in params or context")

    timeout = int(cast("int", params.get("timeout", _DEFAULT_TIMEOUT)))
    # The loop only advances by ``interval``: a zero or negative value
    # would never reach ``timeout`` and would poll forever, so clamp it.
    interval = max(1, int(cast("int", params.get("interval", _DEFAULT_INTERVAL))))

    elapsed = 0
    while elapsed < timeout:
        state = _poll_pr_state(str(pr_ref), working_dir)
        if state is None:
            return HookResult.fail(f"failed to query PR {pr_ref} state")
        if state == "MERGED":
            return HookResult.ok(merged=True, pr_ref=str(pr_ref))
        if state == "CLOSED":
            return HookResult.fail(f"PR {pr_ref} was closed without merging")

        # Any other state means "still pending": wait and poll again.
        time.sleep(interval)
        elapsed += interval

    return HookResult.fail(f"PR {pr_ref} not merged after {timeout}s timeout")

BranchDeleteHook dataclass

Delete a branch by name.

Branch name priority:

  1. branch param (direct override)
  2. branch context key

Skips gracefully when disabled or when the working directory is not a git repository.

Source code in packages/axm-git/src/axm_git/hooks/branch_delete.py
Python
@dataclass
class BranchDeleteHook:
    """Force-delete a branch (``git branch -D``).

    The branch name is looked up in this order:

    1. ``branch`` param (direct override)
    2. ``branch`` context key

    The hook reports a skipped result instead of failing when it is
    disabled or when the working directory is not inside a git
    repository.
    """

    def execute(self, context: dict[str, object], **params: object) -> HookResult:
        """Run the branch deletion.

        Args:
            context: Session context dictionary.
            **params: Optional ``branch``, ``enabled``, ``working_dir``.

        Returns:
            HookResult carrying ``branch`` and ``deleted`` metadata when
            the branch was removed.
        """
        working_dir = resolve_working_dir(params, context)

        enabled = params.get("enabled", True)
        if not enabled:
            return HookResult.ok(skipped=True, reason="git disabled")

        repo_root = find_git_root(working_dir)
        if repo_root is None:
            return HookResult.ok(skipped=True, reason="not a git repo")

        # A falsy param (missing, None, "") falls through to the context value.
        target = cast("str | None", params.get("branch") or context.get("branch"))
        if not target:
            return HookResult.fail("no branch specified in params or context")

        outcome = run_git(["branch", "-D", target], repo_root)
        if outcome.returncode == 0:
            return HookResult.ok(branch=target, deleted=True)

        return HookResult.fail(f"git branch -D failed: {outcome.stderr}")
execute(context, **params)

Execute the hook action.

Parameters:

Name Type Description Default
context dict[str, object]

Session context dictionary.

required
**params object

Optional branch, enabled, working_dir.

{}

Returns:

Type Description
HookResult

HookResult with branch and deleted in metadata on success.

Source code in packages/axm-git/src/axm_git/hooks/branch_delete.py
Python
def execute(self, context: dict[str, object], **params: object) -> HookResult:
    """Run the branch deletion.

    Args:
        context: Session context dictionary.
        **params: Optional ``branch``, ``enabled``, ``working_dir``.

    Returns:
        HookResult carrying ``branch`` and ``deleted`` metadata when the
        branch was removed.
    """
    working_dir = resolve_working_dir(params, context)

    enabled = params.get("enabled", True)
    if not enabled:
        return HookResult.ok(skipped=True, reason="git disabled")

    repo_root = find_git_root(working_dir)
    if repo_root is None:
        return HookResult.ok(skipped=True, reason="not a git repo")

    # A falsy param (missing, None, "") falls through to the context value.
    target = cast("str | None", params.get("branch") or context.get("branch"))
    if not target:
        return HookResult.fail("no branch specified in params or context")

    outcome = run_git(["branch", "-D", target], repo_root)
    if outcome.returncode == 0:
        return HookResult.ok(branch=target, deleted=True)

    return HookResult.fail(f"git branch -D failed: {outcome.stderr}")

CommitPhaseHook dataclass

Commit all changes with message [axm] {phase_name}.

Reads phase_name from context. An optional message_format can be provided via params (default "[axm] {phase}"). Skips gracefully when there is nothing to commit or no git repository.

When from_outputs=True is passed in params, reads commit_spec from context instead: {message, body?, files}. Only the listed files are staged (no git add -A).

Source code in packages/axm-git/src/axm_git/hooks/commit_phase.py
Python
@dataclass
class CommitPhaseHook:
    """Commit all changes with message ``[axm] {phase_name}``.

    Reads ``phase_name`` from *context*.  An optional
    ``message_format`` can be provided via *params*
    (default ``"[axm] {phase}"``).  Skips gracefully when
    there is nothing to commit or no git repository.

    When ``from_outputs=True`` is passed in *params*, reads
    ``commit_spec`` from *context* instead: ``{message, body?, files}``.
    Only the listed files are staged (no ``git add -A``).
    """

    def execute(self, context: dict[str, object], **params: object) -> HookResult:
        """Execute the hook action.

        Args:
            context: Session context dictionary.
            **params: Optional ``enabled``, ``profile``,
                ``message_format``, ``from_outputs``, ``working_dir``,
                ``skip_hooks`` (default ``False`` for ``from_outputs``
                mode — pass ``True`` to append ``--no-verify`` and
                bypass project pre-commit hooks).

        Returns:
            HookResult with ``commit`` hash and ``message`` in metadata.
        """
        # The param wins over the context value; "." is the last resort.
        wd_raw = params.get("working_dir", context.get("working_dir", "."))
        working_dir = Path(cast("str | Path", wd_raw))

        if not params.get("enabled", True):
            return HookResult.ok(skipped=True, reason="git disabled")

        if find_git_root(working_dir) is None:
            return HookResult.ok(skipped=True, reason="not a git repo")

        # ``profile`` is passed on explicitly, so drop it from the kwargs.
        profile = cast("str | None", params.pop("profile", None))

        if params.get("from_outputs"):
            skip_hooks = bool(params.get("skip_hooks", False))
            return self.commit_from_outputs(
                context, working_dir, skip_hooks=skip_hooks, profile=profile
            )

        # ``working_dir`` is already bound positionally below; forwarding it
        # again through ``**params`` would raise ``TypeError`` (multiple
        # values for argument 'working_dir'), so strip it first.
        legacy_params = {
            key: value for key, value in params.items() if key != "working_dir"
        }
        return self._commit_legacy(
            context, working_dir, profile=profile, **legacy_params
        )

    def _commit_legacy(
        self,
        context: dict[str, object],
        working_dir: Path,
        *,
        profile: str | None = None,
        **params: object,
    ) -> HookResult:
        """Legacy mode: stage all changes and commit with a format string.

        Resolves the author identity via :func:`resolve_identity` and
        injects ``--author`` into the commit command when a profile is
        found.  Identity metadata (name, email) is included in the
        returned :class:`HookResult`.

        Args:
            context: Session context containing ``phase_name``.
            working_dir: Repository working directory.
            profile: Optional identity profile name override.
            **params: Extra params; ``message_format`` controls the
                commit message template (default ``"[axm] {phase}"``).

        Returns:
            HookResult with ``commit`` and ``message`` (plus
            ``author_name`` / ``author_email`` when an identity was
            resolved), a skipped result when nothing is staged, or a
            failure when ``git commit`` exits non-zero.

        Raises:
            KeyError: If *context* has no ``phase_name`` key.
        """
        phase_name = cast("str", context["phase_name"])

        # Resolve author identity
        identity = resolve_identity(working_dir, profile_override=profile)
        author = f"{identity.name} <{identity.email}>" if identity else None

        # Stage all changes (scoped to working_dir for workspace layouts)
        run_git(["add", "-A", "."], working_dir)

        # Check if there's anything to commit
        status = run_git(["status", "--porcelain", "--", "."], working_dir)
        if not status.stdout.strip():
            return HookResult.ok(skipped=True, reason="nothing to commit")

        # Commit
        message_format = cast("str", params.get("message_format", "[axm] {phase}"))
        msg = message_format.format(phase=phase_name)
        commit_cmd = build_commit_cmd(msg, None, author=author)
        result = run_git(commit_cmd, working_dir)
        if result.returncode != 0:
            return HookResult.fail(f"git commit failed: {result.stderr}")

        # Get commit hash
        hash_result = run_git(["rev-parse", "--short", "HEAD"], working_dir)
        result_kw: dict[str, str] = {
            "commit": hash_result.stdout.strip(),
            "message": msg,
        }
        if identity:
            result_kw["author_name"] = identity.name
            result_kw["author_email"] = identity.email
        return HookResult.ok(**result_kw)

    def commit_from_outputs(
        self,
        context: dict[str, object],
        working_dir: Path,
        *,
        skip_hooks: bool = False,
        profile: str | None = None,
    ) -> HookResult:
        """Outputs mode: read ``commit_spec`` from context, stage listed files.

        Resolves the author identity via :func:`resolve_identity` and
        injects ``--author`` into the commit command when a profile is
        found.  If the commit fails because pre-commit hooks auto-fixed
        files (stderr contains ``"files were modified"``), the listed
        files are re-staged and the commit is retried once.

        Args:
            context: Session context containing ``commit_spec``.
            working_dir: Repository working directory.
            skip_hooks: When *True*, append ``--no-verify`` to bypass
                project pre-commit hooks.  Defaults to ``False`` so
                hooks run and surface failures via ``HookResult.fail``.
            profile: Optional identity profile name override.

        Returns:
            The result built by ``build_commit_result`` on success, a
            skipped result when nothing ends up staged, or a failure
            when validation, staging, or the commit itself fails.
        """
        spec, err = _validate_commit_spec(
            cast("dict[str, object] | None", context.get("commit_spec"))
        )
        if err:
            return HookResult.fail(err)
        # spec is guaranteed non-None when err is None
        spec = cast("dict[str, object]", spec)

        files = cast("list[str]", spec["files"])
        message = cast("str", spec["message"])
        body = cast("str | None", spec.get("body"))

        # Git commands run from the repository root when one is found.
        git_root = find_git_root(working_dir) or working_dir

        # Resolve author identity
        identity = resolve_identity(git_root, profile_override=profile)
        author = f"{identity.name} <{identity.email}>" if identity else None

        _format_spec_files(files, git_root, working_dir=working_dir)

        warnings: list[str] = []
        stage_err = stage_spec_files(
            files, git_root, working_dir=working_dir, warnings=warnings
        )
        if stage_err:
            return HookResult.fail(stage_err)

        # Check if there's anything to commit
        status = run_git(["diff", "--cached", "--name-only"], git_root)
        if not status.stdout.strip():
            return HookResult.ok(skipped=True, reason="nothing to commit")

        commit_cmd = build_commit_cmd(
            message, body, skip_hooks=skip_hooks, author=author
        )

        result: _GitResultLike = run_git(commit_cmd, git_root)
        if result.returncode != 0:
            # Give the retry helper one chance before reporting failure.
            result = retry_commit_on_autofix(
                files, commit_cmd, git_root, result, working_dir=working_dir
            )
            if result.returncode != 0:
                return HookResult.fail(f"git commit failed: {result.stderr}")

        return build_commit_result(git_root, message, identity, warnings)
commit_from_outputs(context, working_dir, *, skip_hooks=False, profile=None)

Outputs mode: read commit_spec from context, stage listed files.

Resolves the author identity via :func:resolve_identity and injects --author into the commit command when a profile is found. If the commit fails because pre-commit hooks auto-fixed files (stderr contains "files were modified"), the listed files are re-staged and the commit is retried once.

Parameters:

Name Type Description Default
context dict[str, object]

Session context containing commit_spec.

required
working_dir Path

Repository working directory.

required
skip_hooks bool

When True, append --no-verify to bypass project pre-commit hooks. Defaults to False so hooks run and surface failures via HookResult.fail.

False
profile str | None

Optional identity profile name override.

None
Source code in packages/axm-git/src/axm_git/hooks/commit_phase.py
Python
def commit_from_outputs(
    self,
    context: dict[str, object],
    working_dir: Path,
    *,
    skip_hooks: bool = False,
    profile: str | None = None,
) -> HookResult:
    """Commit only the files named in ``context["commit_spec"]``.

    The author identity comes from :func:`resolve_identity`; when one
    is found it is passed to the commit command as ``--author``.  If
    the commit fails because pre-commit hooks auto-fixed files (stderr
    contains ``"files were modified"``), the listed files are re-staged
    and the commit is retried once.

    Args:
        context: Session context containing ``commit_spec``.
        working_dir: Repository working directory.
        skip_hooks: When *True*, append ``--no-verify`` to bypass
            project pre-commit hooks.  Defaults to ``False`` so hooks
            run and surface failures via ``HookResult.fail``.
        profile: Optional identity profile name override.
    """
    raw_spec = cast("dict[str, object] | None", context.get("commit_spec"))
    validated, problem = _validate_commit_spec(raw_spec)
    if problem:
        return HookResult.fail(problem)
    # No problem reported, so the validated spec is a real dict.
    commit_spec = cast("dict[str, object]", validated)

    paths = cast("list[str]", commit_spec["files"])
    subject = cast("str", commit_spec["message"])
    details = cast("str | None", commit_spec.get("body"))

    repo_root = find_git_root(working_dir) or working_dir

    # Author string is only built when an identity was resolved.
    identity = resolve_identity(repo_root, profile_override=profile)
    author = None
    if identity:
        author = f"{identity.name} <{identity.email}>"

    _format_spec_files(paths, repo_root, working_dir=working_dir)

    notes: list[str] = []
    staging_problem = stage_spec_files(
        paths, repo_root, working_dir=working_dir, warnings=notes
    )
    if staging_problem:
        return HookResult.fail(staging_problem)

    # An empty index means there is nothing to record.
    staged = run_git(["diff", "--cached", "--name-only"], repo_root)
    if not staged.stdout.strip():
        return HookResult.ok(skipped=True, reason="nothing to commit")

    command = build_commit_cmd(
        subject, details, skip_hooks=skip_hooks, author=author
    )

    attempt: _GitResultLike = run_git(command, repo_root)
    if attempt.returncode != 0:
        # One retry via the auto-fix helper, using the failed attempt as input.
        attempt = retry_commit_on_autofix(
            paths, command, repo_root, attempt, working_dir=working_dir
        )
    if attempt.returncode != 0:
        return HookResult.fail(f"git commit failed: {attempt.stderr}")

    return build_commit_result(repo_root, subject, identity, notes)
execute(context, **params)

Execute the hook action.

Parameters:

Name Type Description Default
context dict[str, object]

Session context dictionary.

required
**params object

Optional message_format, from_outputs, working_dir, skip_hooks (default False for from_outputs mode — pass True to append --no-verify and bypass project pre-commit hooks).

{}

Returns:

Type Description
HookResult

HookResult with commit hash and message in metadata.

Source code in packages/axm-git/src/axm_git/hooks/commit_phase.py
Python
def execute(self, context: dict[str, object], **params: object) -> HookResult:
    """Execute the hook action.

    Args:
        context: Session context dictionary.
        **params: Optional ``enabled``, ``profile``, ``message_format``,
            ``from_outputs``, ``working_dir``, ``skip_hooks`` (default
            ``False`` for ``from_outputs`` mode — pass ``True`` to
            append ``--no-verify`` and bypass project pre-commit hooks).

    Returns:
        HookResult with ``commit`` hash and ``message`` in metadata.
    """
    # The param wins over the context value; "." is the last resort.
    wd_raw = params.get("working_dir", context.get("working_dir", "."))
    working_dir = Path(cast("str | Path", wd_raw))

    if not params.get("enabled", True):
        return HookResult.ok(skipped=True, reason="git disabled")

    if find_git_root(working_dir) is None:
        return HookResult.ok(skipped=True, reason="not a git repo")

    # ``profile`` is passed on explicitly, so drop it from the kwargs.
    profile = cast("str | None", params.pop("profile", None))

    if params.get("from_outputs"):
        skip_hooks = bool(params.get("skip_hooks", False))
        return self.commit_from_outputs(
            context, working_dir, skip_hooks=skip_hooks, profile=profile
        )

    # ``working_dir`` is already bound positionally below; forwarding it
    # again through ``**params`` would raise ``TypeError`` (multiple values
    # for argument 'working_dir'), so strip it first.
    legacy_params = {
        key: value for key, value in params.items() if key != "working_dir"
    }
    return self._commit_legacy(
        context, working_dir, profile=profile, **legacy_params
    )

CreateBranchHook dataclass

Create a session branch.

Branch name priority:

  1. branch param (direct override)
  2. ticket_id + ticket_title params → branch_name_from_ticket()
  3. {prefix}/{session_id} (legacy fallback)

Skips gracefully when the working directory is not a git repository.

Source code in packages/axm-git/src/axm_git/hooks/create_branch.py
Python
@dataclass
class CreateBranchHook:
    """Create and check out a session branch.

    The branch name is chosen in this order:

    1. ``branch`` param (direct override)
    2. ``ticket_id`` + ``ticket_title`` params → ``branch_name_from_ticket()``
    3. ``{prefix}/{session_id}`` (legacy fallback)

    A skipped result is returned when the working directory is not
    inside a git repository.
    """

    def execute(self, context: dict[str, object], **params: object) -> HookResult:
        """Run ``git checkout -b`` for the resolved branch name.

        Args:
            context: Session context dictionary (must contain ``session_id``).
            **params: Optional ``branch``, ``ticket_id``, ``ticket_title``,
                ``ticket_labels``, ``prefix`` (default ``"axm"``).

        Returns:
            HookResult with ``branch`` in metadata on success.
        """
        working_dir = resolve_working_dir(params, context)
        session_id = cast("str", context["session_id"])

        enabled = params.get("enabled", True)
        if not enabled:
            return HookResult.ok(skipped=True, reason="git disabled")

        repo_root = find_git_root(working_dir)
        if repo_root is None:
            return HookResult.ok(skipped=True, reason="not a git repo")

        new_branch = self._resolve_branch(params, session_id)

        checkout = run_git(["checkout", "-b", new_branch], repo_root)
        if checkout.returncode == 0:
            return HookResult.ok(branch=new_branch)

        return HookResult.fail(f"git checkout -b failed: {checkout.stderr}")

    @staticmethod
    def _resolve_branch(params: dict[str, object], session_id: str) -> str:
        """Pick the branch name: explicit param, ticket-derived, or fallback."""
        explicit = params.get("branch")
        if explicit:
            return str(explicit)

        ticket_id = params.get("ticket_id")
        ticket_title = params.get("ticket_title")
        if not (ticket_id and ticket_title):
            # Without both ticket fields, use the legacy prefix/session name.
            prefix = params.get("prefix", "axm")
            return f"{prefix}/{session_id}"

        labels = cast("list[str]", params.get("ticket_labels", []))
        return branch_name_from_ticket(
            cast("str", ticket_id),
            cast("str", ticket_title),
            labels,
        )
execute(context, **params)

Execute the hook action.

Parameters:

Name Type Description Default
context dict[str, object]

Session context dictionary (must contain session_id).

required
**params object

Optional branch, ticket_id, ticket_title, ticket_labels, prefix (default "axm").

{}

Returns:

Type Description
HookResult

HookResult with branch in metadata on success.

Source code in packages/axm-git/src/axm_git/hooks/create_branch.py
Python
def execute(self, context: dict[str, object], **params: object) -> HookResult:
    """Run ``git checkout -b`` for the resolved branch name.

    Args:
        context: Session context dictionary (must contain ``session_id``).
        **params: Optional ``branch``, ``ticket_id``, ``ticket_title``,
            ``ticket_labels``, ``prefix`` (default ``"axm"``).

    Returns:
        HookResult with ``branch`` in metadata on success.
    """
    working_dir = resolve_working_dir(params, context)
    session_id = cast("str", context["session_id"])

    enabled = params.get("enabled", True)
    if not enabled:
        return HookResult.ok(skipped=True, reason="git disabled")

    repo_root = find_git_root(working_dir)
    if repo_root is None:
        return HookResult.ok(skipped=True, reason="not a git repo")

    new_branch = self._resolve_branch(params, session_id)

    checkout = run_git(["checkout", "-b", new_branch], repo_root)
    if checkout.returncode == 0:
        return HookResult.ok(branch=new_branch)

    return HookResult.fail(f"git checkout -b failed: {checkout.stderr}")

CreatePRHook dataclass

Create a GitHub PR with auto-merge squash.

Reads branch, commit_spec, and ticket_id from context. Runs gh pr create followed by gh pr merge --auto --squash. Skips gracefully when gh is not installed.

Source code in packages/axm-git/src/axm_git/hooks/create_pr.py
Python
@dataclass
class CreatePRHook:
    """Create a GitHub PR with auto-merge squash.

    Reads ``commit_spec`` and ``ticket_id`` from *params* or *context*.
    Runs ``gh pr create`` followed by ``gh pr merge --auto --squash``.
    Skips gracefully when ``gh`` is not installed.
    """

    def execute(self, context: dict[str, object], **params: object) -> HookResult:
        """Execute the hook action.

        Args:
            context: Session context dictionary.
            **params: Optional ``enabled`` (default ``True``),
                ``base`` (default ``"main"``), ``commit_spec``,
                ``ticket_id``.  Params take precedence over context.

        Returns:
            HookResult with ``pr_url`` and ``pr_number`` in metadata,
            plus ``auto_merge`` (and ``auto_merge_error`` when enabling
            auto-merge failed).
        """
        if not params.get("enabled", True):
            return HookResult.ok(skipped=True, reason="git disabled")

        if not gh_available():
            return HookResult.ok(skipped=True, reason="gh not available")

        working_dir = resolve_working_dir(params, context)

        commit_spec = cast(
            "dict[str, object]",
            params.get("commit_spec", context.get("commit_spec", {})),
        )
        ticket_id = cast("str", params.get("ticket_id", context.get("ticket_id", "")))
        base = cast("str", params.get("base", "main"))

        title = format_pr_title(commit_spec, ticket_id)
        # ``body`` is optional in a commit spec and may be present as None;
        # coerce to "" so a None never ends up in the gh argument list.
        body = cast("str", commit_spec.get("body") or "")

        # Create the PR
        create_args = [
            "pr",
            "create",
            "--title",
            title,
            "--body",
            body,
            "--base",
            base,
        ]
        result = run_gh(create_args, working_dir)

        if result.returncode != 0:
            stderr = result.stderr.strip()
            # An existing PR is handed to the recovery helper instead of failing.
            if "already exists" in stderr:
                return _recover_existing_pr(working_dir)
            return HookResult.fail(f"gh pr create failed: {stderr}")

        pr_url = result.stdout.strip()

        # Extract PR number from URL
        pr_number = pr_url.rstrip("/").rsplit("/", maxsplit=1)[-1]

        # Enable auto-merge
        merge_result = run_gh(
            ["pr", "merge", pr_number, "--auto", "--squash"],
            working_dir,
        )
        if merge_result.returncode != 0:
            # Non-fatal: PR created but auto-merge not enabled
            return HookResult.ok(
                pr_url=pr_url,
                pr_number=pr_number,
                auto_merge=False,
                auto_merge_error=merge_result.stderr.strip(),
            )

        return HookResult.ok(
            pr_url=pr_url,
            pr_number=pr_number,
            auto_merge=True,
        )
execute(context, **params)

Execute the hook action.

Parameters:

Name Type Description Default
context dict[str, object]

Session context dictionary.

required
**params object

Optional enabled (default True), base (default "main"), commit_spec, ticket_id. Params take precedence over context.

{}

Returns:

Type Description
HookResult

HookResult with pr_url and pr_number in metadata.

Source code in packages/axm-git/src/axm_git/hooks/create_pr.py
Python
def execute(self, context: dict[str, object], **params: object) -> HookResult:
    """Execute the hook action.

    Args:
        context: Session context dictionary.
        **params: Optional ``enabled`` (default ``True``),
            ``base`` (default ``"main"``), ``commit_spec``,
            ``ticket_id``.  Params take precedence over context.

    Returns:
        HookResult with ``pr_url`` and ``pr_number`` in metadata, plus
        ``auto_merge`` (and ``auto_merge_error`` when enabling
        auto-merge failed).
    """
    if not params.get("enabled", True):
        return HookResult.ok(skipped=True, reason="git disabled")

    if not gh_available():
        return HookResult.ok(skipped=True, reason="gh not available")

    working_dir = resolve_working_dir(params, context)

    commit_spec = cast(
        "dict[str, object]",
        params.get("commit_spec", context.get("commit_spec", {})),
    )
    ticket_id = cast("str", params.get("ticket_id", context.get("ticket_id", "")))
    base = cast("str", params.get("base", "main"))

    title = format_pr_title(commit_spec, ticket_id)
    # ``body`` is optional in a commit spec and may be present as None;
    # coerce to "" so a None never ends up in the gh argument list.
    body = cast("str", commit_spec.get("body") or "")

    # Create the PR
    create_args = [
        "pr",
        "create",
        "--title",
        title,
        "--body",
        body,
        "--base",
        base,
    ]
    result = run_gh(create_args, working_dir)

    if result.returncode != 0:
        stderr = result.stderr.strip()
        # An existing PR is handed to the recovery helper instead of failing.
        if "already exists" in stderr:
            return _recover_existing_pr(working_dir)
        return HookResult.fail(f"gh pr create failed: {stderr}")

    pr_url = result.stdout.strip()

    # Extract PR number from URL
    pr_number = pr_url.rstrip("/").rsplit("/", maxsplit=1)[-1]

    # Enable auto-merge
    merge_result = run_gh(
        ["pr", "merge", pr_number, "--auto", "--squash"],
        working_dir,
    )
    if merge_result.returncode != 0:
        # Non-fatal: PR created but auto-merge not enabled
        return HookResult.ok(
            pr_url=pr_url,
            pr_number=pr_number,
            auto_merge=False,
            auto_merge_error=merge_result.stderr.strip(),
        )

    return HookResult.ok(
        pr_url=pr_url,
        pr_number=pr_number,
        auto_merge=True,
    )

MergeSquashHook dataclass

Merge session branch back to main with squash.

Branch name priority: branch param > context["branch"] > {prefix}/{session_id} fallback.

Commit message priority: message param > [AXM] {protocol_name}: {session_id} fallback.

Source code in packages/axm-git/src/axm_git/hooks/merge_squash.py
Python
@dataclass
class MergeSquashHook:
    """Squash-merge the session branch into the target branch.

    Branch name priority, highest first: ``branch`` param,
    ``context["branch"]``, then the ``{prefix}/{session_id}`` fallback.

    Commit message priority, highest first: ``message`` param, then
    the ``[AXM] {protocol_name}: {session_id}`` fallback.
    """

    def execute(self, context: dict[str, object], **params: object) -> HookResult:
        """Check out the target, squash-merge the branch, and commit.

        Args:
            context: Session context dictionary (must contain
                ``session_id`` and ``protocol_name``).
            **params: Optional ``branch``, ``message``, ``prefix``,
                and ``target_branch``.

        Returns:
            HookResult with ``merged``, ``into``, and ``message`` in metadata.
        """
        working_dir = resolve_working_dir(params, context)
        session_id = cast("str", context["session_id"])
        protocol_name = cast("str", context["protocol_name"])

        enabled = params.get("enabled", True)
        if not enabled:
            return HookResult.ok(skipped=True, reason="git disabled")

        repo_root = find_git_root(working_dir)
        if repo_root is None:
            return HookResult.ok(skipped=True, reason="not a git repo")

        source = self._resolve_branch(params, context, session_id)
        target = cast("str", params.get("target_branch", "main"))

        # Step 1: switch to the branch that receives the changes.
        checkout = run_git(["checkout", target], repo_root)
        if checkout.returncode != 0:
            return HookResult.fail(f"checkout {target} failed: {checkout.stderr}")

        # Step 2: stage the squashed changes from the session branch.
        squash = run_git(["merge", "--squash", source], repo_root)
        if squash.returncode != 0:
            return HookResult.fail(f"merge --squash failed: {squash.stderr}")

        # Step 3: record them as one commit; a falsy message uses the default.
        default_msg = f"[AXM] {protocol_name}: {session_id}"
        msg = cast("str", params.get("message") or default_msg)
        commit = run_git(["commit", "-m", msg], repo_root)
        if commit.returncode != 0:
            return HookResult.fail(f"commit failed: {commit.stderr}")

        return HookResult.ok(merged=source, into=target, message=msg)

    @staticmethod
    def _resolve_branch(
        params: dict[str, object],
        context: dict[str, object],
        session_id: str,
    ) -> str:
        """Return the first truthy ``branch`` from params, then context."""
        for mapping in (params, context):
            if candidate := mapping.get("branch"):
                return str(candidate)
        # Neither mapping named a branch: use the prefix/session fallback.
        prefix = params.get("prefix", "axm")
        return f"{prefix}/{session_id}"
execute(context, **params)

Execute the hook action.

Parameters:

Name Type Description Default
context dict[str, object]

Session context dictionary (must contain session_id and protocol_name).

required
**params object

Optional branch, message, prefix, and target_branch.

{}

Returns:

Type Description
HookResult

HookResult with merged, into, and message in metadata.

Source code in packages/axm-git/src/axm_git/hooks/merge_squash.py
Python
def execute(self, context: dict[str, object], **params: object) -> HookResult:
    """Squash-merge the session branch into the target branch and commit.

    Args:
        context: Session context dictionary (must contain
            ``session_id`` and ``protocol_name`` when the hook is enabled).
        **params: Optional ``enabled`` (default ``True``), ``branch``,
            ``message``, ``prefix``, and ``target_branch``
            (default ``"main"``).

    Returns:
        HookResult with ``merged``, ``into``, and ``message`` in metadata.
        A skipped result is returned when the hook is disabled or the
        working directory is not inside a git repository; a failed result
        when ``checkout``, ``merge --squash``, or ``commit`` exits non-zero.
    """
    # Check the kill switch before reading context: a disabled hook must
    # skip cleanly even when session_id / protocol_name are absent.
    if not params.get("enabled", True):
        return HookResult.ok(skipped=True, reason="git disabled")

    working_dir = resolve_working_dir(params, context)
    session_id = cast("str", context["session_id"])
    protocol_name = cast("str", context["protocol_name"])

    git_root = find_git_root(working_dir)
    if git_root is None:
        return HookResult.ok(skipped=True, reason="not a git repo")

    branch = self._resolve_branch(params, context, session_id)
    target = cast("str", params.get("target_branch", "main"))

    # Checkout target branch
    result = run_git(["checkout", target], git_root)
    if result.returncode != 0:
        return HookResult.fail(f"checkout {target} failed: {result.stderr}")

    # Merge with squash (stages the changes; the commit happens below)
    result = run_git(["merge", "--squash", branch], git_root)
    if result.returncode != 0:
        return HookResult.fail(f"merge --squash failed: {result.stderr}")

    # Commit, using the caller-supplied message when it is non-empty
    msg = cast(
        "str",
        params.get("message") or f"[AXM] {protocol_name}: {session_id}",
    )
    result = run_git(["commit", "-m", msg], git_root)
    if result.returncode != 0:
        return HookResult.fail(f"commit failed: {result.stderr}")

    return HookResult.ok(merged=branch, into=target, message=msg)

PreflightHook dataclass

Report working-tree status and diff as a pre-hook.

Designed for injection into protocol briefings via inject_result + inline: true.

params: path — project root (default "."). diff_lines — max diff lines (default 200, 0 to disable).

Source code in packages/axm-git/src/axm_git/hooks/preflight.py
Python
@dataclass
class PreflightHook:
    """Pre-hook that reports working-tree status and diff.

    Meant to be injected into protocol briefings via
    ``inject_result`` + ``inline: true``.

    *params*:
        ``path`` — project root (default ``"."``).
        ``diff_lines`` — max diff lines (default 200, 0 to disable).
    """

    def execute(self, context: dict[str, object], **params: object) -> HookResult:
        """Collect git status and diff and render them as text.

        Args:
            context: Session context dictionary.
            **params: Optional ``enabled`` (default ``True``), ``path``
                and ``diff_lines``.

        Returns:
            HookResult whose ``text`` comes from ``render_text`` and whose
            metadata holds ``files``, ``diff``, ``file_count``, and
            ``clean``.  A skipped result is returned when the hook is
            disabled or the directory is not inside a git repository.
        """
        enabled = params.get("enabled", True)
        if not enabled:
            return HookResult.ok(skipped=True, reason="git disabled")

        project_dir = resolve_working_dir(params, context, param_key="path").resolve()
        diff_limit = int(cast("int | str", params.get("diff_lines", 200)))

        repo_root = find_git_root(project_dir)
        if repo_root is None:
            return HookResult.ok(skipped=True, reason="not a git repo")

        # Restrict every git query below to the requested directory.
        scope = _compute_pathspec(project_dir, repo_root)

        porcelain = run_git(["status", "--porcelain", *scope], repo_root)
        if porcelain.returncode != 0:
            return HookResult.fail(f"git status failed: {porcelain.stderr}")

        changed = _collect_status_files(porcelain.stdout)
        stat_summary = _collect_diff_stat(repo_root, scope)
        diff_body, was_truncated = _collect_diff(repo_root, scope, diff_limit)

        briefing = render_text(
            files=changed,
            diff_stat=stat_summary,
            diff=diff_body,
            diff_truncated=was_truncated,
            max_diff_lines=diff_limit,
        )

        file_count = len(changed)
        details = {
            "files": changed,
            "diff": diff_body,
            "file_count": file_count,
            "clean": file_count == 0,
        }
        return HookResult(success=True, text=briefing, metadata=details)
execute(context, **params)

Execute the hook action.

Parameters:

Name Type Description Default
context dict[str, object]

Session context dictionary.

required
**params object

Optional path and diff_lines.

{}

Returns:

Type Description
HookResult

HookResult with a compact text render (via render_text) and metadata containing files, diff, file_count, and clean.

Source code in packages/axm-git/src/axm_git/hooks/preflight.py
Python
def execute(self, context: dict[str, object], **params: object) -> HookResult:
    """Collect git status and diff and render them as text.

    Args:
        context: Session context dictionary.
        **params: Optional ``enabled`` (default ``True``), ``path``
            and ``diff_lines``.

    Returns:
        HookResult whose ``text`` comes from ``render_text`` and whose
        metadata holds ``files``, ``diff``, ``file_count``, and
        ``clean``.  A skipped result is returned when the hook is
        disabled or the directory is not inside a git repository.
    """
    enabled = params.get("enabled", True)
    if not enabled:
        return HookResult.ok(skipped=True, reason="git disabled")

    project_dir = resolve_working_dir(params, context, param_key="path").resolve()
    diff_limit = int(cast("int | str", params.get("diff_lines", 200)))

    repo_root = find_git_root(project_dir)
    if repo_root is None:
        return HookResult.ok(skipped=True, reason="not a git repo")

    # Restrict every git query below to the requested directory.
    scope = _compute_pathspec(project_dir, repo_root)

    porcelain = run_git(["status", "--porcelain", *scope], repo_root)
    if porcelain.returncode != 0:
        return HookResult.fail(f"git status failed: {porcelain.stderr}")

    changed = _collect_status_files(porcelain.stdout)
    stat_summary = _collect_diff_stat(repo_root, scope)
    diff_body, was_truncated = _collect_diff(repo_root, scope, diff_limit)

    briefing = render_text(
        files=changed,
        diff_stat=stat_summary,
        diff=diff_body,
        diff_truncated=was_truncated,
        max_diff_lines=diff_limit,
    )

    file_count = len(changed)
    details = {
        "files": changed,
        "diff": diff_body,
        "file_count": file_count,
        "clean": file_count == 0,
    }
    return HookResult(success=True, text=briefing, metadata=details)

PushHook dataclass

Push the current branch to origin and set it as the upstream (git push -u origin <branch>).

Reads branch from context (or detects it from HEAD). Skips gracefully when the working directory is not a git repository.

Source code in packages/axm-git/src/axm_git/hooks/push.py
Python
@dataclass
class PushHook:
    """Push a branch to ``origin`` with ``-u``.

    The branch name comes from ``branch`` in *context*; when that is
    missing or empty it is detected from HEAD.  Skips gracefully when
    the working directory is not a git repository.
    """

    def execute(self, context: dict[str, object], **params: object) -> HookResult:
        """Run ``git push -u origin <branch>``.

        Args:
            context: Session context dictionary.
            **params: Optional ``enabled`` (default ``True``).

        Returns:
            HookResult with ``pushed`` and ``branch`` in metadata, a
            skipped result when disabled or outside a git repository,
            or a failed result when branch detection or the push fails.
        """
        enabled = params.get("enabled", True)
        if not enabled:
            return HookResult.ok(skipped=True, reason="git disabled")

        repo_root = find_git_root(resolve_working_dir(params, context))
        if repo_root is None:
            return HookResult.ok(skipped=True, reason="not a git repo")

        branch = cast("str", context.get("branch") or "")
        if not branch:
            # No branch in context: ask git which ref HEAD points at.
            probe = run_git(["rev-parse", "--abbrev-ref", "HEAD"], repo_root)
            if probe.returncode != 0:
                return HookResult.fail(f"failed to detect branch: {probe.stderr}")
            branch = probe.stdout.strip()

        push = run_git(["push", "-u", "origin", branch], repo_root)
        if push.returncode == 0:
            return HookResult.ok(pushed=True, branch=branch)

        # Non-zero exit: still a success if git reported nothing to push.
        error_text = push.stderr.strip()
        combined_output = push.stderr + push.stdout
        if "Everything up-to-date" in combined_output:
            return HookResult.ok(pushed=True, branch=branch)
        return HookResult.fail(f"git push failed: {error_text}")
execute(context, **params)

Execute the hook action.

Parameters:

Name Type Description Default
context dict[str, object]

Session context dictionary.

required
**params object

Optional enabled (default True).

{}

Returns:

Type Description
HookResult

HookResult with pushed and branch in metadata.

Source code in packages/axm-git/src/axm_git/hooks/push.py
Python
def execute(self, context: dict[str, object], **params: object) -> HookResult:
    """Run ``git push -u origin <branch>``.

    Args:
        context: Session context dictionary.
        **params: Optional ``enabled`` (default ``True``).

    Returns:
        HookResult with ``pushed`` and ``branch`` in metadata, a
        skipped result when disabled or outside a git repository,
        or a failed result when branch detection or the push fails.
    """
    enabled = params.get("enabled", True)
    if not enabled:
        return HookResult.ok(skipped=True, reason="git disabled")

    repo_root = find_git_root(resolve_working_dir(params, context))
    if repo_root is None:
        return HookResult.ok(skipped=True, reason="not a git repo")

    branch = cast("str", context.get("branch") or "")
    if not branch:
        # No branch in context: ask git which ref HEAD points at.
        probe = run_git(["rev-parse", "--abbrev-ref", "HEAD"], repo_root)
        if probe.returncode != 0:
            return HookResult.fail(f"failed to detect branch: {probe.stderr}")
        branch = probe.stdout.strip()

    push = run_git(["push", "-u", "origin", branch], repo_root)
    if push.returncode == 0:
        return HookResult.ok(pushed=True, branch=branch)

    # Non-zero exit: still a success if git reported nothing to push.
    error_text = push.stderr.strip()
    combined_output = push.stderr + push.stdout
    if "Everything up-to-date" in combined_output:
        return HookResult.ok(pushed=True, branch=branch)
    return HookResult.fail(f"git push failed: {error_text}")

WorktreeAddHook dataclass

Create a worktree + branch for a ticket.

Reads ticket_id, ticket_title, ticket_labels, and repo_path from context. The worktree is placed under /tmp/axm-worktrees/<ticket_id>/.

Skips gracefully when the working directory is not a git repository or the worktree already exists.

Source code in packages/axm-git/src/axm_git/hooks/worktree_add.py
Python
@dataclass
class WorktreeAddHook:
    """Create a git worktree and a new branch for a ticket.

    Uses ``ticket_id``, ``ticket_title``, ``ticket_labels``, and
    ``repo_path`` (params first, then *context*).  The worktree lives at
    ``/tmp/axm-worktrees/<ticket_id>/``.

    Skips gracefully when the working directory is not a git repository
    or the worktree already exists.
    """

    def execute(self, context: dict[str, object], **params: object) -> HookResult:
        """Run ``git worktree add -b <branch> <path> main``.

        Args:
            context: Session context dictionary.
            **params: Optional ``repo_path``, ``ticket_id``,
                ``ticket_title``, ``ticket_labels``, ``enabled``
                (default ``True``).  Params take precedence over context.

        Returns:
            HookResult with ``worktree_path`` and ``branch`` in metadata.

        Raises:
            KeyError: If ``ticket_id`` or ``ticket_title`` is found in
                neither *params* nor *context*.
        """
        enabled = params.get("enabled", True)
        if not enabled:
            return HookResult.ok(skipped=True, reason="git disabled")

        context_repo = context.get("repo_path", ".")
        repo_path = Path(cast("str | Path", params.get("repo_path", context_repo)))

        if find_git_root(repo_path) is None:
            return HookResult.ok(skipped=True, reason="not a git repo")

        # Required ticket fields: a key present in params overrides context.
        if "ticket_id" in params:
            ticket_id = cast("str", params["ticket_id"])
        else:
            ticket_id = cast("str", context["ticket_id"])

        if "ticket_title" in params:
            ticket_title = cast("str", params["ticket_title"])
        else:
            ticket_title = cast("str", context["ticket_title"])

        context_labels = context.get("ticket_labels", [])
        ticket_labels = cast(
            "list[str]", params.get("ticket_labels", context_labels)
        )

        branch = branch_name_from_ticket(ticket_id, ticket_title, ticket_labels)
        worktree_path = Path("/tmp/axm-worktrees") / ticket_id  # noqa: S108
        worktree_path.parent.mkdir(parents=True, exist_ok=True)

        if worktree_path.exists():
            already_there = f"worktree already exists: {worktree_path}"
            return HookResult.ok(skipped=True, reason=already_there)

        git_args = ["worktree", "add", "-b", branch, str(worktree_path), "main"]
        outcome = run_git(git_args, repo_path)
        if outcome.returncode != 0:
            return HookResult.fail(f"git worktree add failed: {outcome.stderr}")

        return HookResult.ok(worktree_path=str(worktree_path), branch=branch)
execute(context, **params)

Execute the hook action.

Parameters:

Name Type Description Default
context dict[str, object]

Session context dictionary.

required
**params object

Optional repo_path, ticket_id, ticket_title, ticket_labels, enabled (default True). Params take precedence over context.

{}

Returns:

Type Description
HookResult

HookResult with worktree_path and branch in metadata.

Source code in packages/axm-git/src/axm_git/hooks/worktree_add.py
Python
def execute(self, context: dict[str, object], **params: object) -> HookResult:
    """Run ``git worktree add -b <branch> <path> main``.

    Args:
        context: Session context dictionary.
        **params: Optional ``repo_path``, ``ticket_id``,
            ``ticket_title``, ``ticket_labels``, ``enabled``
            (default ``True``).  Params take precedence over context.

    Returns:
        HookResult with ``worktree_path`` and ``branch`` in metadata.

    Raises:
        KeyError: If ``ticket_id`` or ``ticket_title`` is found in
            neither *params* nor *context*.
    """
    enabled = params.get("enabled", True)
    if not enabled:
        return HookResult.ok(skipped=True, reason="git disabled")

    context_repo = context.get("repo_path", ".")
    repo_path = Path(cast("str | Path", params.get("repo_path", context_repo)))

    if find_git_root(repo_path) is None:
        return HookResult.ok(skipped=True, reason="not a git repo")

    # Required ticket fields: a key present in params overrides context.
    if "ticket_id" in params:
        ticket_id = cast("str", params["ticket_id"])
    else:
        ticket_id = cast("str", context["ticket_id"])

    if "ticket_title" in params:
        ticket_title = cast("str", params["ticket_title"])
    else:
        ticket_title = cast("str", context["ticket_title"])

    context_labels = context.get("ticket_labels", [])
    ticket_labels = cast(
        "list[str]", params.get("ticket_labels", context_labels)
    )

    branch = branch_name_from_ticket(ticket_id, ticket_title, ticket_labels)
    worktree_path = Path("/tmp/axm-worktrees") / ticket_id  # noqa: S108
    worktree_path.parent.mkdir(parents=True, exist_ok=True)

    if worktree_path.exists():
        already_there = f"worktree already exists: {worktree_path}"
        return HookResult.ok(skipped=True, reason=already_there)

    git_args = ["worktree", "add", "-b", branch, str(worktree_path), "main"]
    outcome = run_git(git_args, repo_path)
    if outcome.returncode != 0:
        return HookResult.fail(f"git worktree add failed: {outcome.stderr}")

    return HookResult.ok(worktree_path=str(worktree_path), branch=branch)

WorktreeRemoveHook dataclass

Remove a worktree after merge.

Reads worktree_path and repo_path from context. Uses git worktree remove --force to handle dirty worktrees. Skips gracefully when the path doesn't exist or isn't a git repo.

Source code in packages/axm-git/src/axm_git/hooks/worktree_remove.py
Python
@dataclass
class WorktreeRemoveHook:
    """Remove a worktree after merge.

    Takes ``repo_path`` from *context* and resolves the worktree
    directory from *context* through ``resolve_working_dir``.
    Runs ``git worktree remove --force`` so dirty worktrees are handled.
    Skips gracefully when the path doesn't exist or isn't a git repo.
    """

    def execute(self, context: dict[str, object], **params: object) -> HookResult:
        """Run ``git worktree remove <path> --force``.

        Args:
            context: Session context (must contain ``repo_path``
                and ``worktree_path``).
            **params: Optional ``enabled`` (default ``True``).

        Returns:
            HookResult on success, a skipped result when disabled, outside
            a git repository, or when the worktree path is missing, and a
            failed result when git exits non-zero.
        """
        enabled = params.get("enabled", True)
        if not enabled:
            return HookResult.ok(skipped=True, reason="git disabled")

        raw_repo = context.get("repo_path", ".")
        repo_path = Path(cast("str | Path", raw_repo))

        if find_git_root(repo_path) is None:
            return HookResult.ok(skipped=True, reason="not a git repo")

        # Params are deliberately not consulted here: only context is used.
        worktree_path = resolve_working_dir({}, context)

        if not worktree_path.exists():
            missing = f"worktree path does not exist: {worktree_path}"
            return HookResult.ok(skipped=True, reason=missing)

        git_args = ["worktree", "remove", str(worktree_path), "--force"]
        outcome = run_git(git_args, repo_path)
        if outcome.returncode == 0:
            return HookResult.ok()

        return HookResult.fail(f"git worktree remove failed: {outcome.stderr}")
execute(context, **params)

Execute the hook action.

Parameters:

Name Type Description Default
context dict[str, object]

Session context (must contain repo_path and worktree_path).

required
**params object

Optional enabled (default True).

{}

Returns:

Type Description
HookResult

HookResult on success.

Source code in packages/axm-git/src/axm_git/hooks/worktree_remove.py
Python
def execute(self, context: dict[str, object], **params: object) -> HookResult:
    """Run ``git worktree remove <path> --force``.

    Args:
        context: Session context (must contain ``repo_path``
            and ``worktree_path``).
        **params: Optional ``enabled`` (default ``True``).

    Returns:
        HookResult on success, a skipped result when disabled, outside
        a git repository, or when the worktree path is missing, and a
        failed result when git exits non-zero.
    """
    enabled = params.get("enabled", True)
    if not enabled:
        return HookResult.ok(skipped=True, reason="git disabled")

    raw_repo = context.get("repo_path", ".")
    repo_path = Path(cast("str | Path", raw_repo))

    if find_git_root(repo_path) is None:
        return HookResult.ok(skipped=True, reason="not a git repo")

    # Params are deliberately not consulted here: only context is used.
    worktree_path = resolve_working_dir({}, context)

    if not worktree_path.exists():
        missing = f"worktree path does not exist: {worktree_path}"
        return HookResult.ok(skipped=True, reason=missing)

    git_args = ["worktree", "remove", str(worktree_path), "--force"]
    outcome = run_git(git_args, repo_path)
    if outcome.returncode == 0:
        return HookResult.ok()

    return HookResult.fail(f"git worktree remove failed: {outcome.stderr}")