diff --git a/.evergreen/.evg.yml b/.evergreen/.evg.yml
index 525861928f3..3524dca8002 100644
--- a/.evergreen/.evg.yml
+++ b/.evergreen/.evg.yml
@@ -2430,18 +2430,27 @@ buildvariants:
- name: "socket-test-task"
- matrix_name: "tests-netty"
- matrix_spec: { auth: "noauth", ssl: "*", jdk: "jdk8", version: [ "7.0" ], topology: "replicaset", os: "linux",
+ matrix_spec: { auth: "noauth", ssl: "*", jdk: "jdk8", version: [ "8.0" ], topology: "replicaset", os: "linux",
async-transport: "netty" }
display_name: "Netty: ${version} ${topology} ${ssl} ${auth} ${jdk} ${os} "
tags: [ "tests-netty-variant" ]
+ tasks:
+ - name: "test-core-task"
+ - name: "test-reactive-task"
+
+ - matrix_name: "tests-netty-compression"
+ matrix_spec: { compressor: "snappy", auth: "noauth", ssl: "nossl", jdk: "jdk21", version: [ "8.0" ], topology: "replicaset",
+ os: "linux", async-transport: "netty" }
+ display_name: "Netty with Compression '${compressor}': ${version} ${topology} ${ssl} ${auth} ${jdk} ${os} "
+ tags: [ "tests-netty-variant" ]
tasks:
- name: "test-reactive-task"
- name: "test-core-task"
- matrix_name: "tests-netty-ssl-provider"
- matrix_spec: { auth: "auth", ssl: "ssl", jdk: "jdk8", version: [ "7.0" ], topology: "replicaset", os: "linux",
+ matrix_spec: { auth: "auth", ssl: "ssl", jdk: "jdk8", version: [ "8.0" ], topology: "replicaset", os: "linux",
async-transport: "netty", netty-ssl-provider: "*" }
- display_name: "Netty SSL provider: ${version} ${topology} ${ssl} SslProvider.${netty-ssl-provider} ${auth} ${jdk} ${os} "
+ display_name: "Netty with SSL provider '${netty-ssl-provider}': ${version} ${topology} ${ssl} ${auth} ${jdk} ${os} "
tags: [ "tests-netty-variant" ]
tasks:
- name: "test-reactive-task"
diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh
index 10bd5bc107d..1ffe7a229a5 100755
--- a/.evergreen/run-tests.sh
+++ b/.evergreen/run-tests.sh
@@ -67,14 +67,17 @@ provision_ssl () {
}
provision_multi_mongos_uri_for_ssl () {
- # Arguments for auth + SSL
- if [ "$AUTH" != "noauth" ] || [ "$TOPOLOGY" == "replica_set" ]; then
- export MONGODB_URI="${MONGODB_URI}&ssl=true&sslInvalidHostNameAllowed=true"
- export MULTI_MONGOS_URI="${MULTI_MONGOS_URI}&ssl=true&sslInvalidHostNameAllowed=true"
- else
- export MONGODB_URI="${MONGODB_URI}/?ssl=true&sslInvalidHostNameAllowed=true"
- export MULTI_MONGOS_URI="${MULTI_MONGOS_URI}/?ssl=true&sslInvalidHostNameAllowed=true"
- fi
+ if [[ "$MONGODB_URI" == *"?"* ]]; then
+ export MONGODB_URI="${MONGODB_URI}&ssl=true&sslInvalidHostNameAllowed=true"
+ else
+ export MONGODB_URI="${MONGODB_URI}/?ssl=true&sslInvalidHostNameAllowed=true"
+ fi
+
+ if [[ "$MULTI_MONGOS_URI" == *"?"* ]]; then
+ export MULTI_MONGOS_URI="${MULTI_MONGOS_URI}&ssl=true&sslInvalidHostNameAllowed=true"
+ else
+ export MULTI_MONGOS_URI="${MULTI_MONGOS_URI}/?ssl=true&sslInvalidHostNameAllowed=true"
+ fi
}
############################################
@@ -136,6 +139,8 @@ echo "Running tests with Java ${JAVA_VERSION}"
--stacktrace --info --continue ${TESTS} | tee -a logs.txt
if grep -q 'LEAK:' logs.txt ; then
- echo "Netty Leak detected, please inspect build log"
+ echo "==============================================="
+ echo " Netty Leak detected, please inspect build log "
+ echo "==============================================="
exit 1
fi
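The rewritten `provision_multi_mongos_uri_for_ssl` now treats `MONGODB_URI` and `MULTI_MONGOS_URI` independently, keying only on whether each URI already carries a query string instead of inferring that from auth and topology. A minimal Java rendering of the same rule, with a hypothetical helper name used for illustration only:

```java
public final class UriSslOptionsSketch {
    // Hypothetical helper mirroring the shell logic above; not driver API.
    static String appendSslOptions(final String uri) {
        String options = "ssl=true&sslInvalidHostNameAllowed=true";
        // A URI that already has a query string gets '&'; otherwise start one with '/?'.
        return uri.contains("?") ? uri + "&" + options : uri + "/?" + options;
    }

    public static void main(final String[] args) {
        System.out.println(appendSslOptions("mongodb://localhost:27017"));
        System.out.println(appendSslOptions("mongodb://localhost:27017/?replicaSet=rs0"));
    }
}
```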
diff --git a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java
index 76e10653454..ccd2cda298b 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java
@@ -29,6 +29,8 @@
import com.mongodb.connection.SslSettings;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.connection.Stream;
+import com.mongodb.internal.diagnostics.logging.Logger;
+import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;
import io.netty.bootstrap.Bootstrap;
@@ -86,6 +88,24 @@
* {@linkplain #readAsync(int, OperationContext, AsyncCompletionHandler) asynchronous})
* are not supported by {@link NettyStream}.
* However, this class does not have a fail-fast mechanism checking for such situations.
+ *
+ * <h3>ByteBuf Ownership and Reference Counting</h3>
+ * <p>This class manages Netty {@link io.netty.buffer.ByteBuf} instances, which use reference counting for memory management.
+ * The following ownership rules apply:</p>
+ * <ul>
+ * <li><b>Inbound buffers from Netty:</b> When Netty delivers a buffer via {@link InboundBufferHandler#channelRead0},
+ * the buffer is initially owned by Netty. To keep the buffer, we call {@link io.netty.buffer.ByteBuf#retain()}
+ * to increment its reference count before adding it to {@link #pendingInboundBuffers}.</li>
+ * <li><b>{@link #pendingInboundBuffers}:</b> All buffers in this queue have been retained and are owned by this class.
+ * They must be released when consumed or when the stream is closed.</li>
+ * <li><b>Read completion:</b> When a read operation completes successfully, ownership of the returned {@link ByteBuf}
+ * transfers to the caller (via {@link AsyncCompletionHandler#completed}). The caller is responsible for releasing it.</li>
+ * <li><b>Handler exceptions:</b> If {@link AsyncCompletionHandler#completed} throws an exception, ownership was not
+ * transferred, and this class releases the buffer.</li>
+ * <li><b>Stream closure:</b> When {@link #close()} is called, all buffers in {@link #pendingInboundBuffers} are released.
+ * Callers must ensure any previously returned buffers are released before calling close.</li>
+ * </ul>
*
* <sup>1</sup>We cannot simply say that read methods are not allowed to be run concurrently because strictly speaking they are allowed,
* as explained below.
@@ -113,6 +133,7 @@
* is invoked after the first operation has completed reading despite the method has not returned yet.
*/
final class NettyStream implements Stream {
+ private static final Logger LOGGER = Loggers.getLogger("connection");
private static final byte NO_SCHEDULE_TIME = 0;
private final ServerAddress address;
private final InetAddressResolver inetAddressResolver;
@@ -127,6 +148,19 @@ final class NettyStream implements Stream {
private boolean isClosed;
private volatile Channel channel;
+ /**
+ * Queue of inbound buffers received from Netty that have not yet been consumed by read operations.
+ *
+ * <p><b>Ownership:</b> All buffers in this queue have been {@link io.netty.buffer.ByteBuf#retain() retained}
+ * and are owned by this class. When a buffer is removed from this queue, the remover takes ownership and
+ * is responsible for either:</p>
+ * <ul>
+ * <li>Transferring ownership to a caller (e.g., via {@link AsyncCompletionHandler#completed}), or</li>
+ * <li>Releasing the buffer via {@link io.netty.buffer.ByteBuf#release()}</li>
+ * </ul>
+ *
+ * <p>When the stream is {@link #close() closed}, all remaining buffers are released.</p>
+ */
private final LinkedList<io.netty.buffer.ByteBuf> pendingInboundBuffers = new LinkedList<>();
private final Lock lock = new ReentrantLock();
// access to the fields `pendingReader`, `pendingException` is guarded by `lock`
@@ -227,6 +261,7 @@ public void initChannel(final SocketChannel ch) {
@Override
public void write(final List<ByteBuf> buffers, final OperationContext operationContext) throws IOException {
+ validateConnectionState();
FutureAsyncCompletionHandler<Void> future = new FutureAsyncCompletionHandler<>();
writeAsync(buffers, operationContext, future);
future.get();
@@ -242,29 +277,49 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext)
@Override
public void writeAsync(final List<ByteBuf> buffers, final OperationContext operationContext,
final AsyncCompletionHandler<Void> handler) {
+ // Early validation before allocating resources
+ if (isClosed) {
+ handler.failed(new MongoSocketException("Stream is closed", address));
+ return;
+ }
+ Channel localChannel = channel;
+ if (localChannel == null || !localChannel.isActive()) {
+ handler.failed(new MongoSocketException("Channel is not active", address));
+ return;
+ }
+
CompositeByteBuf composite = PooledByteBufAllocator.DEFAULT.compositeBuffer();
- for (ByteBuf cur : buffers) {
- // The Netty framework releases `CompositeByteBuf` after writing
- // (see https://netty.io/wiki/reference-counted-objects.html#outbound-messages),
- // which results in the buffer we pass to `CompositeByteBuf.addComponent` being released.
- // However, `CompositeByteBuf.addComponent` does not retain this buffer,
- // which means we must retain it to conform to the `Stream.writeAsync` contract.
- composite.addComponent(true, ((NettyByteBuf) cur).asByteBuf().retain());
- }
-
- long writeTimeoutMS = operationContext.getTimeoutContext().getWriteTimeoutMS();
- final Optional<WriteTimeoutHandler> writeTimeoutHandler = addWriteTimeoutHandler(writeTimeoutMS);
- channel.writeAndFlush(composite).addListener((ChannelFutureListener) future -> {
- writeTimeoutHandler.map(w -> channel.pipeline().remove(w));
- if (!future.isSuccess()) {
- handler.failed(future.cause());
- } else {
- handler.completed(null);
+
+ try {
+ for (ByteBuf cur : buffers) {
+ // The Netty framework releases `CompositeByteBuf` after writing
+ // (see https://netty.io/wiki/reference-counted-objects.html#outbound-messages),
+ // which results in the buffer we pass to `CompositeByteBuf.addComponent` being released.
+ // However, `CompositeByteBuf.addComponent` does not retain this buffer,
+ // which means we must retain it to conform to the `Stream.writeAsync` contract.
+ composite.addComponent(true, ((NettyByteBuf) cur).asByteBuf().retain());
}
- });
+
+ long writeTimeoutMS = operationContext.getTimeoutContext().getWriteTimeoutMS();
+ final Optional<WriteTimeoutHandler> writeTimeoutHandler = addWriteTimeoutHandler(localChannel, writeTimeoutMS);
+
+ localChannel.writeAndFlush(composite).addListener((ChannelFutureListener) future -> {
+ writeTimeoutHandler.map(w -> localChannel.pipeline().remove(w));
+
+ if (!future.isSuccess()) {
+ handler.failed(future.cause());
+ } else {
+ handler.completed(null);
+ }
+ });
+ } catch (Throwable t) {
+ // If we fail before submitting the write, release the composite
+ composite.release();
+ handler.failed(t);
+ }
}
- private Optional<WriteTimeoutHandler> addWriteTimeoutHandler(final long writeTimeoutMS) {
+ private Optional<WriteTimeoutHandler> addWriteTimeoutHandler(final Channel channel, final long writeTimeoutMS) {
if (writeTimeoutMS != NO_SCHEDULE_TIME) {
WriteTimeoutHandler writeTimeoutHandler = new WriteTimeoutHandler(writeTimeoutMS, MILLISECONDS);
channel.pipeline().addBefore("ChannelInboundHandlerAdapter", "WriteTimeoutHandler", writeTimeoutHandler);
@@ -285,52 +340,76 @@ public void readAsync(final int numBytes, final OperationContext operationContex
* Timeouts may be scheduled only by the public read methods. Taking into account that concurrent pending
* readers are not allowed, there must not be a situation when threads attempt to schedule a timeout
* before the previous one is either cancelled or completed.
+ *
+ * <p><b>Buffer Ownership</b></p>
+ * <p>When this method completes a read operation successfully:</p>
+ * <ul>
+ * <li>Buffers are removed from {@link #pendingInboundBuffers} and assembled into a composite buffer</li>
+ * <li>Ownership of the composite buffer is transferred to the handler via {@link AsyncCompletionHandler#completed}</li>
+ * <li>If the handler throws an exception, the buffer is released by {@link #invokeHandlerWithBuffer}</li>
+ * </ul>
+ * <p>If an exception occurs during buffer assembly, the partially built composite is released before propagating the exception.</p>
*/
private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler, final long readTimeoutMillis) {
ByteBuf buffer = null;
- Throwable exceptionResult = null;
+ Throwable exceptionResult;
lock.lock();
try {
- exceptionResult = pendingException;
- if (exceptionResult == null) {
- if (!hasBytesAvailable(numBytes)) {
+
+ if (pendingException == null) {
+ if (isClosed) {
+ pendingException = new MongoSocketException("Stream was closed", address);
+ // Release any pending buffers that were retained before stream was closed
+ releaseAllPendingInboundBuffers();
+ } else if (channel == null || !channel.isActive()) {
+ pendingException = new MongoSocketException("Channel is not active", address);
+ // Release any pending buffers that were retained before channel became inactive
+ releaseAllPendingInboundBuffers();
+ } else if (!hasBytesAvailable(numBytes)) {
if (pendingReader == null) {//called by a public read method
pendingReader = new PendingReader(numBytes, handler, scheduleReadTimeout(readTimeoutTask, readTimeoutMillis));
}
} else {
CompositeByteBuf composite = allocator.compositeBuffer(pendingInboundBuffers.size());
- int bytesNeeded = numBytes;
- for (Iterator<io.netty.buffer.ByteBuf> iter = pendingInboundBuffers.iterator(); iter.hasNext();) {
- io.netty.buffer.ByteBuf next = iter.next();
- int bytesNeededFromCurrentBuffer = Math.min(next.readableBytes(), bytesNeeded);
- if (bytesNeededFromCurrentBuffer == next.readableBytes()) {
- composite.addComponent(next);
- iter.remove();
- } else {
- composite.addComponent(next.readRetainedSlice(bytesNeededFromCurrentBuffer));
- }
- composite.writerIndex(composite.writerIndex() + bytesNeededFromCurrentBuffer);
- bytesNeeded -= bytesNeededFromCurrentBuffer;
- if (bytesNeeded == 0) {
- break;
+ try {
+ int bytesNeeded = numBytes;
+ for (Iterator<io.netty.buffer.ByteBuf> iter = pendingInboundBuffers.iterator(); iter.hasNext();) {
+ io.netty.buffer.ByteBuf next = iter.next();
+ int bytesNeededFromCurrentBuffer = Math.min(next.readableBytes(), bytesNeeded);
+ if (bytesNeededFromCurrentBuffer == next.readableBytes()) {
+ iter.remove(); // Remove BEFORE adding to composite
+ composite.addComponent(true, next);
+ } else {
+ composite.addComponent(true, next.readRetainedSlice(bytesNeededFromCurrentBuffer));
+ }
+
+ bytesNeeded -= bytesNeededFromCurrentBuffer;
+ if (bytesNeeded == 0) {
+ break;
+ }
}
+ buffer = new NettyByteBuf(composite).flip();
+ } catch (Throwable t) {
+ composite.release();
+ pendingException = t;
}
- buffer = new NettyByteBuf(composite).flip();
}
}
- if (!(exceptionResult == null && buffer == null)//the read operation has completed
- && pendingReader != null) {//we need to clear the pending reader
+
+ exceptionResult = pendingException;
+ if (!(exceptionResult == null && buffer == null) //the read operation has completed
+ && pendingReader != null) { //we need to clear the pending reader
cancel(pendingReader.timeout);
this.pendingReader = null;
}
} finally {
lock.unlock();
}
+
if (exceptionResult != null) {
handler.failed(exceptionResult);
- }
- if (buffer != null) {
- handler.completed(buffer);
+ } else if (buffer != null) {
+ invokeHandlerWithBuffer(buffer, handler);
}
}
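The assembly loop above moves a fully drained buffer into the composite but hands over only a retained slice of a partially drained one, so the remainder stays queued for the next read. A standalone sketch of that behavior, with illustrative names and setup rather than driver code:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

public final class AssemblySketch {
    public static void main(final String[] args) {
        Deque<ByteBuf> pending = new ArrayDeque<>();
        pending.add(Unpooled.wrappedBuffer(new byte[] {1, 2, 3}));
        pending.add(Unpooled.wrappedBuffer(new byte[] {4, 5, 6}));

        int bytesNeeded = 4; // ask for more than the first buffer holds
        CompositeByteBuf composite = UnpooledByteBufAllocator.DEFAULT.compositeBuffer();
        for (Iterator<ByteBuf> iter = pending.iterator(); iter.hasNext() && bytesNeeded > 0;) {
            ByteBuf next = iter.next();
            int take = Math.min(next.readableBytes(), bytesNeeded);
            if (take == next.readableBytes()) {
                iter.remove(); // fully drained: ownership moves into the composite
                composite.addComponent(true, next);
            } else {
                // partially drained: the slice carries its own retain; `next` stays queued
                composite.addComponent(true, next.readRetainedSlice(take));
            }
            bytesNeeded -= take;
        }
        System.out.println("assembled " + composite.readableBytes() + " bytes"); // prints 4
        composite.release(); // releases the moved buffer and the retained slice
        pending.forEach(ByteBuf::release); // release whatever is still queued
    }
}
```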
@@ -350,6 +429,10 @@ private void handleReadResponse(@Nullable final io.netty.buffer.ByteBuf buffer,
if (buffer != null) {
if (isClosed) {
pendingException = new MongoSocketException("Received data after the stream was closed.", address);
+ // Do not retain the buffer since we're not storing it - let it be released by the caller
+ } else if (channel == null || !channel.isActive()) {
+ pendingException = new MongoSocketException("Channel is not active during read", address);
+ // Do not retain the buffer - channel is unusable
} else {
pendingInboundBuffers.add(buffer.retain());
}
@@ -370,22 +453,94 @@ public ServerAddress getAddress() {
return address;
}
+ /**
+ * Closes this stream and releases all resources.
+ *
+ * <p><b>Buffer Cleanup</b></p>
+ * <p>All buffers remaining in {@link #pendingInboundBuffers} are released synchronously while holding
+ * the lock to prevent race conditions. Buffers are forcefully released (all reference counts dropped)
+ * to prevent silent leaks.</p>
+ *
+ * <p><b>Write Buffer Handling</b></p>
+ * <p>Write buffers are {@link io.netty.buffer.ByteBuf#retain() retained} before being handed to Netty,
+ * so Netty's release of the composite after writing does not invalidate the caller's reference.
+ * This eliminates the need for explicit tracking: each party manages its own buffer lifecycle independently.</p>
+ *
+ * <p><b>Channel Cleanup</b></p>
+ * <p>If a channel exists, it is closed asynchronously. The close listener performs defensive cleanup
+ * to ensure any buffers added during the close process are also released, providing defense in depth
+ * against race conditions.</p>
+ *
+ * <p><b>Important:</b> Callers must ensure that any {@link ByteBuf} instances previously returned by
+ * {@link #read} or {@link #readAsync} have been released before calling this method. This class does not
+ * track buffers after ownership has been transferred to callers.</p>
+ */
@Override
public void close() {
- withLock(lock, () -> {
- isClosed = true;
- if (channel != null) {
- channel.close();
+ Channel channelToClose = withLock(lock, () -> {
+ if (!isClosed) {
+ isClosed = true;
+
+ // Clean up all pending inbound buffers synchronously while holding the lock
+ // This prevents race conditions where buffers might be added during close
+ releaseAllPendingInboundBuffers();
+
+ // Save channel reference for async close, then null it out
+ Channel localChannel = channel;
channel = null;
+ return localChannel;
}
- for (Iterator<io.netty.buffer.ByteBuf> iterator = pendingInboundBuffers.iterator(); iterator.hasNext();) {
- io.netty.buffer.ByteBuf nextByteBuf = iterator.next();
- iterator.remove();
- // Drops all retains to prevent silent leaks; assumes callers have already released
- // ByteBuffers returned by that NettyStream before calling close.
- nextByteBuf.release(nextByteBuf.refCnt());
- }
+ return null;
});
+
+ // Close the channel outside the lock to avoid potential deadlocks
+ if (channelToClose != null) {
+ channelToClose.close().addListener((ChannelFutureListener) future -> {
+ // Defensive cleanup: release any buffers that might have been added during close
+ // This is safe because isClosed=true prevents handleReadResponse from retaining new buffers
+ withLock(lock, () -> {
+ try {
+ releaseAllPendingInboundBuffers();
+ } catch (Throwable t) {
+ // Log but don't propagate - we're in an async callback
+ LOGGER.warn("Exception while releasing buffers in channel close listener", t);
+ }
+ });
+ });
+ }
+ }
+
+ /**
+ * Releases all buffers in {@link #pendingInboundBuffers} and clears the list.
+ * This method must be called while holding {@link #lock}.
+ *
+ * <p>Each buffer is forcefully released by dropping all of its reference counts (not just one)
+ * to prevent silent leaks in case of reference-counting errors elsewhere.</p>
+ *
+ * <p>This method is idempotent: it can safely be called multiple times.</p>
+ */
+ private void releaseAllPendingInboundBuffers() {
+ int releasedCount = 0;
+ int errorCount = 0;
+
+ for (io.netty.buffer.ByteBuf buffer : pendingInboundBuffers) {
+ try {
+ int refCnt = buffer.refCnt();
+ if (refCnt > 0) {
+ buffer.release(refCnt);
+ releasedCount++;
+ }
+ } catch (Throwable t) {
+ errorCount++;
+ // Log but continue releasing other buffers - we want to release all buffers even if one fails
+ LOGGER.warn("Exception while releasing buffer with refCount " + buffer.refCnt(), t);
+ }
+ }
+ pendingInboundBuffers.clear();
+
+ if (LOGGER.isDebugEnabled() && (releasedCount > 0 || errorCount > 0)) {
+ LOGGER.debug(String.format("Released %d buffers for %s (%d errors)", releasedCount, address, errorCount));
+ }
}
@Override
@@ -413,6 +568,52 @@ public ByteBufAllocator getAllocator() {
return allocator;
}
+ /**
+ * Validates that the stream is open and the channel is active.
+ *
+ * @throws MongoSocketException if the stream is closed or the channel is not active
+ */
+ private void validateConnectionState() throws MongoSocketException {
+ if (isClosed) {
+ throw new MongoSocketException("Stream is closed", address);
+ }
+ Channel localChannel = channel;
+ if (localChannel == null || !localChannel.isActive()) {
+ throw new MongoSocketException("Channel is not active", address);
+ }
+ }
+
+ /**
+ * Invokes the handler with the buffer, ensuring the buffer is released if the handler throws an exception.
+ *
+ * <p><b>Ownership Transfer Protocol</b></p>
+ * <p>This method implements a safe ownership transfer:</p>
+ * <ol>
+ * <li>The buffer is passed to {@link AsyncCompletionHandler#completed}</li>
+ * <li>If the handler returns normally, ownership has been successfully transferred to the handler/caller</li>
+ * <li>If the handler throws an exception, ownership was NOT transferred, so this method releases the buffer
+ * before re-throwing the exception</li>
+ * </ol>
+ *
+ * <p>This ensures that buffers are never leaked, regardless of whether the handler succeeds or fails.</p>
+ *
+ * @param buffer the buffer to pass to the handler; must not be null. Ownership is transferred to the handler
+ * on successful completion.
+ * @param handler the handler to invoke with the buffer
+ * @throws RuntimeException if the handler throws an exception (after releasing the buffer)
+ */
+ private void invokeHandlerWithBuffer(final ByteBuf buffer, final AsyncCompletionHandler<ByteBuf> handler) {
+ try {
+ handler.completed(buffer);
+ } catch (Throwable t) {
+ // Handler threw an exception, so it didn't take ownership - release the buffer
+ if (buffer.getReferenceCount() > 0) {
+ buffer.release();
+ }
+ throw t;
+ }
+ }
+
private void addSslHandler(final SocketChannel channel) {
SSLEngine engine;
if (sslContext == null) {
@@ -525,13 +726,22 @@ private class OpenChannelFutureListener implements ChannelFutureListener {
public void operationComplete(final ChannelFuture future) {
withLock(lock, () -> {
if (future.isSuccess()) {
+ Channel newChannel = channelFuture.channel();
if (isClosed) {
- channelFuture.channel().close();
+ // Monitor closed during connection - clean up immediately
+ if (newChannel != null) {
+ newChannel.close();
+ }
+ handler.completed(null);
+ } else if (newChannel == null || !newChannel.isActive()) {
+ // Channel invalid - treat as failure
+ handler.failed(new MongoSocketException("Channel is not active after connection", address));
} else {
- channel = channelFuture.channel();
- channel.closeFuture().addListener((ChannelFutureListener) future1 -> handleReadResponse(null, new IOException("The connection to the server was closed")));
+ channel = newChannel;
+ channel.closeFuture().addListener((ChannelFutureListener) future1 ->
+ handleReadResponse(null, new IOException("The connection to the server was closed")));
+ handler.completed(null);
}
- handler.completed(null);
} else {
if (isClosed) {
handler.completed(null);
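Taken together, these changes make `NettyStream` follow a retain-on-queue, transfer-on-complete, release-on-failure protocol for inbound buffers. The protocol in isolation, as a runnable sketch whose names are illustrative rather than the driver's internals:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

public final class OwnershipSketch {
    private final Queue<ByteBuf> pending = new ArrayDeque<>();

    // Inbound path: retain before queueing, because Netty releases the original after delivery.
    void onInbound(final ByteBuf fromNetty) {
        pending.add(fromNetty.retain());
    }

    // Completion path: ownership transfers to the consumer unless it throws.
    void complete(final Consumer<ByteBuf> handler) {
        ByteBuf buf = pending.remove();
        try {
            handler.accept(buf); // success: the handler now owns buf and must release it
        } catch (Throwable t) {
            buf.release();       // failure: ownership never transferred, release here
            throw t;
        }
    }

    // Close path: drop every remaining reference so nothing leaks silently.
    void close() {
        ByteBuf buf;
        while ((buf = pending.poll()) != null) {
            buf.release(buf.refCnt());
        }
    }

    public static void main(final String[] args) {
        OwnershipSketch stream = new OwnershipSketch();
        ByteBuf incoming = Unpooled.buffer().writeBytes("data".getBytes(StandardCharsets.UTF_8));
        stream.onInbound(incoming);
        incoming.release(); // stands in for Netty's own release of the delivered buffer
        stream.complete(buf -> {
            System.out.println(buf.toString(StandardCharsets.UTF_8));
            buf.release();
        });
        stream.close();
    }
}
```

The invariant throughout is that exactly one party is responsible for each reference count at any moment, which is what the new javadoc spells out.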
diff --git a/driver-core/src/test/functional/com/mongodb/connection/netty/NettyStreamSpecification.groovy b/driver-core/src/test/functional/com/mongodb/connection/netty/NettyStreamSpecification.groovy
deleted file mode 100644
index e582e0fc398..00000000000
--- a/driver-core/src/test/functional/com/mongodb/connection/netty/NettyStreamSpecification.groovy
+++ /dev/null
@@ -1,125 +0,0 @@
-package com.mongodb.connection.netty
-
-import com.mongodb.MongoSocketException
-import com.mongodb.MongoSocketOpenException
-import com.mongodb.ServerAddress
-import com.mongodb.connection.AsyncCompletionHandler
-import com.mongodb.connection.SocketSettings
-import com.mongodb.connection.SslSettings
-import com.mongodb.internal.connection.netty.NettyStreamFactory
-import com.mongodb.spi.dns.InetAddressResolver
-import io.netty.buffer.PooledByteBufAllocator
-import io.netty.channel.nio.NioEventLoopGroup
-import io.netty.channel.socket.nio.NioSocketChannel
-import spock.lang.IgnoreIf
-import spock.lang.Specification
-import com.mongodb.spock.Slow
-
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.TimeUnit
-
-import static com.mongodb.ClusterFixture.OPERATION_CONTEXT
-import static com.mongodb.ClusterFixture.getSslSettings
-
-class NettyStreamSpecification extends Specification {
-
- @Slow
- @IgnoreIf({ getSslSettings().isEnabled() })
- def 'should successfully connect with working ip address group'() {
- given:
- SocketSettings socketSettings = SocketSettings.builder().connectTimeout(1000, TimeUnit.MILLISECONDS).build()
- SslSettings sslSettings = SslSettings.builder().build()
- def inetAddressResolver = new InetAddressResolver() {
- @Override
- List<InetAddress> lookupByName(String host) {
- [InetAddress.getByName('192.168.255.255'),
- InetAddress.getByName('1.2.3.4'),
- InetAddress.getByName('127.0.0.1')]
- }
- }
- def factory = new NettyStreamFactory(inetAddressResolver, socketSettings, sslSettings, new NioEventLoopGroup(),
- NioSocketChannel, PooledByteBufAllocator.DEFAULT, null)
-
- def stream = factory.create(new ServerAddress())
-
- when:
- stream.open(OPERATION_CONTEXT)
-
- then:
- !stream.isClosed()
- }
-
- @Slow
- @IgnoreIf({ getSslSettings().isEnabled() })
- def 'should throw exception with non-working ip address group'() {
- given:
- SocketSettings socketSettings = SocketSettings.builder().connectTimeout(1000, TimeUnit.MILLISECONDS).build()
- SslSettings sslSettings = SslSettings.builder().build()
- def inetAddressResolver = new InetAddressResolver() {
- @Override
- List<InetAddress> lookupByName(String host) {
- [InetAddress.getByName('192.168.255.255'),
- InetAddress.getByName('1.2.3.4'),
- InetAddress.getByName('1.2.3.5')]
- }
- }
- def factory = new NettyStreamFactory(inetAddressResolver, socketSettings, sslSettings, new NioEventLoopGroup(),
- NioSocketChannel, PooledByteBufAllocator.DEFAULT, null)
-
- def stream = factory.create(new ServerAddress())
-
- when:
- stream.open(OPERATION_CONTEXT)
-
- then:
- thrown(MongoSocketOpenException)
- }
-
- @IgnoreIf({ getSslSettings().isEnabled() })
- def 'should fail AsyncCompletionHandler if name resolution fails'() {
- given:
- def serverAddress = Stub(ServerAddress)
- def exception = new MongoSocketException('Temporary failure in name resolution', serverAddress)
- serverAddress.getSocketAddresses() >> { throw exception }
-
- SocketSettings socketSettings = SocketSettings.builder().connectTimeout(1000, TimeUnit.MILLISECONDS).build()
- SslSettings sslSettings = SslSettings.builder().build()
- def inetAddressResolver = new InetAddressResolver() {
- @Override
- List<InetAddress> lookupByName(String host) {
- throw exception
- }
- }
- def stream = new NettyStreamFactory(inetAddressResolver, socketSettings, sslSettings, new NioEventLoopGroup(),
- NioSocketChannel, PooledByteBufAllocator.DEFAULT, null)
- .create(new ServerAddress())
- def callback = new CallbackErrorHolder()
-
- when:
- stream.openAsync(OPERATION_CONTEXT, callback)
-
- then:
- callback.getError().is(exception)
- }
-
- class CallbackErrorHolder implements AsyncCompletionHandler<Void> {
- CountDownLatch latch = new CountDownLatch(1)
- Throwable throwable = null
-
- Throwable getError() {
- latch.countDown()
- throwable
- }
-
- @Override
- void completed(Void r) {
- latch.await()
- }
-
- @Override
- void failed(Throwable t) {
- throwable = t
- latch.countDown()
- }
- }
-}
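The JUnit replacement below verifies leak-freedom through a `TrackingNettyByteBufAllocator` whose definition is cut off at the end of this excerpt. As a rough sketch of what such a tracking helper typically involves (everything here beyond the `assertAllBuffersReleased` name is an assumption, not taken from the PR):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Assumed sketch of a buffer-tracking test helper; the PR's actual
// TrackingNettyByteBufAllocator is truncated below, so this is illustrative only.
final class TrackingAllocatorSketch {
    private final UnpooledByteBufAllocator delegate = new UnpooledByteBufAllocator(false);
    private final List<ByteBuf> allocated = Collections.synchronizedList(new ArrayList<>());

    ByteBuf buffer(final int initialCapacity) {
        ByteBuf buf = delegate.buffer(initialCapacity);
        allocated.add(buf); // record every allocation so leaks can be asserted later
        return buf;
    }

    void assertAllBuffersReleased() {
        synchronized (allocated) {
            for (ByteBuf buf : allocated) {
                if (buf.refCnt() != 0) {
                    throw new AssertionError("buffer leaked with refCnt=" + buf.refCnt());
                }
            }
        }
    }
}
```

The real class presumably implements `io.netty.buffer.ByteBufAllocator` so it can be handed to `NettyStreamFactory`; this sketch omits that surface.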
diff --git a/driver-core/src/test/functional/com/mongodb/connection/netty/NettyStreamTest.java b/driver-core/src/test/functional/com/mongodb/connection/netty/NettyStreamTest.java
new file mode 100644
index 00000000000..a72a8228066
--- /dev/null
+++ b/driver-core/src/test/functional/com/mongodb/connection/netty/NettyStreamTest.java
@@ -0,0 +1,1110 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.connection.netty;
+
+import com.mongodb.MongoSocketException;
+import com.mongodb.MongoSocketOpenException;
+import com.mongodb.ServerAddress;
+import com.mongodb.connection.AsyncCompletionHandler;
+import com.mongodb.connection.SocketSettings;
+import com.mongodb.connection.SslSettings;
+import com.mongodb.internal.ResourceUtil;
+import com.mongodb.internal.connection.Stream;
+import com.mongodb.internal.connection.netty.NettyByteBuf;
+import com.mongodb.internal.connection.netty.NettyStreamFactory;
+import com.mongodb.spi.dns.InetAddressResolver;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.ReferenceCounted;
+import org.bson.ByteBuf;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIf;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.mongodb.ClusterFixture.OPERATION_CONTEXT;
+import static com.mongodb.ClusterFixture.getSslSettings;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@SuppressWarnings("deprecation")
+@DisplayName("NettyStream - Connection, Validation & Leak Prevention Tests")
+class NettyStreamTest {
+
+ private NioEventLoopGroup eventLoopGroup;
+ private NettyStreamFactory factory;
+ private Stream stream;
+ private TrackingNettyByteBufAllocator trackingAllocator;
+
+ @BeforeEach
+ void setUp() {
+ eventLoopGroup = new NioEventLoopGroup();
+ trackingAllocator = new TrackingNettyByteBufAllocator();
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (stream != null && !stream.isClosed()) {
+ stream.close();
+ }
+ if (eventLoopGroup != null) {
+ eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
+ }
+ System.gc();
+ }
+
+ private static boolean isSslEnabled() {
+ return getSslSettings().isEnabled();
+ }
+
+ // ========== Original Tests Converted from Spock ==========
+
+ @Test
+ @Tag("Slow")
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should successfully connect when at least one IP address in the group is reachable")
+ void shouldSuccessfullyConnectWithWorkingIpAddressGroup() throws IOException {
+ // given
+ SocketSettings socketSettings = SocketSettings.builder()
+ .connectTimeout(1000, TimeUnit.MILLISECONDS)
+ .build();
+ SslSettings sslSettings = SslSettings.builder().build();
+
+ InetAddressResolver inetAddressResolver = host -> {
+ try {
+ return asList(
+ InetAddress.getByName("192.168.255.255"),
+ InetAddress.getByName("1.2.3.4"),
+ InetAddress.getByName("127.0.0.1")
+ );
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ factory = new NettyStreamFactory(inetAddressResolver, socketSettings, sslSettings,
+ eventLoopGroup, NioSocketChannel.class, trackingAllocator, null);
+ stream = factory.create(new ServerAddress());
+
+ // when
+ stream.open(OPERATION_CONTEXT);
+
+ // then
+ assertFalse(stream.isClosed());
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @Tag("Slow")
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should throw MongoSocketOpenException when all IP addresses in the group are unreachable")
+ void shouldThrowExceptionWithNonWorkingIpAddressGroup() {
+ // given
+ SocketSettings socketSettings = SocketSettings.builder()
+ .connectTimeout(1000, TimeUnit.MILLISECONDS)
+ .build();
+ SslSettings sslSettings = SslSettings.builder().build();
+
+ InetAddressResolver inetAddressResolver = host -> {
+ try {
+ return asList(
+ InetAddress.getByName("192.168.255.255"),
+ InetAddress.getByName("1.2.3.4"),
+ InetAddress.getByName("1.2.3.5")
+ );
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ factory = new NettyStreamFactory(inetAddressResolver, socketSettings, sslSettings,
+ eventLoopGroup, NioSocketChannel.class, trackingAllocator, null);
+ stream = factory.create(new ServerAddress());
+
+ // when/then
+ assertThrows(MongoSocketOpenException.class, () -> stream.open(OPERATION_CONTEXT));
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should fail AsyncCompletionHandler when DNS name resolution fails")
+ void shouldFailAsyncCompletionHandlerIfNameResolutionFails() throws InterruptedException {
+ // given
+ ServerAddress serverAddress = new ServerAddress("nonexistent.invalid.hostname.test");
+ MongoSocketException exception = new MongoSocketException("Temporary failure in name resolution", serverAddress);
+
+ InetAddressResolver inetAddressResolver = host -> {
+ throw exception;
+ };
+
+ SocketSettings socketSettings = SocketSettings.builder()
+ .connectTimeout(1000, TimeUnit.MILLISECONDS)
+ .build();
+ SslSettings sslSettings = SslSettings.builder().build();
+
+ factory = new NettyStreamFactory(inetAddressResolver, socketSettings, sslSettings,
+ eventLoopGroup, NioSocketChannel.class, trackingAllocator, null);
+ stream = factory.create(serverAddress);
+
+ CallbackErrorHolder<Void> callback = new CallbackErrorHolder<>();
+
+ // when
+ stream.openAsync(OPERATION_CONTEXT, callback);
+
+ // then
+ Throwable error = callback.getError();
+ assertSame(exception, error);
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ // ========== New Tests for Defensive Improvements ==========
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should throw MongoSocketException when attempting to write to a closed stream")
+ void shouldThrowExceptionWhenWritingToClosedStream() throws IOException {
+ // given - open and then close stream
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+ stream.close();
+
+ // when/then - write should fail with MongoSocketException
+ List<ByteBuf> buffers = createTestBuffers("test");
+
+ MongoSocketException exception = assertThrows(MongoSocketException.class,
+ () -> stream.write(buffers, OPERATION_CONTEXT));
+
+ // stream.write doesn't release the passed in buffers
+ ResourceUtil.release(buffers);
+
+ assertTrue(exception.getMessage().contains("Stream is closed"));
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should fail async write operation with clear error when stream is closed")
+ void shouldFailAsyncWriteWhenStreamIsClosed() throws IOException, InterruptedException {
+ // given - open and then close stream
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+ stream.close();
+
+ // when - attempt async write
+ List<ByteBuf> buffers = createTestBuffers("test");
+
+ CallbackErrorHolder<Void> callback = new CallbackErrorHolder<>();
+ stream.writeAsync(buffers, OPERATION_CONTEXT, callback);
+
+ // then
+ Throwable error = callback.getError();
+ assertNotNull(error);
+ assertInstanceOf(MongoSocketException.class, error);
+ assertTrue(error.getMessage().contains("Stream is closed"));
+
+ // stream.writeAsync doesn't release the passed in buffers
+ ResourceUtil.release(buffers);
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should throw exception when attempting to read from a closed stream")
+ void shouldThrowExceptionWhenReadingFromClosedStream() throws IOException {
+ // given - open and then close stream
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+ stream.close();
+
+ // when/then - read should fail with IOException or MongoSocketException
+ Exception exception = assertThrows(Exception.class,
+ () -> stream.read(1024, OPERATION_CONTEXT));
+
+ assertTrue(exception instanceof IOException || exception instanceof MongoSocketException,
+ "Expected IOException or MongoSocketException but got: " + exception.getClass().getName());
+ assertTrue(exception.getMessage().contains("closed")
+ || exception.getMessage().contains("Channel is not active")
+ || exception.getMessage().contains("connection"));
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should handle multiple consecutive close() calls gracefully without errors")
+ void shouldHandleMultipleCloseCallsGracefully() throws IOException {
+ // given
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+
+ // when - close multiple times
+ stream.close();
+ stream.close();
+ stream.close();
+
+ // then - should not throw exception and stream should be closed
+ assertTrue(stream.isClosed());
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should prevent write operations after the underlying channel becomes inactive")
+ void shouldPreventWriteOperationsAfterChannelBecomesInactive() throws IOException {
+ // given - stream opened
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+
+ // Simulate channel becoming inactive by closing
+ stream.close();
+
+ // when/then - operations should fail
+ List<ByteBuf> buffers = createTestBuffers("test");
+
+ MongoSocketException exception = assertThrows(MongoSocketException.class,
+ () -> stream.write(buffers, OPERATION_CONTEXT));
+
+ // stream.write doesn't release the passed in buffers
+ ResourceUtil.release(buffers);
+
+ assertTrue(exception.getMessage().contains("closed")
+ || exception.getMessage().contains("not active"));
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should validate connection state before executing write operations")
+ void shouldValidateConnectionStateBeforeWrite() throws IOException {
+ // given - create stream but don't open it
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+
+ // when/then - write should fail because stream not opened
+ List<ByteBuf> buffers = createTestBuffers("test");
+
+ assertThrows(MongoSocketException.class,
+ () -> stream.write(buffers, OPERATION_CONTEXT));
+
+ // stream.write doesn't release the passed in buffers
+ ResourceUtil.release(buffers);
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should handle async write failure gracefully with proper error callback")
+ void shouldHandleAsyncWriteFailureGracefully() throws InterruptedException {
+ // given - stream not opened
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+
+ // when - attempt async write without opening
+ List<ByteBuf> buffers = createTestBuffers("test");
+
+ CallbackErrorHolder<Void> callback = new CallbackErrorHolder<>();
+ stream.writeAsync(buffers, OPERATION_CONTEXT, callback);
+
+ // then - should fail with clear error
+ Throwable error = callback.getError();
+ assertNotNull(error);
+ assertInstanceOf(MongoSocketException.class, error);
+
+ // stream.writeAsync doesn't release the passed in buffers
+ ResourceUtil.release(buffers);
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisplayName("Should return the correct server address from getAddress()")
+ void shouldReturnCorrectAddress() {
+ // given
+ ServerAddress serverAddress = new ServerAddress("localhost", 27017);
+ factory = createDefaultFactory();
+ stream = factory.create(serverAddress);
+
+ // when
+ ServerAddress address = stream.getAddress();
+
+ // then
+ assertEquals(serverAddress, address);
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisplayName("Should correctly report stream closed state throughout its lifecycle")
+ void shouldReportClosedStateCorrectly() throws IOException {
+ // given
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+
+ // when/then - initially not closed (even without opening)
+ assertFalse(stream.isClosed());
+
+ // open and verify still not closed
+ stream.open(OPERATION_CONTEXT);
+ assertFalse(stream.isClosed());
+
+ // close and verify is closed
+ stream.close();
+ assertTrue(stream.isClosed());
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should handle concurrent close() operations from multiple threads without errors")
+ void shouldHandleConcurrentCloseGracefully() throws IOException, InterruptedException {
+ // given
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+
+ // when - close from multiple threads
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(3);
+ AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+
+ for (int i = 0; i < 3; i++) {
+ new Thread(() -> {
+ try {
+ startLatch.await();
+ stream.close();
+ } catch (Exception e) {
+ exceptionRef.set(e);
+ } finally {
+ completionLatch.countDown();
+ }
+ }).start();
+ }
+
+ startLatch.countDown();
+ assertTrue(completionLatch.await(5, TimeUnit.SECONDS));
+
+ // then - no exceptions should occur
+ assertNull(exceptionRef.get());
+ assertTrue(stream.isClosed());
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should prevent ByteBuf leaks when write operations fail on a closed stream")
+ void shouldPreventBufferLeakWhenWriteFailsOnClosedStream() throws IOException, InterruptedException {
+ // given - create and open stream
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+
+ // Close the stream
+ stream.close();
+
+ // when - attempt multiple writes (to test buffer cleanup)
+ List<ByteBuf> buffers = createTestBuffers("test1", "test2");
+
+ CallbackErrorHolder<Void> callback1 = new CallbackErrorHolder<>();
+ CallbackErrorHolder<Void> callback2 = new CallbackErrorHolder<>();
+ CallbackErrorHolder<Void> callback3 = new CallbackErrorHolder<>();
+
+ stream.writeAsync(buffers, OPERATION_CONTEXT, callback1);
+ stream.writeAsync(buffers, OPERATION_CONTEXT, callback2);
+ stream.writeAsync(buffers, OPERATION_CONTEXT, callback3);
+
+ // then - all should fail gracefully
+ assertNotNull(callback1.getError());
+ assertNotNull(callback2.getError());
+ assertNotNull(callback3.getError());
+
+ // stream.writeAsync doesn't release the passed in buffers
+ ResourceUtil.release(buffers);
+
+ // Verify no buffer leaks - stream should have released buffers on failed async write
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ // ========== Additional Comprehensive Test Scenarios ==========
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should not allocate composite buffer when channel is inactive before write")
+ void shouldNotAllocateBufferWhenChannelInactiveBeforeWrite() throws IOException, InterruptedException {
+ // given - create stream but immediately close it
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+ stream.close();
+
+ // when - attempt async write with large buffers (would allocate resources)
+ List<ByteBuf> largeBuffers = createLargeTestBuffers(1024 * 1024, 2);
+
+ CallbackErrorHolder<Void> callback = new CallbackErrorHolder<>();
+ stream.writeAsync(largeBuffers, OPERATION_CONTEXT, callback);
+
+ // then - should fail immediately without allocating composite buffer
+ Throwable error = callback.getError();
+ assertNotNull(error);
+ assertTrue(error.getMessage().contains("closed") || error.getMessage().contains("not active"));
+
+ // stream.writeAsync doesn't release the passed in buffers
+ ResourceUtil.release(largeBuffers);
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should fail fast when attempting operations on never-opened stream")
+ void shouldFailFastOnNeverOpenedStream() {
+ // given - stream created but never opened
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+
+ // when/then - all operations should fail immediately
+ List<ByteBuf> buffers = createTestBuffers("test");
+
+ // Write should fail
+ assertThrows(MongoSocketException.class,
+ () -> stream.write(buffers, OPERATION_CONTEXT));
+
+ // stream.write doesn't release the passed in buffers
+ ResourceUtil.release(buffers);
+
+ // Read should fail
+ assertThrows(Exception.class,
+ () -> stream.read(1024, OPERATION_CONTEXT));
+
+ // Stream should not be marked as closed (never opened)
+ assertFalse(stream.isClosed());
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should handle async read operation gracefully on closed stream")
+ void shouldHandleAsyncReadOnClosedStream() throws IOException, InterruptedException {
+ // given - open and close stream
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+ stream.close();
+
+ // when - attempt async read
+ CallbackErrorHolder<ByteBuf> callback = new CallbackErrorHolder<>();
+ stream.readAsync(1024, OPERATION_CONTEXT, callback);
+
+ // then - should fail with appropriate error
+ Throwable error = callback.getError();
+ assertNotNull(error);
+ assertTrue(error instanceof IOException || error instanceof MongoSocketException);
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @SuppressWarnings("WriteOnlyObject")
+ @DisplayName("Should handle concurrent write and close operations without deadlock")
+ void shouldHandleConcurrentWriteAndCloseWithoutDeadlock() throws IOException, InterruptedException {
+ // given
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+
+ CountDownLatch writeLatch = new CountDownLatch(1);
+ CountDownLatch closeLatch = new CountDownLatch(1);
+ AtomicReference<Exception> writeException = new AtomicReference<>();
+ AtomicReference<Exception> closeException = new AtomicReference<>();
+
+ // when - write and close concurrently
+ Thread writeThread = new Thread(() -> {
+ List<ByteBuf> buffers = createTestBuffers("concurrent test");
+ try {
+ writeLatch.await();
+ stream.write(buffers, OPERATION_CONTEXT);
+ } catch (Exception e) {
+ writeException.set(e);
+ } finally {
+ // stream.write doesn't release the passed in buffers
+ ResourceUtil.release(buffers);
+ }
+ });
+
+ Thread closeThread = new Thread(() -> {
+ try {
+ closeLatch.await();
+ stream.close();
+ } catch (Exception e) {
+ closeException.set(e);
+ }
+ });
+
+ writeThread.start();
+ closeThread.start();
+
+ // Start both operations nearly simultaneously
+ writeLatch.countDown();
+ closeLatch.countDown();
+
+ // Wait for completion
+ writeThread.join(5000);
+ closeThread.join(5000);
+
+ // then - no deadlock should occur, operations complete
+ assertFalse(writeThread.isAlive(), "Write thread should complete");
+ assertFalse(closeThread.isAlive(), "Close thread should complete");
+ assertTrue(stream.isClosed(), "Stream should be closed");
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should handle empty buffer list in write operations")
+ void shouldHandleEmptyBufferList() throws IOException {
+ // given
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+
+ // when - write empty buffer list
+ List<ByteBuf> emptyBuffers = emptyList();
+
+ // then - should not throw exception (implementation specific behavior)
+ assertDoesNotThrow(() -> stream.write(emptyBuffers, OPERATION_CONTEXT));
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @Tag("Slow")
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should properly cleanup resources when async open operation fails")
+ void shouldCleanupResourcesWhenAsyncOpenFails() throws InterruptedException {
+ // given - factory with unreachable address
+ SocketSettings socketSettings = SocketSettings.builder()
+ .connectTimeout(500, TimeUnit.MILLISECONDS) // Longer timeout to allow connection attempt
+ .build();
+ SslSettings sslSettings = SslSettings.builder().build();
+
+ InetAddressResolver resolver = host -> {
+ try {
+ return singletonList(InetAddress.getByName("192.168.255.255")); // Unreachable
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ factory = new NettyStreamFactory(resolver, socketSettings, sslSettings,
+ eventLoopGroup, NioSocketChannel.class, trackingAllocator, null);
+ stream = factory.create(new ServerAddress());
+
+ // when - attempt async open
+ CallbackErrorHolder<Void> callback = new CallbackErrorHolder<>();
+ stream.openAsync(OPERATION_CONTEXT, callback);
+
+ // then - should fail and cleanup resources (may take time due to connection timeout)
+ Throwable error = callback.getError();
+ assertNotNull(error);
+ assertInstanceOf(MongoSocketException.class, error);
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisplayName("Should return consistent ServerAddress across multiple calls")
+ void shouldReturnConsistentServerAddress() {
+ // given
+ ServerAddress serverAddress = new ServerAddress("test.mongodb.com", 27017);
+ factory = createDefaultFactory();
+ stream = factory.create(serverAddress);
+
+ // when - call getAddress multiple times
+ ServerAddress addr1 = stream.getAddress();
+ ServerAddress addr2 = stream.getAddress();
+ ServerAddress addr3 = stream.getAddress();
+
+ // then - all should return same address
+ assertEquals(serverAddress, addr1);
+ assertEquals(serverAddress, addr2);
+ assertEquals(serverAddress, addr3);
+ assertSame(addr1, addr2); // Should return same instance
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should handle rapid open and close cycles without resource leaks")
+ void shouldHandleRapidOpenCloseCycles() throws IOException {
+ // given
+ factory = createDefaultFactory();
+
+ // when - perform multiple rapid open/close cycles
+ for (int i = 0; i < 5; i++) {
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+ assertFalse(stream.isClosed(), "Stream should be open after opening");
+
+ stream.close();
+ assertTrue(stream.isClosed(), "Stream should be closed after closing");
+ }
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should validate stream state remains consistent after failed operations")
+ void shouldMaintainConsistentStateAfterFailedOperations() throws IOException {
+ // given
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+ ServerAddress originalAddress = stream.getAddress();
+
+ // when - perform operation that will fail
+ stream.close();
+
+ List<ByteBuf> buffers = createTestBuffers("test");
+
+ try {
+ stream.write(buffers, OPERATION_CONTEXT);
+ } catch (MongoSocketException e) {
+ // Expected
+ } finally {
+ // stream.write doesn't release the passed in buffers
+ ResourceUtil.release(buffers);
+ }
+
+ // then - stream state should remain consistent
+ assertTrue(stream.isClosed());
+ assertEquals(originalAddress, stream.getAddress());
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should release retained buffers when stream closes during read assembly")
+ void shouldReleaseBuffersWhenStreamClosesDuringReadAssembly() throws IOException, InterruptedException {
+ // This test simulates the race condition where:
+ // 1. Async read is initiated (creates pendingReader)
+ // 2. Stream is closed by another thread before data arrives
+ // 3. If data were to arrive and be retained, it must be cleaned up
+ // 4. The fix ensures readAsync() releases buffers when detecting closed state
+
+ // given - open stream and initiate async read
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+
+ // Start an async read that will wait for data (creates pendingReader)
+ CallbackErrorHolder<ByteBuf> asyncReadCallback = new CallbackErrorHolder<>();
+ stream.readAsync(1024, OPERATION_CONTEXT, asyncReadCallback);
+
+ // Give the async read time to set up
+ Thread.sleep(10);
+
+ // when - close the stream while async read is pending
+ stream.close();
+
+ // then - the async read should fail with appropriate error
+ Throwable error = asyncReadCallback.getError();
+ assertNotNull(error, "Async read should fail when stream is closed");
+ assertTrue(error instanceof IOException || error instanceof MongoSocketException,
+ "Expected IOException or MongoSocketException but got: " + error.getClass().getName());
+ assertTrue(stream.isClosed(), "Stream should be closed");
+
+ // Attempt another read after close - should also fail and clean up any pending buffers
+ Exception exception = assertThrows(Exception.class,
+ () -> stream.read(1024, OPERATION_CONTEXT));
+ assertTrue(exception instanceof IOException || exception instanceof MongoSocketException);
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @DisplayName("Should handle rapid close during async read operation with race condition")
+ void shouldHandleCloseRaceWithConcurrentReads() throws IOException, InterruptedException {
+ // This test exercises the specific race condition from the SSL/TLS leak:
+ // Thread 1: Initiates async read (creates pendingReader)
+ // Thread 2: Closes the stream
+ // Thread 1: If data arrives, handleReadResponse would retain buffer
+ // Thread 1: readAsync detects closed state and must clean up
+
+ // given
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+
+ // Start an async read that will wait for data
+ CallbackErrorHolder<ByteBuf> callback = new CallbackErrorHolder<>();
+
+ CountDownLatch readStarted = new CountDownLatch(1);
+ CountDownLatch closeStarted = new CountDownLatch(1);
+ AtomicReference<Exception> closeException = new AtomicReference<>();
+
+ // Thread 1: Start async read
+ Thread readThread = new Thread(() -> {
+ try {
+ readStarted.countDown();
+ stream.readAsync(1024, OPERATION_CONTEXT, callback);
+ closeStarted.await(); // Wait for close to happen
+ } catch (Exception e) {
+ // Expected - stream may be closed
+ }
+ });
+
+ // Thread 2: Close the stream
+ Thread closeThread = new Thread(() -> {
+ try {
+ readStarted.await(); // Wait for read to start
+ Thread.sleep(10); // Small delay to increase race window
+ closeStarted.countDown();
+ stream.close();
+ } catch (Exception e) {
+ closeException.set(e);
+ }
+ });
+
+ // when - execute both threads
+ readThread.start();
+ closeThread.start();
+
+ readThread.join(2000);
+ closeThread.join(2000);
+
+ // then - threads should complete
+ assertFalse(readThread.isAlive(), "Read thread should complete");
+ assertFalse(closeThread.isAlive(), "Close thread should complete");
+ assertNull(closeException.get(), "Close should not throw exception");
+ assertTrue(stream.isClosed(), "Stream should be closed");
+
+ // The async read callback should either:
+ // - Receive an error (if it noticed the close), OR
+ // - Still be waiting (if close happened first and prevented read setup)
+ // Either way, no buffer leaks should occur
+
+ // Try another operation to ensure state is consistent
+ assertThrows(Exception.class, () -> stream.read(100, OPERATION_CONTEXT),
+ "Operations after close should fail");
+
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ @Test
+ @DisabledIf("isSslEnabled")
+ @SuppressWarnings("ThrowableNotThrown")
+ @DisplayName("Should handle multiple async operations with different callbacks")
+ void shouldHandleMultipleAsyncOperationsWithDifferentCallbacks() throws IOException, InterruptedException {
+ // given
+ factory = createDefaultFactory();
+ stream = factory.create(new ServerAddress());
+ stream.open(OPERATION_CONTEXT);
+
+ List<ByteBuf> buffers = createTestBuffers("async1");
+
+ // when - submit multiple async writes with different callbacks
+ CallbackErrorHolder<Void> callback1 = new CallbackErrorHolder<>();
+ CallbackErrorHolder<Void> callback2 = new CallbackErrorHolder<>();
+ CallbackErrorHolder<Void> callback3 = new CallbackErrorHolder<>();
+
+ stream.writeAsync(buffers, OPERATION_CONTEXT, callback1);
+ stream.writeAsync(buffers, OPERATION_CONTEXT, callback2);
+ stream.writeAsync(buffers, OPERATION_CONTEXT, callback3);
+
+ // then - all callbacks should be invoked (either success or failure)
+ // Wait for all to complete
+ callback1.getError(); // May be null (success) or error
+ callback2.getError();
+ callback3.getError();
+
+ // stream.writeAsync doesn't release the buffers passed to it, so release them here
+ ResourceUtil.release(buffers);
+
+ // Test passes if we reach here without timeout
+ // Verify no buffer leaks
+ trackingAllocator.assertAllBuffersReleased();
+ }
+
+ // ========== Helper Methods ==========
+
+ /**
+ * Creates ByteBufs with the given data, wrapped in NettyByteBuf.
+ */
+ private List<ByteBuf> createTestBuffers(final String... dataArray) {
+ List<ByteBuf> buffers = new ArrayList<>();
+ for (String data : dataArray) {
+ io.netty.buffer.ByteBuf nettyBuffer = trackingAllocator.buffer();
+ nettyBuffer.writeBytes(data.getBytes());
+ buffers.add(new NettyByteBuf(nettyBuffer));
+ }
+ return buffers;
+ }
+
+ /**
+ * Creates multiple large ByteBufs with specified capacity, wrapped in NettyByteBuf.
+ */
+ private List<ByteBuf> createLargeTestBuffers(final int capacityBytes, final int count) {
+ List<ByteBuf> buffers = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ io.netty.buffer.ByteBuf nettyBuffer = trackingAllocator.buffer(capacityBytes);
+ buffers.add(new NettyByteBuf(nettyBuffer));
+ }
+ return buffers;
+ }
+
+ private NettyStreamFactory createDefaultFactory() {
+ SocketSettings socketSettings = SocketSettings.builder()
+ .connectTimeout(10000, TimeUnit.MILLISECONDS)
+ .build();
+ SslSettings sslSettings = SslSettings.builder().build();
+
+ InetAddressResolver resolver = host -> {
+ try {
+ return singletonList(InetAddress.getByName(host));
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ // Use tracking allocator for all tests to get explicit leak verification
+ return new NettyStreamFactory(resolver, socketSettings, sslSettings,
+ eventLoopGroup, NioSocketChannel.class, trackingAllocator, null);
+ }
+
+ /**
+ * Helper class to capture async completion handler results.
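+ * <p>Illustrative usage (a sketch; the type parameter matches the handler type of the
+ * operation under test, e.g. {@code ByteBuf} for reads, {@code Void} for writes):</p>
+ * <pre>{@code
+ * CallbackErrorHolder<ByteBuf> callback = new CallbackErrorHolder<>();
+ * stream.readAsync(1024, OPERATION_CONTEXT, callback);
+ * Throwable error = callback.getError(); // waits up to 5 seconds; null on success
+ * }</pre>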
+ */
+ private static class CallbackErrorHolder<T> implements AsyncCompletionHandler<T> {
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
+ private final AtomicReference<T> resultRef = new AtomicReference<>();
+
+ Throwable getError() throws InterruptedException {
+ assertTrue(latch.await(5, TimeUnit.SECONDS), "Callback not completed within timeout");
+ return throwableRef.get();
+ }
+
+ T getResult() throws InterruptedException {
+ assertTrue(latch.await(5, TimeUnit.SECONDS), "Callback not completed within timeout");
+ return resultRef.get();
+ }
+
+ @Override
+ public void completed(final T result) {
+ resultRef.set(result);
+ latch.countDown();
+ }
+
+ @Override
+ public void failed(@NotNull final Throwable t) {
+ throwableRef.set(t);
+ latch.countDown();
+ }
+ }
+
+ /**
+ * Tracking allocator that records all buffer allocations and can verify they are all released.
+ * This allows us to explicitly prove no buffer leaks occur, rather than relying solely on
+ * Netty's leak detector.
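+ * <p>Illustrative usage (a sketch; in these tests the allocator is instead wired into
+ * the stream factory, as done in {@code createDefaultFactory()}):</p>
+ * <pre>{@code
+ * TrackingNettyByteBufAllocator allocator = new TrackingNettyByteBufAllocator();
+ * io.netty.buffer.ByteBuf buf = allocator.buffer(16);
+ * buf.release();
+ * allocator.assertAllBuffersReleased(); // passes: every tracked refCnt == 0
+ * }</pre>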
+ */
+ private static class TrackingNettyByteBufAllocator extends PooledByteBufAllocator {
+ private final List<io.netty.buffer.ByteBuf> allocatedBuffers = new ArrayList<>();
+ private final AtomicInteger allocationCount = new AtomicInteger(0);
+
+ TrackingNettyByteBufAllocator() {
+ super(false); // Use heap buffers for testing
+ }
+
+ @Override
+ public io.netty.buffer.CompositeByteBuf compositeBuffer(final int maxNumComponents) {
+ io.netty.buffer.CompositeByteBuf buffer = super.compositeBuffer(maxNumComponents);
+ trackBuffer(buffer);
+ return buffer;
+ }
+
+ @Override
+ public io.netty.buffer.CompositeByteBuf compositeDirectBuffer(final int maxNumComponents) {
+ io.netty.buffer.CompositeByteBuf buffer = super.compositeDirectBuffer(maxNumComponents);
+ trackBuffer(buffer);
+ return buffer;
+ }
+
+ @Override
+ public io.netty.buffer.ByteBuf buffer() {
+ io.netty.buffer.ByteBuf buffer = super.buffer();
+ trackBuffer(buffer);
+ return buffer;
+ }
+
+ @Override
+ public io.netty.buffer.ByteBuf buffer(final int initialCapacity) {
+ io.netty.buffer.ByteBuf buffer = super.buffer(initialCapacity);
+ trackBuffer(buffer);
+ return buffer;
+ }
+
+ @Override
+ public io.netty.buffer.ByteBuf buffer(final int initialCapacity, final int maxCapacity) {
+ io.netty.buffer.ByteBuf buffer = super.buffer(initialCapacity, maxCapacity);
+ trackBuffer(buffer);
+ return buffer;
+ }
+
+ @Override
+ public io.netty.buffer.ByteBuf directBuffer(final int initialCapacity, final int maxCapacity) {
+ io.netty.buffer.ByteBuf buffer = super.directBuffer(initialCapacity, maxCapacity);
+ trackBuffer(buffer);
+ return buffer;
+ }
+
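+ // Records a buffer for later leak verification. Only the allocation entry points
+ // overridden above flow through here; buffers obtained via other allocator methods
+ // (e.g. heapBuffer or ioBuffer) would not be tracked.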
+ private void trackBuffer(final io.netty.buffer.ByteBuf buffer) {
+ synchronized (allocatedBuffers) {
+ allocatedBuffers.add(buffer);
+ allocationCount.incrementAndGet();
+ }
+ }
+
+ /**
+ * Asserts that all allocated buffers have been released (refCnt == 0).
+ * This provides explicit proof that no leaks occurred.
+ */
+ void assertAllBuffersReleased() {
+ synchronized (allocatedBuffers) {
+ List<io.netty.buffer.ByteBuf> leakedBuffers = new ArrayList<>();
+ for (io.netty.buffer.ByteBuf buffer : allocatedBuffers) {
+ if (buffer.refCnt() > 0) {
+ leakedBuffers.add(buffer);
+ }
+ }
+
+ if (!leakedBuffers.isEmpty()) {
+ StringBuilder message = new StringBuilder();
+ message.append("BUFFER LEAK DETECTED: ")
+ .append(leakedBuffers.size())
+ .append(" of ")
+ .append(allocatedBuffers.size())
+ .append(" buffers were not released\n");
+
+ message.append(getStats())
+ .append("\n");
+
+ for (int i = 0; i < leakedBuffers.size(); i++) {
+ io.netty.buffer.ByteBuf leaked = leakedBuffers.get(i);
+ message.append(" [").append(i).append("] ")
+ .append(leaked.getClass().getSimpleName())
+ .append(" refCnt=").append(leaked.refCnt())
+ .append(" capacity=").append(leaked.capacity())
+ .append("\n");
+ }
+
+ leakedBuffers.forEach(ReferenceCounted::release);
+ fail(message.toString());
+ }
+ }
+ }
+
+ /**
+ * Returns allocation statistics for debugging.
+ */
+ String getStats() {
+ synchronized (allocatedBuffers) {
+ int leaked = 0;
+ int released = 0;
+ for (io.netty.buffer.ByteBuf buffer : allocatedBuffers) {
+ if (buffer.refCnt() > 0) {
+ leaked++;
+ } else {
+ released++;
+ }
+ }
+ return String.format("Allocations: %d, Released: %d, Leaked: %d",
+ allocationCount.get(), released, leaked);
+ }
+ }
+ }
+}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 8a08c34f213..c75eb6d7000 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -18,7 +18,7 @@ aws-sdk-v2 = "2.30.31"
graal-sdk = "24.0.0"
jna = "5.11.0"
jnr-unixsocket = "0.38.17"
-netty-bom = "4.1.87.Final"
+netty-bom = "4.2.9.Final"
project-reactor-bom = "2022.0.0"
reactive-streams = "1.0.4"
snappy = "1.1.10.3"