Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bson/src/main/org/bson/BsonDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -921,10 +921,12 @@ private static class SerializationProxy implements Serializable {
new BsonDocumentCodec().encode(new BsonBinaryWriter(buffer), document, EncoderContext.builder().build());
this.bytes = new byte[buffer.size()];
int curPos = 0;
for (ByteBuf cur : buffer.getByteBuffers()) {
List<ByteBuf> byteBuffers = buffer.getByteBuffers();
for (ByteBuf cur : byteBuffers) {
System.arraycopy(cur.array(), cur.position(), bytes, curPos, cur.limit());
curPos += cur.position();
}
byteBuffers.forEach(ByteBuf::release);
}

private Object readResolve() {
Expand Down
6 changes: 6 additions & 0 deletions config/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,10 @@
<Bug pattern="NM_CLASS_NAMING_CONVENTION"/>
</Match>

<!-- DefaultServerMonitor -->
<Match>
<class name="com.mongodb.internal.connection.DefaultServerMonitor" />
<Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
</Match>

</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.mongodb.internal.connection;

import com.mongodb.annotations.Sealed;
import com.mongodb.internal.ResourceUtil;
import com.mongodb.internal.VisibleForTesting;
import org.bson.BsonSerializationException;
import org.bson.ByteBuf;
import org.bson.io.OutputBuffer;
Expand All @@ -28,11 +31,28 @@

import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static java.lang.String.format;

/**
* A BSON output implementation that uses pooled {@link ByteBuf} instances for efficient memory management.
*
* <h2>ByteBuf Ownership and Lifecycle</h2>
* <p>This class manages the lifecycle of {@link ByteBuf} instances obtained from the {@link BufferProvider}.
* The ownership model is as follows:</p>
* <ul>
* <li>Internal buffers are owned by this output and released when {@link #close()} is called or
* when {@link #truncateToPosition(int)} removes them.</li>
* <li>Methods that return {@link ByteBuf} instances (e.g., {@link #getByteBuffers()}) return
* duplicates with their own reference counts. <strong>Callers are responsible for releasing
* these buffers</strong> to prevent memory leaks.</li>
* <li>The {@link Branch} subclass merges its buffers into the parent on close, transferring
* ownership by retaining buffers before the branch releases them.</li>
* </ul>
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
@Sealed
public class ByteBufferBsonOutput extends OutputBuffer {

private static final int MAX_SHIFT = 31;
Expand All @@ -50,6 +70,9 @@ public class ByteBufferBsonOutput extends OutputBuffer {
/**
* Construct an instance that uses the given buffer provider to allocate byte buffers as needs as it grows.
*
* <p>The buffer provider is used to allocate new {@link ByteBuf} instances as the output grows.
* All allocated buffers are owned by this output and will be released when {@link #close()} is called.</p>
*
* @param bufferProvider the non-null buffer provider
*/
public ByteBufferBsonOutput(final BufferProvider bufferProvider) {
Expand All @@ -63,6 +86,10 @@ public ByteBufferBsonOutput(final BufferProvider bufferProvider) {
* If multiple branches are created, they are merged in the order they are {@linkplain ByteBufferBsonOutput.Branch#close() closed}.
* {@linkplain #close() Closing} this {@link ByteBufferBsonOutput} does not {@linkplain ByteBufferBsonOutput.Branch#close() close} the branch.
*
* <p><strong>ByteBuf Ownership:</strong> The branch allocates its own buffers. When the branch is closed,
* ownership of these buffers is transferred to the parent by retaining them before the branch releases
* its references. The parent then becomes responsible for releasing these buffers when it is closed.</p>
*
* @return A new {@link ByteBufferBsonOutput.Branch}.
*/
public ByteBufferBsonOutput.Branch branch() {
Expand Down Expand Up @@ -223,17 +250,46 @@ protected void write(final int absolutePosition, final int value) {
byteBuffer.put(bufferPositionPair.position++, (byte) value);
}

/**
* Returns a list of duplicated byte buffers containing the written data, flipped for reading.
*
* <p><strong>ByteBuf Ownership:</strong> The returned buffers are duplicates with their own
* reference counts (each starts with a reference count of 1). <strong>The caller is responsible
* for releasing each buffer</strong> when done to prevent memory leaks. Example usage:</p>
* <pre>{@code
* List<ByteBuf> buffers = output.getByteBuffers();
* try {
* // use buffers
* } finally {
* ResourceUtil.release(buffers);
* }
* }</pre>
* <p><strong>Note:</strong> These buffers must be released before this {@code ByteBufferBsonOutput} is closed.
* Otherwise there is a risk of the buffers being released back to the bufferProvider and data corruption.</p>
*
* @return a list of duplicated buffers, flipped for reading
*/
@Override
public List<ByteBuf> getByteBuffers() {
ensureOpen();

List<ByteBuf> buffers = new ArrayList<>(bufferList.size());
for (final ByteBuf cur : bufferList) {
buffers.add(cur.duplicate().order(ByteOrder.LITTLE_ENDIAN).flip());
}
return buffers;
}

/**
* Returns a list of duplicated byte buffers without flipping them.
*
* <p><strong>ByteBuf Ownership:</strong> The returned buffers are duplicates with their own
* reference counts (each starts with a reference count of 1). <strong>The caller is responsible
* for releasing each buffer</strong> when done to prevent memory leaks.</p>
*
* @return a list of duplicated buffers
* @see #getByteBuffers()
*/
@VisibleForTesting(otherwise = PRIVATE)
public List<ByteBuf> getDuplicateByteBuffers() {
ensureOpen();

Expand All @@ -245,6 +301,13 @@ public List<ByteBuf> getDuplicateByteBuffers() {
}


/**
* {@inheritDoc}
*
* <p><strong>ByteBuf Management:</strong> This method obtains duplicated buffers via
* {@link #getByteBuffers()} and releases them after writing to the output stream,
* ensuring no buffer leaks occur.</p>
*/
@Override
public int pipe(final OutputStream out) throws IOException {
ensureOpen();
Expand All @@ -263,11 +326,20 @@ public int pipe(final OutputStream out) throws IOException {
total += cur.limit();
}
} finally {
byteBuffers.forEach(ByteBuf::release);
ResourceUtil.release(byteBuffers);
}
return total;
}

/**
* Truncates this output to the specified position, releasing any buffers that are no longer needed.
*
* <p><strong>ByteBuf Management:</strong> Any buffers beyond the new position are removed from
* the internal buffer list and released. This ensures no memory leaks when truncating.</p>
*
* @param newPosition the new position to truncate to
* @throws IllegalArgumentException if newPosition is negative or greater than the current position
*/
@Override
public void truncateToPosition(final int newPosition) {
ensureOpen();
Expand Down Expand Up @@ -306,13 +378,15 @@ public final void flush() throws IOException {
* {@inheritDoc}
* <p>
* Idempotent.</p>
*
* <p><strong>ByteBuf Management:</strong> Releases internal buffers and clears the buffer list.
* After this method returns, all buffers that were allocated by this output will have been fully released
* back to the buffer provider.</p>
*/
@Override
public void close() {
if (isOpen()) {
for (final ByteBuf cur : bufferList) {
cur.release();
}
ResourceUtil.release(bufferList);
currentByteBuffer = null;
bufferList.clear();
closed = true;
Expand Down Expand Up @@ -345,7 +419,14 @@ boolean isOpen() {
}

/**
* @see #branch()
* Merges a branch's buffers into this output.
*
* <p><strong>ByteBuf Ownership:</strong> This method retains each buffer from the branch before
* adding it to this output's buffer list. This is necessary because the branch will release its
* references when it closes. The retain ensures the buffers remain valid and are now owned by
* this output.</p>
*
* @param branch the branch to merge
*/
private void merge(final ByteBufferBsonOutput branch) {
assertTrue(branch instanceof ByteBufferBsonOutput.Branch);
Expand All @@ -356,6 +437,20 @@ private void merge(final ByteBufferBsonOutput branch) {
currentByteBuffer = null;
}

/**
* A branch of a {@link ByteBufferBsonOutput} that can be merged back into its parent.
*
* <p><strong>ByteBuf Ownership:</strong> A branch allocates its own buffers independently.
* When {@link #close()} is called:</p>
* <ol>
* <li>The parent's {@link ByteBufferBsonOutput#merge(ByteBufferBsonOutput)} method is called,
* which retains all buffers in this branch.</li>
* <li>Then {@code super.close()} is called, which releases the branch's references to the buffers.</li>
* </ol>
* <p>The retain/release sequence ensures buffers are safely transferred to the parent without leaks.</p>
*
* @see #branch()
*/
public static final class Branch extends ByteBufferBsonOutput {
private final ByteBufferBsonOutput parent;

Expand All @@ -365,6 +460,16 @@ private Branch(final ByteBufferBsonOutput parent) {
}

/**
* Closes this branch and merges its data into the parent output.
*
* <p><strong>ByteBuf Ownership:</strong> On close, this branch's buffers are transferred
* to the parent. The parent retains the buffers (incrementing reference counts), and then
* this branch releases only its own single reference. The parent
* becomes the sole owner of the buffers and is responsible for releasing them.</p>
*
* <p>Idempotent. If already closed, this method does nothing.</p>
*
* @throws AssertionError if the parent has been closed before this branch
* @see #branch()
*/
@Override
Expand Down
Loading