Poll a PR until merged or timeout.
Reads `pr_number` (or `pr_url`) from params or, failing that, from context and polls
`gh pr view --json state` every 30 seconds. Times out after
10 minutes by default.
Source code in packages/axm-git/src/axm_git/hooks/await_merge.py
@dataclass
class AwaitMergeHook:
    """Poll a PR until merged or timeout.

    Reads ``pr_number`` (or ``pr_url``) from *params* or *context* and polls
    ``gh pr view --json state`` every 30 seconds. Times out after
    10 minutes by default.
    """

    def execute(self, context: dict[str, Any], **params: Any) -> HookResult:
        """Wait for the referenced PR to reach a terminal state.

        Args:
            context: Session context dictionary; consulted for
                ``pr_number`` or ``pr_url`` when *params* supplies neither.
            **params: Optional ``enabled``, ``pr_number``, ``pr_url``,
                ``timeout`` (seconds), ``interval`` (seconds). A PR
                reference in *params* takes precedence over *context*.
                An ``interval`` below 1 is raised to 1 second.

        Returns:
            HookResult with ``merged=True`` on success. A skipped (ok)
            result when the hook is disabled or ``gh`` is unavailable.
            A failure when no PR reference is found, the state query
            returns ``None``, the PR is closed, or the timeout elapses.
        """
        if not params.get("enabled", True):
            return HookResult.ok(skipped=True, reason="git disabled")
        if not gh_available():
            return HookResult.ok(skipped=True, reason="gh not available")

        working_dir = _resolve_working_dir(params, context)

        # First truthy value wins: explicit params override session context.
        pr_ref = (
            params.get("pr_number")
            or params.get("pr_url")
            or context.get("pr_number")
            or context.get("pr_url")
        )
        if not pr_ref:
            return HookResult.fail("no pr_number or pr_url in params or context")

        timeout = int(params.get("timeout", _DEFAULT_TIMEOUT))
        # A zero interval would never advance ``elapsed`` (endless polling)
        # and a negative one makes ``time.sleep`` raise ValueError, so the
        # interval is clamped to at least one second.
        interval = max(1, int(params.get("interval", _DEFAULT_INTERVAL)))

        # ``elapsed`` counts only the time spent sleeping between polls; the
        # duration of each state query is not included.
        elapsed = 0
        while elapsed < timeout:
            state = _poll_pr_state(str(pr_ref), working_dir)
            if state is None:
                return HookResult.fail(f"failed to query PR {pr_ref} state")
            if state == "MERGED":
                return HookResult.ok(merged=True, pr_ref=str(pr_ref))
            if state == "CLOSED":
                return HookResult.fail(f"PR {pr_ref} was closed without merging")

            # Never sleep past the deadline when ``timeout`` is not a
            # multiple of ``interval``.
            pause = min(interval, timeout - elapsed)
            time.sleep(pause)
            elapsed += pause

        return HookResult.fail(f"PR {pr_ref} not merged after {timeout}s timeout")
|
execute(context, **params)
Execute the hook action.
Parameters:
| Name |
Type |
Description |
Default |
context
|
dict[str, Any]
|
Session context dictionary (must contain
pr_number or pr_url).
|
required
|
**params
|
Any
|
Optional enabled, timeout (seconds),
interval (seconds).
|
{}
|
Returns:
| Type |
Description |
HookResult
|
HookResult with merged=True on success.
|
Source code in packages/axm-git/src/axm_git/hooks/await_merge.py
def execute(self, context: dict[str, Any], **params: Any) -> HookResult:
    """Wait for the referenced PR to reach a terminal state.

    Args:
        context: Session context dictionary; consulted for
            ``pr_number`` or ``pr_url`` when *params* supplies neither.
        **params: Optional ``enabled``, ``pr_number``, ``pr_url``,
            ``timeout`` (seconds), ``interval`` (seconds). A PR
            reference in *params* takes precedence over *context*.
            An ``interval`` below 1 is raised to 1 second.

    Returns:
        HookResult with ``merged=True`` on success. A skipped (ok)
        result when the hook is disabled or ``gh`` is unavailable.
        A failure when no PR reference is found, the state query
        returns ``None``, the PR is closed, or the timeout elapses.
    """
    if not params.get("enabled", True):
        return HookResult.ok(skipped=True, reason="git disabled")
    if not gh_available():
        return HookResult.ok(skipped=True, reason="gh not available")

    working_dir = _resolve_working_dir(params, context)

    # First truthy value wins: explicit params override session context.
    pr_ref = (
        params.get("pr_number")
        or params.get("pr_url")
        or context.get("pr_number")
        or context.get("pr_url")
    )
    if not pr_ref:
        return HookResult.fail("no pr_number or pr_url in params or context")

    timeout = int(params.get("timeout", _DEFAULT_TIMEOUT))
    # A zero interval would never advance ``elapsed`` (endless polling) and
    # a negative one makes ``time.sleep`` raise ValueError, so the interval
    # is clamped to at least one second.
    interval = max(1, int(params.get("interval", _DEFAULT_INTERVAL)))

    # ``elapsed`` counts only the time spent sleeping between polls; the
    # duration of each state query is not included.
    elapsed = 0
    while elapsed < timeout:
        state = _poll_pr_state(str(pr_ref), working_dir)
        if state is None:
            return HookResult.fail(f"failed to query PR {pr_ref} state")
        if state == "MERGED":
            return HookResult.ok(merged=True, pr_ref=str(pr_ref))
        if state == "CLOSED":
            return HookResult.fail(f"PR {pr_ref} was closed without merging")

        # Never sleep past the deadline when ``timeout`` is not a multiple
        # of ``interval``.
        pause = min(interval, timeout - elapsed)
        time.sleep(pause)
        elapsed += pause

    return HookResult.fail(f"PR {pr_ref} not merged after {timeout}s timeout")
|