brush-paint-opt: distributed meta-optimizer worker + orchestrator

paint_meta_opt_worker takes one outer-idx, builds the ScoreWeights
for that index, runs the full inner optimizer under those weights,
and prints a MetaResult JSON (matching the in-process struct).

scripts/meta_optimize_distributed.sh splits N outer samples between
local and a remote SSH host, runs them serially on each host (each
already saturates rayon internally), and lex-sorts the merged JSON
by the same ordering compare_reports uses.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mitchell Hansen
2026-05-02 00:27:50 -07:00
parent 9748e06ca9
commit 9190661d7a
3 changed files with 239 additions and 0 deletions

View File

@@ -48,3 +48,7 @@ path = "src/pipeline_bench.rs"
[[bin]]
name = "paint_opt_worker"
path = "src/bin/paint_opt_worker.rs"
# One-outer-sample meta-optimizer worker; invoked per index by
# scripts/meta_optimize_distributed.sh and prints a MetaResult JSON line.
[[bin]]
name = "paint_meta_opt_worker"
path = "src/bin/paint_meta_opt_worker.rs"

View File

@@ -0,0 +1,137 @@
#!/usr/bin/env bash
# Distributed meta-optimizer.
#
# Splits N outer ScoreWeights samples between THIS machine and a remote
# SSH host, runs each as `paint_meta_opt_worker N --inner I --passes P`,
# collects all `MetaResult`s, lexicographically sorts (matches the
# in-process `compare_reports`), and prints the best result + a top-5
# table.
#
# Each outer sample = build a random ScoreWeights candidate at index N
# → run the FULL inner optimizer under those weights → score the result.
# Outer samples are independent → embarrassingly parallel across hosts.
#
# Usage:
#   scripts/meta_optimize_distributed.sh [N] [I] [P] [REMOTE]
#     N      = total outer samples (default 16)
#     I      = inner starts per outer (default 16)
#     P      = inner passes (default 4)
#     REMOTE = "user@host:/path/to/repo" (default $REMOTE_TRAC3R)
#
# Splits via LOCAL_FRAC env var (default 0.37 = 37 % local, 63 % remote).
# Tune for unequal cores; with 14-core local + 24-core remote, 0.37 is
# proportional.
set -euo pipefail
N="${1:-16}"
I="${2:-16}"
P="${3:-4}"
REMOTE="${4:-${REMOTE_TRAC3R:-}}"
LOCAL_FRAC="${LOCAL_FRAC:-0.37}"
ROOT="$(cd "$(dirname "$0")/.." && pwd)"
# Scratch dir for per-host result files. Deliberately NOT named TMPDIR:
# TMPDIR is a conventional (often exported) env var consumed by mktemp
# and many tools — overwriting it would redirect any temp files created
# by child workers into a directory we delete on exit.
WORK_DIR="$(mktemp -d -t meta-distrib.XXXXXX)"
trap 'rm -rf "$WORK_DIR"' EXIT
echo "[orch] meta-opt: $N outer × $I inner × $P passes" >&2
echo "[orch] root=$ROOT remote=$REMOTE" >&2
# Round-to-nearest split of N by LOCAL_FRAC (awk: no bash float math).
LOCAL_N=$(awk -v n="$N" -v f="$LOCAL_FRAC" 'BEGIN{ printf "%d", int(n*f + 0.5) }')
REMOTE_N=$((N - LOCAL_N))
# No remote configured → everything runs locally.
[[ -z "$REMOTE" ]] && { LOCAL_N="$N"; REMOTE_N=0; }
echo "[orch] split: $LOCAL_N local, $REMOTE_N remote" >&2
# Build local first.
echo "[orch] cargo build --release --bin paint_meta_opt_worker (local)…" >&2
( cd "$ROOT" && cargo build --release --bin paint_meta_opt_worker ) >&2
# Build remote in parallel (login shell so cargo is on PATH) — but only
# when the remote will actually receive samples; otherwise the background
# build would be orphaned when the script exits.
REMOTE_BUILD_PID=""
if [[ -n "$REMOTE" && "$REMOTE_N" -gt 0 ]]; then
  HOST="${REMOTE%%:*}"
  RPATH="${REMOTE#*:}"
  echo "[orch] cargo build --release on remote ($HOST:$RPATH)…" >&2
  ( ssh "$HOST" "bash -lc 'cd \"$RPATH\" && cargo build --release --bin paint_meta_opt_worker'" >&2 ) &
  REMOTE_BUILD_PID=$!
fi
# Concurrency policy: each meta-worker already saturates rayon
# internally (par_iter inner starts × par_iter corpus). Running TWO
# in parallel on the same box doubles thread pressure with no gain
# and probably some loss. So: a plain serial loop per host (at most
# one worker at a time on each box). The two HOSTS still run in
# parallel because each loop is kicked off as its own background
# pipeline below.
LOCAL_OUT="$WORK_DIR/local.json"
: > "$LOCAL_OUT"
LOCAL_PID=""
if (( LOCAL_N > 0 )); then
  echo "[orch] dispatching $LOCAL_N samples to local (serial)…" >&2
  (
    for i in $(seq 0 $((LOCAL_N - 1))); do
      "$ROOT/target/release/paint_meta_opt_worker" "$i" --inner "$I" --passes "$P"
    done
  ) >> "$LOCAL_OUT" &
  LOCAL_PID=$!
fi
REMOTE_OUT="$WORK_DIR/remote.json"
: > "$REMOTE_OUT"
REMOTE_PID=""
if [[ -n "$REMOTE" && "$REMOTE_N" -gt 0 ]]; then
  # Remote workers need the release binary: block on the background build.
  [[ -n "$REMOTE_BUILD_PID" ]] && wait "$REMOTE_BUILD_PID"
  HOST="${REMOTE%%:*}"
  RPATH="${REMOTE#*:}"
  echo "[orch] dispatching $REMOTE_N samples to $HOST (serial on remote)…" >&2
  (
    # Feed the remote its index range on stdin; the remote loop runs
    # one worker per index, serially.
    seq "$LOCAL_N" $((N - 1)) | \
      ssh "$HOST" "bash -lc 'cd \"$RPATH\" && while read -r i; do ./target/release/paint_meta_opt_worker \$i --inner $I --passes $P; done'"
  ) >> "$REMOTE_OUT" &
  REMOTE_PID=$!
fi
[[ -n "$LOCAL_PID" ]] && wait "$LOCAL_PID"
[[ -n "$REMOTE_PID" ]] && wait "$REMOTE_PID"
ALL="$WORK_DIR/all.json"
cat "$LOCAL_OUT" "$REMOTE_OUT" > "$ALL"
LINES=$(wc -l < "$ALL" | tr -d ' ')
echo "[orch] collected $LINES results" >&2
if [[ "$LINES" -ne "$N" ]]; then
  echo "[orch] WARNING: expected $N, got $LINES" >&2
fi
# Lex-sort matching CorpusReport's compare_reports:
#   tier-1: fail_coverage, fail_bg, fail_single_stroke, fail_two_stroke, fail_length_budget
#   tier-2: total_bg, total_strokes, total_unpainted_density, total_repaint, total_length
echo "" >&2
echo "[orch] top 5 by lex order:" >&2
jq -s 'sort_by([
  .report.fail_coverage,
  .report.fail_bg,
  .report.fail_single_stroke,
  .report.fail_two_stroke,
  .report.fail_length_budget,
  .report.total_bg,
  .report.total_strokes,
  .report.total_unpainted_density,
  .report.total_repaint,
  .report.total_length
]) | .[0:5] | .[] | "idx=\(.idx) T1[cov=\(.report.fail_coverage) bg=\(.report.fail_bg) 1stk=\(.report.fail_single_stroke) 2stk=\(.report.fail_two_stroke) len=\(.report.fail_length_budget)] T2[bg=\(.report.total_bg) stk=\(.report.total_strokes) dens=\(.report.total_unpainted_density|round) rep=\(.report.total_repaint) len=\(.report.total_length|round)]"' -r "$ALL" >&2
echo "" >&2
echo "[orch] best (full JSON on stdout):" >&2
jq -s 'sort_by([
  .report.fail_coverage,
  .report.fail_bg,
  .report.fail_single_stroke,
  .report.fail_two_stroke,
  .report.fail_length_budget,
  .report.total_bg,
  .report.total_strokes,
  .report.total_unpainted_density,
  .report.total_repaint,
  .report.total_length
]) | .[0]' "$ALL"

View File

@@ -0,0 +1,98 @@
//! Meta-optimizer worker — runs ONE outer sample of the meta search.
//! Builds the ScoreWeights for index N, runs the full inner optimizer
//! under those weights, evaluates the result against the corpus, and
//! prints a JSON `MetaResult` to stdout. Lets the meta-search be
//! sharded across SSH-reachable machines: each box runs
//! `paint_meta_opt_worker N` for its assigned indices in parallel,
//! orchestrator collects + lex-sorts.
//!
//! Usage:
//! paint_meta_opt_worker <outer_idx> [--inner N] [--passes K]
//!
//! Output (stdout): `{ "idx", "weights", "params", "report" }` (JSON).
//! Stderr: human-readable progress; never parse it.
use std::env;
use std::process::ExitCode;
use trac3r_lib::brush_paint::PaintParams;
use trac3r_lib::brush_paint_opt::{
build_meta_weights, build_corpus, default_axes,
evaluate_score_weights, MetaResult,
};
/// Parse the worker's CLI: `<outer_idx> [--inner N] [--passes K]`.
///
/// Returns `(outer_idx, inner_starts, passes)` on success; on any
/// problem (missing positional, bad number, unknown flag) returns a
/// human-readable diagnostic the caller prints to stderr.
fn parse_args() -> Result<(usize, usize, u32), String> {
    let argv: Vec<String> = env::args().collect();
    // The outer index is mandatory; everything after it is flag pairs.
    if argv.len() < 2 {
        return Err(format!(
            "usage: {} <outer_idx> [--inner N] [--passes K]\n\
             outer_idx is the meta-optimizer's outer index (0..K-1).\n\
             inner defaults to 16, passes to 4.",
            argv.first().cloned().unwrap_or_else(|| "paint_meta_opt_worker".to_string())
        ));
    }
    let outer_idx: usize = argv[1]
        .parse()
        .map_err(|e| format!("outer_idx must be a non-negative integer: {e}"))?;
    // Defaults match the orchestrator script's defaults.
    let mut inner: usize = 16;
    let mut passes: u32 = 4;
    // Walk the remaining args as (flag, value) pairs via an iterator.
    let mut flags = argv.iter().skip(2);
    while let Some(flag) = flags.next() {
        match flag.as_str() {
            "--inner" => {
                let raw = flags.next().ok_or("--inner requires a value")?;
                inner = raw
                    .parse()
                    .map_err(|e| format!("--inner value invalid: {e}"))?;
            }
            "--passes" => {
                let raw = flags.next().ok_or("--passes requires a value")?;
                passes = raw
                    .parse()
                    .map_err(|e| format!("--passes value invalid: {e}"))?;
            }
            other => return Err(format!("unknown arg: {other}")),
        }
    }
    Ok((outer_idx, inner, passes))
}
/// Entry point: run exactly one outer meta-sample and emit its
/// `MetaResult` as a single JSON line on stdout (stderr is progress
/// only and must never be parsed). Exit codes: 0 ok, 2 bad CLI,
/// 3 JSON serialization failure.
fn main() -> ExitCode {
    let parsed = parse_args();
    let (outer_idx, n_inner, n_passes) = match parsed {
        Ok(triple) => triple,
        Err(msg) => {
            eprintln!("{msg}");
            return ExitCode::from(2);
        }
    };
    let host = hostname();
    // Thread count is informational only; 0 means "couldn't determine".
    let cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(0);
    eprintln!("[meta-worker {host}/{cores}t] outer_idx={outer_idx} inner={n_inner} passes={n_passes}");
    let started = std::time::Instant::now();
    // Deterministically derive this sample's candidate weights from its
    // index, then run the full inner optimization under them.
    let weights = build_meta_weights(outer_idx);
    let corpus = build_corpus();
    let axes = default_axes();
    let base = PaintParams::default();
    let (params, report) =
        evaluate_score_weights(&weights, &corpus, &axes, &base, n_inner, n_passes);
    eprintln!(
        "[meta-worker {host}] done idx={} elapsed={:.1}s {}",
        outer_idx,
        started.elapsed().as_secs_f64(),
        report.summary()
    );
    let result = MetaResult { idx: outer_idx, weights, params, report };
    // Only the JSON line goes to stdout so the orchestrator can
    // concatenate worker outputs directly.
    match serde_json::to_string(&result) {
        Ok(line) => {
            println!("{line}");
            ExitCode::SUCCESS
        }
        Err(e) => {
            eprintln!("[meta-worker {host}] JSON serialize failed: {e}");
            ExitCode::from(3)
        }
    }
}
/// Best-effort machine name for log prefixes.
///
/// Tries the `HOSTNAME` then `HOST` environment variables, then shells
/// out to the `hostname` binary, and finally falls back to `"?"`.
///
/// Fixes over the naive chain: a set-but-empty env var or a `hostname`
/// command that fails (or prints nothing) no longer yields an empty
/// label — empty results are treated as absent, and the child's exit
/// status is checked before its stdout is trusted.
fn hostname() -> String {
    std::env::var("HOSTNAME")
        .or_else(|_| std::env::var("HOST"))
        .ok()
        // Empty env value is as useless as a missing one.
        .filter(|s| !s.is_empty())
        .or_else(|| {
            std::process::Command::new("hostname")
                .output()
                .ok()
                // Don't trust stdout of a failed command.
                .filter(|o| o.status.success())
                .and_then(|o| String::from_utf8(o.stdout).ok())
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
        })
        .unwrap_or_else(|| "?".to_string())
}