From 8d0ff4f688dae9bf93bede2fa77f76c83d2062d7 Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Tue, 3 Feb 2026 16:05:06 -0800 Subject: [PATCH 1/8] Detect and recover from master worker death during setup When the master worker is killed before completing queue setup, the master-status stays at 'setup' and all non-master workers poll for ~120s then fail with LostMaster. This change adds a master setup heartbeat mechanism that allows workers to detect a dead master and atomically re-elect a new one. Changes: - Add master_setup_heartbeat_interval (5s) and master_setup_heartbeat_timeout (30s) config options - Master sends heartbeat during setup via background thread - Workers detect stale heartbeat and attempt atomic takeover via Lua script - Guard in push() prevents split-brain (old master pushing after replacement) Co-Authored-By: Claude Opus 4.5 --- redis/takeover_master.lua | 50 ++++++++++++ ruby/lib/ci/queue/configuration.rb | 7 +- ruby/lib/ci/queue/redis/base.rb | 31 ++++++++ ruby/lib/ci/queue/redis/worker.rb | 120 +++++++++++++++++++++++++++-- 4 files changed, 201 insertions(+), 7 deletions(-) create mode 100644 redis/takeover_master.lua diff --git a/redis/takeover_master.lua b/redis/takeover_master.lua new file mode 100644 index 0000000..54f7e46 --- /dev/null +++ b/redis/takeover_master.lua @@ -0,0 +1,50 @@ +-- Atomically attempt to take over as master when current master is dead during setup +-- Returns 1 if takeover succeeded, 0 otherwise + +local master_status_key = KEYS[1] +local master_worker_id_key = KEYS[2] +local master_setup_heartbeat_key = KEYS[3] + +local new_worker_id = ARGV[1] +local current_time = tonumber(ARGV[2]) +local heartbeat_timeout = tonumber(ARGV[3]) +local redis_ttl = tonumber(ARGV[4]) + +-- Step 1: Verify status is still 'setup' +local status = redis.call('get', master_status_key) +if status ~= 'setup' then + return 0 +end + +-- Step 2: Check if heartbeat is stale or missing +local last_heartbeat = redis.call('get', master_setup_heartbeat_key) +if last_heartbeat then + local heartbeat_age = current_time - tonumber(last_heartbeat) + if heartbeat_age < heartbeat_timeout then + -- Master is still alive, heartbeat is fresh + return 0 + end +end +-- If no heartbeat exists and status is 'setup', master may have died before first heartbeat +-- Allow takeover in this case (heartbeat_timeout acts as grace period for initial setup) + +-- Step 3: Delete old master-status to allow SETNX +redis.call('del', master_status_key) + +-- Step 4: Atomically try to claim master role +local claimed = redis.call('setnx', master_status_key, 'setup') +if claimed == 0 then + -- Another worker beat us to it + return 0 +end + +-- Step 5: We got master role - update worker ID and heartbeat +redis.call('set', master_worker_id_key, new_worker_id) +redis.call('set', master_setup_heartbeat_key, current_time) + +-- Set TTLs +redis.call('expire', master_status_key, redis_ttl) +redis.call('expire', master_worker_id_key, redis_ttl) +redis.call('expire', master_setup_heartbeat_key, redis_ttl) + +return 1 diff --git a/ruby/lib/ci/queue/configuration.rb b/ruby/lib/ci/queue/configuration.rb index 9e9c133..200e889 100644 --- a/ruby/lib/ci/queue/configuration.rb +++ b/ruby/lib/ci/queue/configuration.rb @@ -15,6 +15,7 @@ class Configuration attr_accessor :timing_redis_url attr_accessor :write_duration_averages attr_accessor :heartbeat_grace_period, :heartbeat_interval + attr_accessor :master_setup_heartbeat_interval, :master_setup_heartbeat_timeout attr_reader :circuit_breakers attr_writer :seed, :build_id attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout @@ -66,7 +67,9 @@ def initialize( branch: nil, timing_redis_url: nil, heartbeat_grace_period: 30, - heartbeat_interval: 10 + heartbeat_interval: 10, + master_setup_heartbeat_interval: 5, + master_setup_heartbeat_timeout: 30 ) @build_id = build_id @circuit_breakers = [CircuitBreaker::Disabled] @@ -105,6 +108,8 @@ def initialize( @write_duration_averages = false @heartbeat_grace_period = heartbeat_grace_period @heartbeat_interval = heartbeat_interval + @master_setup_heartbeat_interval = master_setup_heartbeat_interval + @master_setup_heartbeat_timeout = master_setup_heartbeat_timeout end def queue_init_timeout diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 9ebdd58..8599fab 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -56,9 +56,27 @@ def progress def wait_for_master(timeout: 120) return true if master? + last_takeover_check = CI::Queue.time_now.to_f + (timeout * 10 + 1).to_i.times do return true if queue_initialized? + # Periodically check if master is dead and attempt takeover + current_time = CI::Queue.time_now.to_f + if current_time - last_takeover_check >= config.master_setup_heartbeat_interval + last_takeover_check = current_time + + if queue_initializing? && master_setup_heartbeat_stale? + if respond_to?(:attempt_master_takeover, true) && attempt_master_takeover + # Takeover succeeded - run master setup + if respond_to?(:run_master_setup, true) + run_master_setup + return true + end + end + end + end + sleep 0.1 end raise LostMaster, "The master worker (worker #{master_worker_id}) is still `#{master_status}` after #{timeout} seconds waiting." @@ -97,6 +115,19 @@ def master_worker_id redis.get(key('master-worker-id')) end + # Check if the master setup heartbeat is stale (or missing) + # Returns true if heartbeat is older than master_setup_heartbeat_timeout + def master_setup_heartbeat_stale? + heartbeat = redis.get(key('master-setup-heartbeat')) + return true unless heartbeat # No heartbeat = stale (master may have died before first heartbeat) + + current_time = CI::Queue.time_now.to_f + heartbeat_age = current_time - heartbeat.to_f + heartbeat_age >= config.master_setup_heartbeat_timeout + rescue *CONNECTION_ERRORS + false # On connection error, don't attempt takeover + end + private attr_reader :redis, :redis_url diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 7de1f17..c3c49c2 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -2,6 +2,8 @@ require 'ci/queue/static' require 'concurrent/set' +require 'digest/sha2' +require 'set' module CI module Queue @@ -39,15 +41,17 @@ def populate(tests, random: Random.new) @total = tests.size if acquire_master_role? - executables = reorder_tests(tests, random: random) + with_master_setup_heartbeat do + executables = reorder_tests(tests, random: random) - chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } - individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } + chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } + individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } - store_chunk_metadata(chunks) if chunks.any? + store_chunk_metadata(chunks) if chunks.any? - all_ids = chunks.map(&:id) + individual_tests.map(&:id) - push(all_ids) + all_ids = chunks.map(&:id) + individual_tests.map(&:id) + push(all_ids) + end end register_worker_presence @@ -294,6 +298,44 @@ def with_heartbeat(test_id) heartbeat_thread&.join(1) # Wait up to 1 second for thread to finish end + # Runs a block while sending periodic heartbeats during master setup. + # This allows other workers to detect if the master dies during setup. + def with_master_setup_heartbeat + return yield unless config.master_setup_heartbeat_interval&.positive? + + # Send initial heartbeat immediately + send_master_setup_heartbeat + + stop_heartbeat = false + heartbeat_thread = Thread.new do + until stop_heartbeat + sleep(config.master_setup_heartbeat_interval) + break if stop_heartbeat + + begin + send_master_setup_heartbeat + rescue StandardError => e + warn("[master-setup-heartbeat] Failed to send heartbeat: #{e.message}") + end + end + end + + yield + ensure + stop_heartbeat = true + heartbeat_thread&.kill + heartbeat_thread&.join(1) + end + + # Send a heartbeat to indicate master is still alive during setup + def send_master_setup_heartbeat + current_time = CI::Queue.time_now.to_f + redis.set(key('master-setup-heartbeat'), current_time) + redis.expire(key('master-setup-heartbeat'), config.redis_ttl) + rescue *CONNECTION_ERRORS => e + warn("[master-setup-heartbeat] Connection error: #{e.message}") + end + def ensure_connection_and_script(script) # Pre-initialize Redis connection and script in current thread context # This ensures background threads use the same initialized connection @@ -441,6 +483,15 @@ def push(tests) @total = tests.size if @master + # Guard against split-brain: verify we're still the master before pushing + # This prevents race where old master reconnects after being replaced + current_master = redis.get(key('master-worker-id')) + if current_master && current_master != worker_id + warn "Worker #{worker_id} lost master role to #{current_master}, aborting push" + @master = false + return + end + redis.multi do |transaction| transaction.lpush(key('queue'), tests) unless tests.empty? transaction.set(key('total'), @total) @@ -480,6 +531,40 @@ def acquire_master_role? false end + # Attempt to take over as master when current master appears dead during setup. + # Uses atomic Lua script to ensure only one worker can win the takeover. + # Returns true if takeover succeeded, false otherwise. + def attempt_master_takeover + return false if @master # Already master + + current_time = CI::Queue.time_now.to_f + result = eval_script( + :takeover_master, + keys: [ + key('master-status'), + key('master-worker-id'), + key('master-setup-heartbeat') + ], + argv: [ + worker_id, + current_time, + config.master_setup_heartbeat_timeout, + config.redis_ttl + ] + ) + + if result == 1 + @master = true + warn "Worker #{worker_id} took over as master (previous master died during setup)" + true + else + false + end + rescue *CONNECTION_ERRORS => e + warn "[takeover] Connection error during takeover attempt: #{e.message}" + false + end + def register_worker_presence register redis.expire(key('workers'), config.redis_ttl) @@ -487,6 +572,29 @@ def register_worker_presence raise if master? end + # Run master setup after a successful takeover. + # Reconstructs the work the original master would have done. + def run_master_setup + return unless @master && @index + + # Reconstruct tests array from index + tests = @index.values + + with_master_setup_heartbeat do + # Use the same seed for deterministic ordering + random = Random.new(Digest::SHA256.hexdigest(config.seed).to_i(16)) + executables = reorder_tests(tests, random: random) + + chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } + individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } + + store_chunk_metadata(chunks) if chunks.any? + + all_ids = chunks.map(&:id) + individual_tests.map(&:id) + push(all_ids) + end + end + def store_chunk_metadata(chunks) # Batch operations to avoid exceeding Redis multi operation limits # Each chunk requires 4 commands (set, expire, sadd, hset), so batch conservatively From 51df5db3aa16550ed6f606833d289c1d4de3a2ed Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 14:20:00 -0800 Subject: [PATCH 2/8] Fix TOCTOU race in push() using WATCH/MULTI The previous guard check in push() had a race window between checking master-worker-id and executing the transaction. A takeover could occur in that window, causing both old and new master to push tests. Use Redis WATCH to monitor master-worker-id. If it changes between WATCH and MULTI execution, the transaction automatically aborts, preventing duplicate test pushes. Co-Authored-By: Claude Opus 4.5 --- ruby/lib/ci/queue/redis/worker.rb | 43 +++++++++++++++++++------------ 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index c3c49c2..8978326 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -481,26 +481,37 @@ def try_to_reserve_lost_test def push(tests) @total = tests.size + return unless @master + + # Use WATCH/MULTI for atomic check-and-push to prevent TOCTOU race. + # If master-worker-id changes between WATCH and MULTI, transaction aborts. + result = redis.watch(key('master-worker-id')) do |rd| + current_master = rd.get(key('master-worker-id')) - if @master - # Guard against split-brain: verify we're still the master before pushing - # This prevents race where old master reconnects after being replaced - current_master = redis.get(key('master-worker-id')) if current_master && current_master != worker_id - warn "Worker #{worker_id} lost master role to #{current_master}, aborting push" - @master = false - return + # We're not the master anymore, unwatch and abort + rd.unwatch + :not_master + else + # We're still master, execute atomic transaction + rd.multi do |transaction| + transaction.lpush(key('queue'), tests) unless tests.empty? + transaction.set(key('total'), @total) + transaction.set(key('master-status'), 'ready') + + transaction.expire(key('queue'), config.redis_ttl) + transaction.expire(key('total'), config.redis_ttl) + transaction.expire(key('master-status'), config.redis_ttl) + end end + end - redis.multi do |transaction| - transaction.lpush(key('queue'), tests) unless tests.empty? - transaction.set(key('total'), @total) - transaction.set(key('master-status'), 'ready') - - transaction.expire(key('queue'), config.redis_ttl) - transaction.expire(key('total'), config.redis_ttl) - transaction.expire(key('master-status'), config.redis_ttl) - end + # result is nil if WATCH detected a change (race condition) + # result is :not_master if we detected we're not master + # result is an array of responses if transaction succeeded + if result.nil? || result == :not_master + warn "Worker #{worker_id} lost master role (race detected), aborting push" + @master = false end rescue *CONNECTION_ERRORS raise if @master From bbb62d9e347a953ac2ee91329e69cf5464498f2f Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 15:07:13 -0800 Subject: [PATCH 3/8] Fix race condition: set initial heartbeat in acquire_master_role Previously, there was a window between acquiring master role and entering with_master_setup_heartbeat where no heartbeat existed. Other workers could see status='setup' with no heartbeat and incorrectly attempt takeover. Now the initial heartbeat is set immediately after acquiring master role, closing this window. Co-Authored-By: Claude Opus 4.5 --- ruby/lib/ci/queue/redis/worker.rb | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 8978326..71ac91e 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -529,9 +529,15 @@ def acquire_master_role? begin redis.set(key('master-worker-id'), worker_id) redis.expire(key('master-worker-id'), config.redis_ttl) + + # Set initial heartbeat immediately to prevent premature takeover + # This closes the window where status='setup' but no heartbeat exists + redis.set(key('master-setup-heartbeat'), CI::Queue.time_now.to_f) + redis.expire(key('master-setup-heartbeat'), config.redis_ttl) + warn "Worker #{worker_id} elected as master" rescue *CONNECTION_ERRORS - # If setting master-worker-id fails, we still have master status + # If setting master-worker-id/heartbeat fails, we still have master status # Log but don't lose master role warn("Failed to set master-worker-id: #{$!.message}") end From 86e003a14ff03af805a3a1d618ebc10eb92a54ea Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 15:08:16 -0800 Subject: [PATCH 4/8] Fix race condition: check status before running master setup after takeover After taking over as master, check if master-status is still "setup" before running setup. If status is already "ready", the original master completed its push before we could act, so we skip to avoid duplicate test pushes. This fixes the race where: 1. Original master's push() passes WATCH check 2. Heartbeat becomes stale during redis.multi execution 3. Worker takes over (status still "setup") 4. Original master's transaction commits, sets status="ready" 5. Without this fix, takeover worker would also push tests Co-Authored-By: Claude Opus 4.5 --- ruby/lib/ci/queue/redis/worker.rb | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 71ac91e..13b0367 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -594,6 +594,19 @@ def register_worker_presence def run_master_setup return unless @master && @index + # Check if setup is already complete (original master finished before we took over) + # This prevents duplicate test pushes in the race where: + # 1. Original master's push() passes WATCH check + # 2. Original master's heartbeat becomes stale during redis.multi + # 3. We take over (status still "setup") + # 4. Original master's transaction commits, sets status="ready" + # 5. We would push again without this check + status = master_status + if status != 'setup' + warn "Worker #{worker_id} took over but setup already complete (status=#{status}), skipping" + return + end + # Reconstruct tests array from index tests = @index.values From 402d00adbce6907e719af47e15453cdddfb4067b Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 15:25:18 -0800 Subject: [PATCH 5/8] Revert "Fix race condition: check status before running master setup after takeover" This reverts commit 636796e6470f8f9a0fda74b668791dacb30836d4. --- ruby/lib/ci/queue/redis/worker.rb | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 13b0367..71ac91e 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -594,19 +594,6 @@ def register_worker_presence def run_master_setup return unless @master && @index - # Check if setup is already complete (original master finished before we took over) - # This prevents duplicate test pushes in the race where: - # 1. Original master's push() passes WATCH check - # 2. Original master's heartbeat becomes stale during redis.multi - # 3. We take over (status still "setup") - # 4. Original master's transaction commits, sets status="ready" - # 5. We would push again without this check - status = master_status - if status != 'setup' - warn "Worker #{worker_id} took over but setup already complete (status=#{status}), skipping" - return - end - # Reconstruct tests array from index tests = @index.values From 48442ba37b29f24dbff9203caa9c50e35f1a8cad Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 15:26:57 -0800 Subject: [PATCH 6/8] Fix md5 hash difference --- ruby/lib/ci/queue/redis/worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 71ac91e..5fb659a 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -599,7 +599,7 @@ def run_master_setup with_master_setup_heartbeat do # Use the same seed for deterministic ordering - random = Random.new(Digest::SHA256.hexdigest(config.seed).to_i(16)) + random = Random.new(Digest::MD5.hexdigest(config.seed).to_i(16)) executables = reorder_tests(tests, random: random) chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } From ddfef015692f8145fdf48872dcd33db8533d1084 Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 15:45:15 -0800 Subject: [PATCH 7/8] Refactor: extract execute_master_setup to remove duplication Extract shared master setup logic (reorder tests, store chunk metadata, push to queue) into execute_master_setup method. Used by both initial master setup in populate() and takeover in run_master_setup(). Co-Authored-By: Claude Opus 4.5 --- ruby/lib/ci/queue/redis/base.rb | 2 +- ruby/lib/ci/queue/redis/worker.rb | 29 ++++++----------------------- 2 files changed, 7 insertions(+), 24 deletions(-) diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 8599fab..0314b69 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -70,7 +70,7 @@ def wait_for_master(timeout: 120) if respond_to?(:attempt_master_takeover, true) && attempt_master_takeover # Takeover succeeded - run master setup if respond_to?(:run_master_setup, true) - run_master_setup + execute_master_setup return true end end diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 5fb659a..f182f16 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -39,20 +39,9 @@ def populate(tests, random: Random.new) # All workers need an index of tests to resolve IDs @index = tests.map { |t| [t.id, t] }.to_h @total = tests.size + @random = random - if acquire_master_role? - with_master_setup_heartbeat do - executables = reorder_tests(tests, random: random) - - chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } - individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } - - store_chunk_metadata(chunks) if chunks.any? - - all_ids = chunks.map(&:id) + individual_tests.map(&:id) - push(all_ids) - end - end + execute_master_setup(tests) if acquire_master_role? register_worker_presence @@ -589,18 +578,12 @@ def register_worker_presence raise if master? end - # Run master setup after a successful takeover. - # Reconstructs the work the original master would have done. - def run_master_setup + # Shared logic for master setup - reorders tests, stores chunk metadata, and pushes to queue. + # Used by both initial master setup (populate) and takeover. + def execute_master_setup(tests) return unless @master && @index - - # Reconstruct tests array from index - tests = @index.values - with_master_setup_heartbeat do - # Use the same seed for deterministic ordering - random = Random.new(Digest::MD5.hexdigest(config.seed).to_i(16)) - executables = reorder_tests(tests, random: random) + executables = reorder_tests(tests, random: @random) chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } From 9b76034e03bac974a789f6ce7fead18223e57a86 Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 15:58:45 -0800 Subject: [PATCH 8/8] add logging --- ruby/lib/ci/queue/redis/worker.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index f182f16..810f723 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -543,6 +543,8 @@ def acquire_master_role? def attempt_master_takeover return false if @master # Already master + warn "Worker #{worker_id} attempting to takeover as master" + current_time = CI::Queue.time_now.to_f result = eval_script( :takeover_master, @@ -564,6 +566,7 @@ def attempt_master_takeover warn "Worker #{worker_id} took over as master (previous master died during setup)" true else + warn "Failed to takeover as master. Current master is #{master_worker_id}" false end rescue *CONNECTION_ERRORS => e