Files
agentic-pd-hybrid/third_party/sglang/.github/actions/wait-for-jobs/action.yml

178 lines
6.8 KiB
YAML

# Action metadata: display name and summary.
name: Wait for Jobs
description: Poll and wait for specified jobs in the current workflow run to complete

# Values supplied by the calling workflow step.
inputs:
  # Label that the script prefixes to its progress and failure messages.
  stage-name:
    description: Human-readable stage name for log messages (e.g. "stage-a")
    required: true

  # JSON text; the script parses it with JSON.parse.
  jobs:
    description: |
      JSON array of job specs to wait for. Each element is either:
      - a string: exact job name (e.g. "stage-a-test-1-gpu-small")
      - an object { "prefix": "...", "expected_count": N }: for matrix jobs
    required: true

  # Upper bound on the total polling time, in minutes (parsed as an integer).
  max-wait-minutes:
    description: Maximum time to wait before timing out
    required: false
    default: "240"

  # Delay between two polls, in seconds (parsed as an integer).
  poll-interval-seconds:
    description: Seconds between polling attempts
    required: false
    default: "60"

  # Token handed to actions/github-script; falls back to the run's own token.
  github-token:
    description: GitHub token for API calls
    required: false
    default: ${{ github.token }}

# Values exposed to the calling workflow.
outputs:
  # Forwarded from the `wait` step of this action.
  result:
    description: "Overall result: success, failure, or timeout"
    value: ${{ steps.wait.outputs.result }}
runs:
  using: composite
  steps:
    - name: Wait for jobs to complete
      id: wait
      uses: actions/github-script@v7
      # Inputs reach the script through environment variables instead of
      # being pasted into the script text, so their contents are only ever
      # treated as data.
      env:
        INPUT_STAGE_NAME: ${{ inputs.stage-name }}
        INPUT_JOBS: ${{ inputs.jobs }}
        INPUT_MAX_WAIT_MINUTES: ${{ inputs.max-wait-minutes }}
        INPUT_POLL_INTERVAL_SECONDS: ${{ inputs.poll-interval-seconds }}
      with:
        github-token: ${{ inputs.github-token }}
        # Ask github-script to retry requests that fail transiently, so a
        # single bad response does not abort a wait that may last hours.
        retries: '3'
        script: |
          // Polls the jobs of the current workflow run until every job named
          // by the `jobs` input has completed, one of them has failed, or the
          // time budget is used up. Sets the `result` output to `success`,
          // `failure` or `timeout`; the step is marked failed for the latter
          // two. Invalid inputs make the script throw before any polling.

          // ---- Input parsing and validation ------------------------------

          // Parses a numeric input and rejects anything that is not a
          // positive integer. Without this check a non-numeric value turns
          // the attempt budget into NaN (the loop never runs and a bogus
          // timeout is reported) and a zero interval polls without pause.
          function parsePositiveInt(inputName, raw) {
            const value = Number.parseInt(raw, 10);
            if (!Number.isInteger(value) || value <= 0) {
              throw new Error(`${inputName} must be a positive integer, got ${JSON.stringify(raw)}`);
            }
            return value;
          }

          const stageName = process.env.INPUT_STAGE_NAME;
          const maxWaitMinutes = parsePositiveInt('max-wait-minutes', process.env.INPUT_MAX_WAIT_MINUTES);
          const pollIntervalSeconds = parsePositiveInt('poll-interval-seconds', process.env.INPUT_POLL_INTERVAL_SECONDS);
          // Rounded up so the number of polls stays the same as with the raw
          // quotient while the "attempt i/N" log shows a whole number.
          const maxAttempts = Math.ceil((maxWaitMinutes * 60) / pollIntervalSeconds);

          let jobSpecs;
          try {
            jobSpecs = JSON.parse(process.env.INPUT_JOBS);
          } catch (err) {
            throw new Error(`jobs input is not valid JSON: ${err.message}`);
          }
          if (!Array.isArray(jobSpecs)) {
            throw new Error('jobs input must be a JSON array');
          }

          // Normalize job specs into a uniform { prefix, expected_count, exact }
          // shape. A bare string is an exact job name expected once; an object
          // describes a matrix job and defaults to one expected job when
          // expected_count is omitted. A non-integer count would make the
          // expected total NaN and success unreachable, so it is rejected.
          const normalizedSpecs = jobSpecs.map((spec, index) => {
            if (typeof spec === 'string') {
              return { prefix: spec, expected_count: 1, exact: true };
            }
            if (spec === null || typeof spec !== 'object' || typeof spec.prefix !== 'string') {
              throw new Error(`jobs[${index}] must be a string or an object with a string "prefix"`);
            }
            const expectedCount = spec.expected_count === undefined ? 1 : spec.expected_count;
            if (!Number.isInteger(expectedCount) || expectedCount < 0) {
              throw new Error(`jobs[${index}].expected_count must be a non-negative integer`);
            }
            return { ...spec, expected_count: expectedCount, exact: false };
          });

          const totalExpectedJobs = normalizedSpecs.reduce((sum, s) => sum + s.expected_count, 0);

          // Exact specs match the job name verbatim. Prefix specs match the
          // bare name or the name followed by " (", i.e. "<prefix> (...)".
          const matchesSpec = (jobName, spec) => {
            if (spec.exact) {
              return jobName === spec.prefix;
            }
            return jobName === spec.prefix || jobName.startsWith(spec.prefix + ' (');
          };

          // ---- Job listing with ETag caching -----------------------------

          // Use ETag conditional requests to avoid consuming rate limit when nothing changed.
          // GitHub returns 304 Not Modified for unchanged data, which is FREE (no rate limit cost).
          const PER_PAGE = 100;
          let lastEtag = '';
          let lastJobs = null;
          let apiCalls = 0;
          let cachedCalls = 0;

          // Returns { jobs, cached }: the jobs of the current run and whether
          // they came from the previous response (304) instead of a new one.
          // Errors other than a 304 with cached data are rethrown.
          async function fetchJobs() {
            const route = 'GET /repos/{owner}/{repo}/actions/runs/{run_id}/jobs';
            const params = {
              owner: context.repo.owner,
              repo: context.repo.repo,
              run_id: context.runId,
              per_page: PER_PAGE,
              headers: {},
            };
            if (lastEtag) {
              params.headers['if-none-match'] = lastEtag;
            }

            try {
              const response = await github.request(route, params);
              apiCalls++;
              const rateRemaining = response.headers['x-ratelimit-remaining'] || '?';
              const rateLimit = response.headers['x-ratelimit-limit'] || '?';
              console.log(`[rate-limit] ${rateRemaining}/${rateLimit} remaining (ETag: ${lastEtag ? 'sent' : 'none'}) | this session: ${apiCalls} paid, ${cachedCalls} free`);
              lastEtag = response.headers.etag || '';

              const jobs = response.data.jobs;
              const totalPages = Math.ceil(response.data.total_count / PER_PAGE);
              if (totalPages > 1) {
                // ETag only covers page 1, so invalidate it to avoid stale cache
                // when later pages change but page 1 doesn't.
                lastEtag = '';
                for (let page = 2; page <= totalPages; page++) {
                  const { data: pageData } = await github.request(route, {
                    ...params,
                    page,
                    headers: {},
                  });
                  // Extra pages are unconditional requests, so count them as paid.
                  apiCalls++;
                  jobs.push(...pageData.jobs);
                }
              }

              lastJobs = jobs;
              return { jobs, cached: false };
            } catch (err) {
              // The client reports 304 Not Modified as an error; reuse the
              // previous listing when there is one.
              if (err.status === 304 && lastJobs) {
                cachedCalls++;
                console.log(`[rate-limit] 304 Not Modified | this session: ${apiCalls} paid, ${cachedCalls} free`);
                return { jobs: lastJobs, cached: true };
              }
              throw err;
            }
          }

          // ---- Polling loop ----------------------------------------------

          for (let attempt = 0; attempt < maxAttempts; attempt++) {
            const { jobs, cached } = await fetchJobs();

            let allCompleted = true;
            const failedJobs = [];
            let completedCount = 0;
            let totalCount = 0;

            for (const spec of normalizedSpecs) {
              const matchingJobs = jobs.filter(job => matchesSpec(job.name, spec));

              for (const job of matchingJobs) {
                totalCount++;
                // Per-job lines are only printed for fresh data to keep the log short.
                if (!cached) {
                  console.log(`${job.name}: status=${job.status}, conclusion=${job.conclusion}`);
                }
                if (job.status !== 'completed') {
                  allCompleted = false;
                  continue;
                }
                completedCount++;
                // Any conclusion other than success or skipped counts as a failure.
                if (job.conclusion !== 'success' && job.conclusion !== 'skipped') {
                  failedJobs.push(job.name);
                }
              }

              // Fewer jobs than expected are listed so far: keep waiting.
              if (matchingJobs.length < spec.expected_count) {
                console.log(`${spec.prefix}: found ${matchingJobs.length}/${spec.expected_count} jobs (waiting for more)`);
                allCompleted = false;
              }
            }

            console.log(`[${stageName}] Progress: ${completedCount}/${totalCount} jobs completed (expected ${totalExpectedJobs})${cached ? ' (cached, no rate limit cost)' : ''}`);

            // Fail fast if any jobs failed
            if (failedJobs.length > 0) {
              core.setOutput('result', 'failure');
              core.setFailed(`${stageName} jobs failed: ${failedJobs.join(', ')}`);
              return;
            }

            if (allCompleted && totalCount >= totalExpectedJobs) {
              core.setOutput('result', 'success');
              return;
            }

            // No poll follows the last attempt, so sleeping after it would
            // only delay the timeout report.
            if (attempt + 1 < maxAttempts) {
              console.log(`Waiting ${pollIntervalSeconds}s... (attempt ${attempt + 1}/${maxAttempts})`);
              await new Promise(resolve => setTimeout(resolve, pollIntervalSeconds * 1000));
            }
          }

          core.setOutput('result', 'timeout');
          core.setFailed(`Timeout waiting for ${stageName} jobs`);