50 changes: 50 additions & 0 deletions redis/takeover_master.lua
@@ -0,0 +1,50 @@
-- Atomically attempt to take over as master when current master is dead during setup
-- Returns 1 if takeover succeeded, 0 otherwise

local master_status_key = KEYS[1]
local master_worker_id_key = KEYS[2]
local master_setup_heartbeat_key = KEYS[3]

local new_worker_id = ARGV[1]
local current_time = tonumber(ARGV[2])
local heartbeat_timeout = tonumber(ARGV[3])
local redis_ttl = tonumber(ARGV[4])

-- Step 1: Verify status is still 'setup'
local status = redis.call('get', master_status_key)
if status ~= 'setup' then
return 0
end

-- Step 2: Check if heartbeat is stale or missing
local last_heartbeat = redis.call('get', master_setup_heartbeat_key)
if last_heartbeat then
local heartbeat_age = current_time - tonumber(last_heartbeat)
if heartbeat_age < heartbeat_timeout then
-- Master is still alive, heartbeat is fresh
return 0
end
end
-- If no heartbeat exists and status is 'setup', master may have died before first heartbeat
-- Allow takeover in this case (heartbeat_timeout acts as grace period for initial setup)

-- Step 3: Delete old master-status to allow SETNX
redis.call('del', master_status_key)

-- Step 4: Atomically try to claim master role
local claimed = redis.call('setnx', master_status_key, 'setup')
if claimed == 0 then
-- Defensive check: the script runs atomically, so after the DEL above SETNX should not lose to another worker
return 0
end

-- Step 5: We got master role - update worker ID and heartbeat
redis.call('set', master_worker_id_key, new_worker_id)
redis.call('set', master_setup_heartbeat_key, current_time)

-- Set TTLs
redis.call('expire', master_status_key, redis_ttl)
redis.call('expire', master_worker_id_key, redis_ttl)
redis.call('expire', master_setup_heartbeat_key, redis_ttl)

return 1
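
For reference, a minimal sketch of driving this script directly with redis-rb, outside the eval_script helper used in worker.rb below. The key names, worker id, and TTL are illustrative only, not the gem's real values.

require 'redis'

redis = Redis.new
script = File.read('redis/takeover_master.lua')

# KEYS: master-status, master-worker-id, master-setup-heartbeat
# ARGV: new worker id, current time, heartbeat timeout (s), redis TTL (s) -- all placeholder values
result = redis.eval(
  script,
  keys: ['build:123:master-status', 'build:123:master-worker-id', 'build:123:master-setup-heartbeat'],
  argv: ['worker-7', Time.now.to_f, 30, 28_800]
)
puts(result == 1 ? 'takeover succeeded' : 'takeover refused')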
7 changes: 6 additions & 1 deletion ruby/lib/ci/queue/configuration.rb
@@ -15,6 +15,7 @@ class Configuration
attr_accessor :timing_redis_url
attr_accessor :write_duration_averages
attr_accessor :heartbeat_grace_period, :heartbeat_interval
attr_accessor :master_setup_heartbeat_interval, :master_setup_heartbeat_timeout
attr_reader :circuit_breakers
attr_writer :seed, :build_id
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
@@ -66,7 +67,9 @@ def initialize(
branch: nil,
timing_redis_url: nil,
heartbeat_grace_period: 30,
heartbeat_interval: 10
heartbeat_interval: 10,
master_setup_heartbeat_interval: 5,
master_setup_heartbeat_timeout: 30
)
@build_id = build_id
@circuit_breakers = [CircuitBreaker::Disabled]
@@ -105,6 +108,8 @@ def initialize(
@write_duration_averages = false
@heartbeat_grace_period = heartbeat_grace_period
@heartbeat_interval = heartbeat_interval
@master_setup_heartbeat_interval = master_setup_heartbeat_interval
@master_setup_heartbeat_timeout = master_setup_heartbeat_timeout
end

def queue_init_timeout
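Taken together with the defaults above, a hedged sketch of how these options might be passed when constructing a configuration. The build id is a placeholder, and the remaining keyword arguments are assumed to keep their defaults.

config = CI::Queue::Configuration.new(
  build_id: 'build-123',               # placeholder value
  heartbeat_interval: 10,              # existing per-test heartbeat
  master_setup_heartbeat_interval: 5,  # how often the master heartbeats while populating the queue
  master_setup_heartbeat_timeout: 30   # how long other workers wait before attempting a takeover
)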
31 changes: 31 additions & 0 deletions ruby/lib/ci/queue/redis/base.rb
@@ -56,9 +56,27 @@ def progress
def wait_for_master(timeout: 120)
return true if master?

last_takeover_check = CI::Queue.time_now.to_f

(timeout * 10 + 1).to_i.times do
return true if queue_initialized?

# Periodically check if master is dead and attempt takeover
current_time = CI::Queue.time_now.to_f
if current_time - last_takeover_check >= config.master_setup_heartbeat_interval
last_takeover_check = current_time

if queue_initializing? && master_setup_heartbeat_stale?
# Takeover and master setup are implemented by subclasses (e.g. Worker), hence the respond_to? guards
if respond_to?(:attempt_master_takeover, true) && attempt_master_takeover
# Takeover succeeded - run master setup ourselves
if respond_to?(:execute_master_setup, true)
execute_master_setup
return true
end
end
end
end

sleep 0.1
end
raise LostMaster, "The master worker (worker #{master_worker_id}) is still `#{master_status}` after #{timeout} seconds waiting."
@@ -97,6 +115,19 @@ def master_worker_id
redis.get(key('master-worker-id'))
end

# Check if the master setup heartbeat is stale (or missing)
# Returns true if heartbeat is older than master_setup_heartbeat_timeout
def master_setup_heartbeat_stale?
heartbeat = redis.get(key('master-setup-heartbeat'))
return true unless heartbeat # No heartbeat = stale (master may have died before first heartbeat)

current_time = CI::Queue.time_now.to_f
heartbeat_age = current_time - heartbeat.to_f
heartbeat_age >= config.master_setup_heartbeat_timeout
rescue *CONNECTION_ERRORS
false # On connection error, don't attempt takeover
end

private

attr_reader :redis, :redis_url
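To make the timing concrete: with the defaults (interval 5 s, timeout 30 s), a master that stops heartbeating becomes eligible for takeover roughly 30 seconds after its last heartbeat. A standalone sketch of the staleness rule, with made-up timestamps:

MASTER_SETUP_HEARTBEAT_TIMEOUT = 30.0

def heartbeat_stale?(last_heartbeat, now)
  return true if last_heartbeat.nil? # missing heartbeat counts as stale
  (now - last_heartbeat) >= MASTER_SETUP_HEARTBEAT_TIMEOUT
end

heartbeat_stale?(100.0, 125.0) # => false, 25 s old
heartbeat_stale?(100.0, 135.0) # => true, 35 s old
heartbeat_stale?(nil, 135.0)   # => true, master may have died before its first heartbeat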
151 changes: 131 additions & 20 deletions ruby/lib/ci/queue/redis/worker.rb
@@ -2,6 +2,8 @@

require 'ci/queue/static'
require 'concurrent/set'
require 'digest/sha2'
require 'set'

module CI
module Queue
@@ -37,18 +39,9 @@ def populate(tests, random: Random.new)
# All workers need an index of tests to resolve IDs
@index = tests.map { |t| [t.id, t] }.to_h
@total = tests.size
@random = random

if acquire_master_role?
executables = reorder_tests(tests, random: random)

chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) }
individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) }

store_chunk_metadata(chunks) if chunks.any?

all_ids = chunks.map(&:id) + individual_tests.map(&:id)
push(all_ids)
end
execute_master_setup(tests) if acquire_master_role?

register_worker_presence

@@ -294,6 +287,44 @@ def with_heartbeat(test_id)
heartbeat_thread&.join(1) # Wait up to 1 second for thread to finish
end

# Runs a block while sending periodic heartbeats during master setup.
# This allows other workers to detect if the master dies during setup.
def with_master_setup_heartbeat
return yield unless config.master_setup_heartbeat_interval&.positive?

# Send initial heartbeat immediately
send_master_setup_heartbeat

stop_heartbeat = false
heartbeat_thread = Thread.new do
until stop_heartbeat
sleep(config.master_setup_heartbeat_interval)
break if stop_heartbeat

begin
send_master_setup_heartbeat
rescue StandardError => e
warn("[master-setup-heartbeat] Failed to send heartbeat: #{e.message}")
end
end
end

yield
ensure
stop_heartbeat = true
heartbeat_thread&.kill
heartbeat_thread&.join(1)
end

# Send a heartbeat to indicate master is still alive during setup
def send_master_setup_heartbeat
current_time = CI::Queue.time_now.to_f
redis.set(key('master-setup-heartbeat'), current_time)
redis.expire(key('master-setup-heartbeat'), config.redis_ttl)
rescue *CONNECTION_ERRORS => e
warn("[master-setup-heartbeat] Connection error: #{e.message}")
end

def ensure_connection_and_script(script)
# Pre-initialize Redis connection and script in current thread context
# This ensures background threads use the same initialized connection
@@ -439,18 +470,38 @@ def try_to_reserve_lost_test

def push(tests)
@total = tests.size
return unless @master

if @master
redis.multi do |transaction|
transaction.lpush(key('queue'), tests) unless tests.empty?
transaction.set(key('total'), @total)
transaction.set(key('master-status'), 'ready')
# Use WATCH/MULTI for an atomic check-and-push to prevent a TOCTOU race.
# If master-worker-id changes between WATCH and EXEC, the transaction aborts.
result = redis.watch(key('master-worker-id')) do |rd|
current_master = rd.get(key('master-worker-id'))

transaction.expire(key('queue'), config.redis_ttl)
transaction.expire(key('total'), config.redis_ttl)
transaction.expire(key('master-status'), config.redis_ttl)
if current_master && current_master != worker_id
# We're not the master anymore, unwatch and abort
rd.unwatch
:not_master
else
# We're still master, execute atomic transaction
rd.multi do |transaction|
transaction.lpush(key('queue'), tests) unless tests.empty?
transaction.set(key('total'), @total)
transaction.set(key('master-status'), 'ready')

transaction.expire(key('queue'), config.redis_ttl)
transaction.expire(key('total'), config.redis_ttl)
transaction.expire(key('master-status'), config.redis_ttl)
end
end
end

# result is nil if WATCH detected a change (race condition)
# result is :not_master if we detected we're not master
# result is an array of responses if transaction succeeded
if result.nil? || result == :not_master
warn "Worker #{worker_id} lost master role (race detected), aborting push"
@master = false
end
rescue *CONNECTION_ERRORS
raise if @master
end
@@ -467,9 +518,15 @@ def acquire_master_role?
begin
redis.set(key('master-worker-id'), worker_id)
redis.expire(key('master-worker-id'), config.redis_ttl)

# Set initial heartbeat immediately to prevent premature takeover
# This closes the window where status='setup' but no heartbeat exists
redis.set(key('master-setup-heartbeat'), CI::Queue.time_now.to_f)
redis.expire(key('master-setup-heartbeat'), config.redis_ttl)

warn "Worker #{worker_id} elected as master"
rescue *CONNECTION_ERRORS
# If setting master-worker-id fails, we still have master status
# If setting master-worker-id/heartbeat fails, we still have master status
# Log but don't lose master role
warn("Failed to set master-worker-id: #{$!.message}")
end
@@ -480,13 +537,67 @@
false
end

# Attempt to take over as master when current master appears dead during setup.
# Uses atomic Lua script to ensure only one worker can win the takeover.
# Returns true if takeover succeeded, false otherwise.
def attempt_master_takeover
return false if @master # Already master

warn "Worker #{worker_id} attempting to takeover as master"

current_time = CI::Queue.time_now.to_f
result = eval_script(
:takeover_master,
keys: [
key('master-status'),
key('master-worker-id'),
key('master-setup-heartbeat')
],
argv: [
worker_id,
current_time,
config.master_setup_heartbeat_timeout,
config.redis_ttl
]
)

if result == 1
@master = true
warn "Worker #{worker_id} took over as master (previous master died during setup)"
true
else
warn "Failed to takeover as master. Current master is #{master_worker_id}"
false
end
rescue *CONNECTION_ERRORS => e
warn "[takeover] Connection error during takeover attempt: #{e.message}"
false
end

def register_worker_presence
register
redis.expire(key('workers'), config.redis_ttl)
rescue *CONNECTION_ERRORS
raise if master?
end

# Shared logic for master setup - reorders tests, stores chunk metadata, and pushes to queue.
# Used by both initial master setup (populate) and takeover.
def execute_master_setup(tests = nil)
return unless @master && @index
# Called with no argument after a takeover, so fall back to the tests indexed in populate
tests ||= @index.values
with_master_setup_heartbeat do
executables = reorder_tests(tests, random: @random)

chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) }
individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) }

store_chunk_metadata(chunks) if chunks.any?

all_ids = chunks.map(&:id) + individual_tests.map(&:id)
push(all_ids)
end
end

def store_chunk_metadata(chunks)
# Batch operations to avoid exceeding Redis multi operation limits
# Each chunk requires 4 commands (set, expire, sadd, hset), so batch conservatively
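As a side note on the WATCH/MULTI pattern used in push above, here is a minimal, self-contained sketch of the same optimistic check-and-set with redis-rb. The keys and values are illustrative, not ci-queue's real keys.

require 'redis'

redis = Redis.new
my_id = 'worker-7'

result = redis.watch('demo:owner') do |rd|
  if rd.get('demo:owner') == my_id
    rd.multi do |tx|
      tx.rpush('demo:queue', %w(a b c))
      tx.set('demo:status', 'ready')
    end
  else
    rd.unwatch
    :not_owner
  end
end

# nil        => demo:owner changed between WATCH and EXEC, the transaction was aborted
# :not_owner => we were not the owner to begin with
# Array      => the queued commands' replies; the push went through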