
Move compute-heavy endpoints off the event loop onto worker threads #2310

Open

RSam25 wants to merge 22 commits into main from srm-UID2-6480-move-compute-heavy-endpoints-to-worker-pool

Conversation

Contributor

@RSam25 RSam25 commented Jan 23, 2026

Changes made:

  • Reduced the default worker pool size from 20 to 12
  • Created a new worker pool named compute
  • Moved the identity and key (bidstream and sharing) endpoints to the compute pool
  • Added an enable_async_batch_request feature flag
  • Added pool-name labels to the Micrometer worker pool metrics
  • Turned the worker queue wait time metric into a histogram
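For the histogram change, one possible shape (a sketch only, assuming the queue wait time is the vertx.pool.queue.delay timer exposed by vertx-micrometer-metrics and that registry is the application's MeterRegistry; the actual change in this PR may differ) is a MeterFilter that enables percentile-histogram buckets for that meter:

// Sketch: publish worker-queue wait time as a histogram by enabling
// percentile-histogram buckets on the pool queue-delay timer.
registry.config().meterFilter(new MeterFilter() {
    @Override
    public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) {
        if (id.getName().startsWith("vertx.pool.queue.delay")) {
            return DistributionStatisticConfig.builder()
                    .percentilesHistogram(true)
                    .build()
                    .merge(config);
        }
        return config;
    }
});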

Tests ran:

  • Unit tests succeed
  • Ran in validator (no mismatches)
  • Ran load tests to verify expected improvements

mainRouter.post(V2_IDENTITY_BUCKETS.toString()).handler(bodyHandler).handler(auth.handleV1(
-        rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER));
+        rc -> encryptedPayloadHandler.handleAsync(rc, this::handleBucketsV2Async), Role.MAPPER));
mainRouter.post(V2_IDENTITY_MAP.toString()).handler(bodyHandler).handler(auth.handleV1(
Contributor

You don't have to do all this; please just swap the existing handler for blockingHandler with ordered = false: https://vertx.io/docs/apidocs/io/vertx/ext/web/Route.html#blockingHandler(io.vertx.core.Handler,boolean)
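For reference, the suggested swap would look roughly like this (a sketch based on the route quoted above, not code from this PR):

// Run the same handler on Vert.x's default worker pool instead of the event loop.
// ordered = false lets requests for this route execute in parallel rather than serially.
mainRouter.post(V2_IDENTITY_BUCKETS.toString())
        .handler(bodyHandler)
        .blockingHandler(auth.handleV1(
                rc -> encryptedPayloadHandler.handle(rc, this::handleBucketsV2), Role.MAPPER),
                false);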

Contributor Author

Why I didn’t use blockingHandler

  • It runs on Vert.x’s default worker pool and cannot be configured to run on a different pool
  • The default worker pool also handles store sync operations (e.g., RotatingStoreVerticle and CloudSyncVerticle from uid2-shared), some of which are I/O-bound
  • Without a way to isolate these tasks, the I/O-bound work can block the worker threads and slow down processing of batch requests
  • That interference would also distort the queue metrics (both wait time and queue length), since the queue would reflect not only request load but also other background tasks; that would make alerts (and any HPA scaling based on them) flakier
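To make the alternative concrete, here is a minimal sketch of offloading a handler to a dedicated, named worker pool via executeBlocking with ordered = false; the names handleBucketsV2Async, processBucketsV2, and computeWorkerPool are illustrative rather than the exact ones in this PR:

// Sketch: run the CPU-heavy part of the request on the dedicated "compute" pool
// instead of the default worker pool, so store-sync I/O on the default pool
// cannot delay batch requests, and the pool's queue metrics reflect request load only.
private void handleBucketsV2Async(RoutingContext rc) {
    computeWorkerPool.<JsonObject>executeBlocking(promise -> {
        // payload decryption, mapping and re-encryption happen off the event loop
        promise.complete(processBucketsV2(rc));
    }, false) // ordered = false: tasks may run in parallel across the pool's threads
    .onSuccess(result -> rc.response().end(result.encode()))
    .onFailure(rc::fail);
}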


// Create shared compute pool for CPU-intensive operations
final int computePoolSize = config.getInteger(Const.Config.ComputePoolThreadCountProp, Math.max(1, Runtime.getRuntime().availableProcessors() - 2));
final WorkerExecutor computeWorkerPool = vertx.createSharedWorkerExecutor("compute", computePoolSize);
Contributor

nit: Maybe give the compute worker pool a more descriptive name (e.g., "cpu-intensive-compute")

// Create shared compute pool for CPU-intensive operations
final int computePoolSize = config.getInteger(Const.Config.ComputePoolThreadCountProp, Math.max(1, Runtime.getRuntime().availableProcessors() - 2));
final WorkerExecutor computeWorkerPool = vertx.createSharedWorkerExecutor("compute", computePoolSize);
LOGGER.info("Created compute worker pool with size: {}", computePoolSize);
Contributor

nit: maybe include the worker pool's name in the log message, since we might add more worker pools for different purposes in the future
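For example, a small sketch of the suggested tweak (not the actual PR code):

final String computePoolName = "compute";
final WorkerExecutor computeWorkerPool = vertx.createSharedWorkerExecutor(computePoolName, computePoolSize);
// Include the pool name so log lines stay unambiguous once more pools exist.
LOGGER.info("Created '{}' worker pool with size: {}", computePoolName, computePoolSize);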


Supplier<Verticle> operatorVerticleSupplier = () -> {
-    UIDOperatorVerticle verticle = new UIDOperatorVerticle(configStore, config, this.clientSideTokenGenerate, siteProvider, clientKeyProvider, clientSideKeypairProvider, getKeyManager(), saltProvider, optOutStore, Clock.systemUTC(), _statsCollectorQueue, new SecureLinkValidatorService(this.serviceLinkProvider, this.serviceProvider), this.shutdownHandler::handleSaltRetrievalResponse, this.uidInstanceIdProvider);
+    UIDOperatorVerticle verticle = new UIDOperatorVerticle(configStore, config, this.clientSideTokenGenerate, siteProvider, clientKeyProvider, clientSideKeypairProvider, getKeyManager(), saltProvider, optOutStore, Clock.systemUTC(), _statsCollectorQueue, new SecureLinkValidatorService(this.serviceLinkProvider, this.serviceProvider), this.shutdownHandler::handleSaltRetrievalResponse, this.uidInstanceIdProvider, computeWorkerPool);
Contributor

@cYKatherine cYKatherine Feb 3, 2026

Question (maybe also for @sunnywu @mcollins-ttd @vishalegbert-ttd): do we prefer using dependency injection for the WorkerExecutor here, or should we simply pass the pool name?

Since vertx.createSharedWorkerExecutor("name") acts as a "get or create," we can easily fetch the shared pool internally without passing the object.

  • Pros for DI: It makes mocking easier for unit tests.
  • Cons: It might become annoying if we introduce multiple distinct pools in the future (e.g., separating the 'identity map' pool from other heavy compute endpoints), as we'd have to update constructor signatures for every new pool.

wdyt?
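For concreteness, a rough sketch of the two options being weighed (argument lists abbreviated; not the exact signatures in this PR):

// Option A: dependency injection - the caller creates or looks up the pool and passes it in.
// Easier to substitute a mock WorkerExecutor in unit tests.
UIDOperatorVerticle verticle = new UIDOperatorVerticle(/* ...existing dependencies..., */ computeWorkerPool);

// Option B: resolve the pool by name inside the verticle.
// createSharedWorkerExecutor is get-or-create, so this returns the already-created shared pool,
// and constructor signatures stay unchanged as more pools are added.
WorkerExecutor computePool = vertx.createSharedWorkerExecutor("compute");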
