diff --git a/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonArray.java b/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonArray.java index e02cee12629..5442e81de68 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonArray.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonArray.java @@ -23,6 +23,7 @@ import org.bson.ByteBuf; import org.bson.io.ByteBufferBsonInput; +import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -33,20 +34,31 @@ import static com.mongodb.internal.connection.ByteBufBsonHelper.readBsonValue; -final class ByteBufBsonArray extends BsonArray { +final class ByteBufBsonArray extends BsonArray implements Closeable { private final ByteBuf byteBuf; + /** + * List of resources that need to be closed when this array is closed. + * Tracks the main ByteBuf and iterator duplicates. Iterator buffers are automatically + * removed and released when iteration completes normally to prevent memory accumulation. + */ + private final List trackedResources = new ArrayList<>(); + private boolean closed; + ByteBufBsonArray(final ByteBuf byteBuf) { this.byteBuf = byteBuf; + trackedResources.add(byteBuf::release); } @Override public Iterator iterator() { + ensureOpen(); return new ByteBufBsonArrayIterator(); } @Override public List getValues() { + ensureOpen(); List values = new ArrayList<>(); for (BsonValue cur: this) { //noinspection UseBulkOperation @@ -59,6 +71,7 @@ public List getValues() { @Override public int size() { + ensureOpen(); int size = 0; for (BsonValue ignored : this) { size++; @@ -68,11 +81,13 @@ public int size() { @Override public boolean isEmpty() { + ensureOpen(); return !iterator().hasNext(); } @Override public boolean equals(final Object o) { + ensureOpen(); if (o == this) { return true; } @@ -91,6 +106,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { + ensureOpen(); int hashCode = 1; for (BsonValue cur : this) { hashCode = 31 * hashCode + (cur == null ? 0 : cur.hashCode()); @@ -100,6 +116,7 @@ public int hashCode() { @Override public boolean contains(final Object o) { + ensureOpen(); for (BsonValue cur : this) { if (Objects.equals(o, cur)) { return true; @@ -111,6 +128,7 @@ public boolean contains(final Object o) { @Override public Object[] toArray() { + ensureOpen(); Object[] retVal = new Object[size()]; Iterator it = iterator(); for (int i = 0; i < retVal.length; i++) { @@ -122,6 +140,7 @@ public Object[] toArray() { @Override @SuppressWarnings("unchecked") public T[] toArray(final T[] a) { + ensureOpen(); int size = size(); T[] retVal = a.length >= size ? a : (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size); Iterator it = iterator(); @@ -133,6 +152,7 @@ public T[] toArray(final T[] a) { @Override public boolean containsAll(final Collection c) { + ensureOpen(); for (Object e : c) { if (!contains(e)) { return false; @@ -143,6 +163,7 @@ public boolean containsAll(final Collection c) { @Override public BsonValue get(final int index) { + ensureOpen(); if (index < 0) { throw new IndexOutOfBoundsException("Index out of range: " + index); } @@ -159,6 +180,7 @@ public BsonValue get(final int index) { @Override public int indexOf(final Object o) { + ensureOpen(); int i = 0; for (BsonValue cur : this) { if (Objects.equals(o, cur)) { @@ -172,6 +194,7 @@ public int indexOf(final Object o) { @Override public int lastIndexOf(final Object o) { + ensureOpen(); ListIterator listIterator = listIterator(size()); while (listIterator.hasPrevious()) { if (Objects.equals(o, listIterator.previous())) { @@ -183,17 +206,20 @@ public int lastIndexOf(final Object o) { @Override public ListIterator listIterator() { + ensureOpen(); return listIterator(0); } @Override public ListIterator listIterator(final int index) { + ensureOpen(); // Not the most efficient way to do this, but unlikely anyone will notice in practice return new ArrayList<>(this).listIterator(index); } @Override public List subList(final int fromIndex, final int toIndex) { + ensureOpen(); if (fromIndex < 0) { throw new IndexOutOfBoundsException("fromIndex = " + fromIndex); } @@ -234,6 +260,7 @@ public boolean addAll(final Collection c) { @Override public boolean addAll(final int index, final Collection c) { + ensureOpen(); throw new UnsupportedOperationException(READ_ONLY_MESSAGE); } @@ -267,11 +294,43 @@ public BsonValue remove(final int index) { throw new UnsupportedOperationException(READ_ONLY_MESSAGE); } + @Override + public void close(){ + if (!closed) { + for (Closeable closeable : trackedResources) { + try { + closeable.close(); + } catch (Exception e) { + // Log and continue closing other resources + } + } + trackedResources.clear(); + closed = true; + } + } + + private void ensureOpen() { + if (closed) { + throw new IllegalStateException("The BsonArray resources have been released."); + } + } + private class ByteBufBsonArrayIterator implements Iterator { - private final ByteBuf duplicatedByteBuf = byteBuf.duplicate(); - private final BsonBinaryReader bsonReader; + private ByteBuf duplicatedByteBuf; + private BsonBinaryReader bsonReader; + private Closeable resourceHandle; + private boolean finished; { + ensureOpen(); + duplicatedByteBuf = byteBuf.duplicate(); + resourceHandle = () -> { + if (duplicatedByteBuf != null) { + duplicatedByteBuf.release(); + duplicatedByteBuf = null; + } + }; + trackedResources.add(resourceHandle); bsonReader = new BsonBinaryReader(new ByteBufferBsonInput(duplicatedByteBuf)); // While one might expect that this would be a call to BsonReader#readStartArray that doesn't work because BsonBinaryReader // expects to be positioned at the start at the beginning of a document, not an array. Fortunately, a BSON array has exactly @@ -283,7 +342,11 @@ private class ByteBufBsonArrayIterator implements Iterator { @Override public boolean hasNext() { - return bsonReader.getCurrentBsonType() != BsonType.END_OF_DOCUMENT; + boolean hasNext = bsonReader.getCurrentBsonType() != BsonType.END_OF_DOCUMENT; + if (!hasNext) { + cleanup(); + } + return hasNext; } @Override @@ -292,9 +355,22 @@ public BsonValue next() { throw new NoSuchElementException(); } bsonReader.skipName(); - BsonValue value = readBsonValue(duplicatedByteBuf, bsonReader); + BsonValue value = readBsonValue(duplicatedByteBuf, bsonReader, trackedResources); bsonReader.readBsonType(); return value; } + + private void cleanup() { + if (!finished) { + finished = true; + // Remove from tracked resources since we're cleaning up immediately + trackedResources.remove(resourceHandle); + try { + resourceHandle.close(); + } catch (Exception e) { + // Ignore + } + } + } } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonDocument.java b/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonDocument.java index 70ed10a75a8..57b5967cd23 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonDocument.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonDocument.java @@ -16,139 +16,529 @@ package com.mongodb.internal.connection; +import com.mongodb.MongoInternalException; +import com.mongodb.internal.VisibleForTesting; import com.mongodb.lang.Nullable; +import org.bson.BsonArray; import org.bson.BsonBinaryReader; import org.bson.BsonDocument; +import org.bson.BsonReader; import org.bson.BsonType; import org.bson.BsonValue; import org.bson.ByteBuf; -import org.bson.RawBsonDocument; import org.bson.codecs.BsonDocumentCodec; import org.bson.codecs.DecoderContext; import org.bson.io.ByteBufferBsonInput; import org.bson.json.JsonMode; -import org.bson.json.JsonWriter; import org.bson.json.JsonWriterSettings; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; import java.io.InvalidObjectException; import java.io.ObjectInputStream; -import java.io.StringWriter; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.AbstractCollection; import java.util.AbstractMap; import java.util.AbstractSet; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import static com.mongodb.assertions.Assertions.assertFalse; import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.VisibleForTesting.AccessModifier.PACKAGE; +import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; import static com.mongodb.internal.connection.ByteBufBsonHelper.readBsonValue; +import static java.util.Collections.emptyMap; -final class ByteBufBsonDocument extends BsonDocument { +/** + * A memory-efficient, read-only {@link BsonDocument} implementation backed by a {@link ByteBuf}. + * + *

Overview

+ *

This class provides lazy access to BSON document fields without fully deserializing the document + * into memory. It reads field values directly from the underlying byte buffer on demand, which is + * particularly useful for large documents where only a few fields need to be accessed.

+ * + *

Data Sources

+ *

A {@code ByteBufBsonDocument} can contain data from two sources:

+ *
    + *
  • Body fields: Standard BSON document fields stored in {@link #bodyByteBuf}. These are + * read lazily using a {@link BsonBinaryReader}.
  • + *
  • Sequence fields: MongoDB OP_MSG Type 1 payload sequences stored in {@link #sequenceFields}. + * These are used when parsing command messages that contain document sequences (e.g., bulk inserts). + * Each sequence field appears as an array of documents when accessed.
  • + *
+ * + *

OP_MSG Command Message Support

+ *

The {@link #createCommandMessage(CompositeByteBuf)} factory method parses MongoDB OP_MSG format, + * which consists of:

+ *
    + *
  1. A body section (Type 0): The main command document
  2. + *
  3. Zero or more document sequence sections (Type 1): Arrays of documents identified by field name
  4. + *
+ *

For example, an insert command might have the body containing {@code {insert: "collection", $db: "test"}} + * and a sequence section with field name "documents" containing the documents to insert.

+ * + *

Resource Management

+ *

This class implements {@link Closeable} and manages several types of resources:

+ *
    + *
  • ByteBuf instances: The body buffer and any duplicated buffers created during iteration + * or value access are tracked in {@link #trackedResources} and released on {@link #close()}.
  • + *
  • Nested ByteBufBsonDocument/ByteBufBsonArray: When accessing nested documents or arrays, + * new {@code ByteBufBsonDocument} or {@link ByteBufBsonArray} instances are created. These are + * registered as closeables and closed recursively when the parent is closed.
  • + *
  • Sequence field documents: Documents within sequence fields are also {@code ByteBufBsonDocument} + * instances that are tracked and closed with the parent.
  • + *
+ * + *

Important: Always close this document when done to prevent memory leaks. After closing, + * any operation will throw {@link IllegalStateException}.

+ * + *

Caching Strategy

+ *

The class uses lazy caching to optimize repeated access:

+ *
    + *
  • {@link #cachedDocument}: Once {@link #toBsonDocument()} is called, the fully hydrated document + * is cached and all subsequent operations use this cache. At this point, the underlying buffers + * are released since they're no longer needed.
  • + *
  • {@link #cachedFirstKey}: The first key is cached after the first call to {@link #getFirstKey()}.
  • + *
  • Sequence field arrays are cached within {@link SequenceField} after first access.
  • + *
+ * + *

Immutability

+ *

This class is read-only. All mutation methods ({@link #put}, {@link #remove}, {@link #clear}, etc.) + * throw {@link UnsupportedOperationException}.

+ * + *

Thread Safety

+ *

This class is not thread-safe. Concurrent access from multiple threads requires external synchronization.

+ * + *

Serialization

+ *

Java serialization is supported via {@link #writeReplace()}, which converts this document to a + * regular {@link BsonDocument} before serialization.

+ * + * @see ByteBufBsonArray + * @see ByteBufBsonHelper + */ +public final class ByteBufBsonDocument extends BsonDocument implements Closeable { private static final long serialVersionUID = 2L; - private final transient ByteBuf byteBuf; + /** + * The underlying byte buffer containing the BSON document body. + * This is the main document data, excluding any OP_MSG sequence sections. + * Set to null after {@link #releaseResources()} is called. + */ + private transient ByteBuf bodyByteBuf; + + /** + * Map of sequence field names to their corresponding {@link SequenceField} instances. + * These represent OP_MSG Type 1 payload sections. Each sequence field appears as an + * array when accessed via {@link #get(Object)}. + * Empty for simple documents not created from OP_MSG. + */ + private transient Map sequenceFields; + + /** + * List of resources that need to be closed/released when this document is closed. + * + *

Memory Management Strategy:

+ *
    + *
  • Always tracked: The main bodyByteBuf and any nested ByteBufBsonDocument/ByteBufBsonArray + * instances returned to callers are permanently tracked until this document is closed or + * {@link #toBsonDocument()} caches and releases them.
  • + *
  • Temporarily tracked: Iterator duplicate buffers are tracked during iteration + * but automatically removed and released when iteration completes. This prevents memory accumulation + * from completed iterations while ensuring cleanup if the parent document is closed mid-iteration.
  • + *
  • Not tracked: Short-lived duplicate buffers used in query methods + * (e.g., {@link #findKeyInBody}, {@link #containsKey}) are released immediately in finally blocks + * and never added to this list. Temporary nested documents created during value comparison + * use separate tracking lists.
  • + *
+ */ + private final transient List trackedResources; /** - * Create a list of ByteBufBsonDocument from a buffer positioned at the start of the first document of an OP_MSG Section - * of type Document Sequence (Kind 1). - *

- * The provided buffer will be positioned at the end of the section upon normal completion of the method + * Cached fully-hydrated BsonDocument. Once populated via {@link #toBsonDocument()}, + * all subsequent read operations use this cache instead of reading from the byte buffer. */ - static List createList(final ByteBuf outputByteBuf) { - List documents = new ArrayList<>(); - while (outputByteBuf.hasRemaining()) { - ByteBufBsonDocument curDocument = createOne(outputByteBuf); - documents.add(curDocument); + private transient BsonDocument cachedDocument; + + /** + * Cached first key of the document. Populated on first call to {@link #getFirstKey()}. + */ + private transient String cachedFirstKey; + + /** + * Flag indicating whether this document has been closed. + * Once closed, all operations throw {@link IllegalStateException}. + */ + private transient boolean closed; + + + /** + * Creates a {@code ByteBufBsonDocument} from an OP_MSG command message. + * + *

This factory method parses the MongoDB OP_MSG wire protocol format, which consists of:

+ *
    + *
  1. Body section (Type 0): A single BSON document containing the command
  2. + *
  3. Document sequence sections (Type 1): Zero or more sections, each containing + * a field identifier and a sequence of BSON documents
  4. + *
+ * + *

The sequence sections are stored in {@link #sequenceFields} and appear as array fields + * when the document is accessed. For example, an insert command's "documents" sequence + * will appear as an array when calling {@code get("documents")}.

+ * + *

Wire Format Parsed

+ *
+     * [body document bytes]
+     * [section type: 1 byte] [section size: 4 bytes] [identifier: cstring] [document bytes...]
+     * ... (more sections)
+     * 
+ * + * @param commandMessageByteBuf The composite buffer positioned at the start of the body document. + * Position will be advanced past all parsed sections. + * @return A new {@code ByteBufBsonDocument} representing the command with any sequence fields. + */ + @VisibleForTesting(otherwise = PRIVATE) + public static ByteBufBsonDocument createCommandMessage(final CompositeByteBuf commandMessageByteBuf) { + // Parse body document: read size, create a view of just the body bytes + int bodyStart = commandMessageByteBuf.position(); + int bodySizeInBytes = commandMessageByteBuf.getInt(); + int bodyEnd = bodyStart + bodySizeInBytes; + ByteBuf bodyByteBuf = commandMessageByteBuf.duplicate().position(bodyStart).limit(bodyEnd); + + List trackedResources = new ArrayList<>(); + commandMessageByteBuf.position(bodyEnd); + + // Parse any Type 1 (document sequence) sections that follow the body + Map sequences = new LinkedHashMap<>(); + while (commandMessageByteBuf.hasRemaining()) { + // Skip section type byte (we only support Type 1 here) + commandMessageByteBuf.position(commandMessageByteBuf.position() + 1); + + // Read section size and calculate bounds + int sequenceStart = commandMessageByteBuf.position(); + int sequenceSizeInBytes = commandMessageByteBuf.getInt(); + int sectionEnd = sequenceStart + sequenceSizeInBytes; + + // Read the field identifier (null-terminated string) + String fieldName = readCString(commandMessageByteBuf); + assertFalse(fieldName.contains(".")); + + // Create a view of just the document sequence bytes (after the identifier) + ByteBuf sequenceByteBuf = commandMessageByteBuf.duplicate(); + sequenceByteBuf.position(commandMessageByteBuf.position()).limit(sectionEnd); + sequences.put(fieldName, new SequenceField(sequenceByteBuf, trackedResources)); + commandMessageByteBuf.position(sectionEnd); } - return documents; + return new ByteBufBsonDocument(bodyByteBuf, trackedResources, sequences); } /** - * Create a ByteBufBsonDocument from a buffer positioned at the start of a BSON document. - * The provided buffer will be positioned at the end of the document upon normal completion of the method + * Creates a simple {@code ByteBufBsonDocument} from a byte buffer containing a single BSON document. + * + *

Use this constructor for standard BSON documents. For OP_MSG command messages with + * document sequences, use {@link #createCommandMessage(CompositeByteBuf)} instead.

+ * + * @param byteBuf The buffer containing the BSON document. The buffer should be positioned + * at the start of the document and contain the complete document bytes. + */ + @VisibleForTesting(otherwise = PACKAGE) + public ByteBufBsonDocument(final ByteBuf byteBuf) { + this(byteBuf, new ArrayList<>(), new HashMap<>()); + } + + /** + * Private constructor used by factory methods. + * + * @param bodyByteBuf The buffer containing the body document bytes + * @param trackedResources Mutable list for tracking resources to close + * @param sequenceFields Map of sequence field names to their data (empty for simple documents) */ - static ByteBufBsonDocument createOne(final ByteBuf outputByteBuf) { - int documentStart = outputByteBuf.position(); - int documentSizeInBytes = outputByteBuf.getInt(); - int documentEnd = documentStart + documentSizeInBytes; - ByteBuf slice = outputByteBuf.duplicate().position(documentStart).limit(documentEnd); - outputByteBuf.position(documentEnd); - return new ByteBufBsonDocument(slice); + private ByteBufBsonDocument(final ByteBuf bodyByteBuf, final List trackedResources, + final Map sequenceFields) { + this.bodyByteBuf = bodyByteBuf; + this.trackedResources = trackedResources; + this.sequenceFields = sequenceFields; + trackedResources.add(bodyByteBuf::release); } + // ==================== Size and Empty Checks ==================== + @Override - public String toJson() { - return toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build()); + public int size() { + ensureOpen(); + if (cachedDocument != null) { + return cachedDocument.size(); + } + // Total size = body fields + sequence fields + return countBodyFields() + sequenceFields.size(); } @Override - public String toJson(final JsonWriterSettings settings) { - StringWriter stringWriter = new StringWriter(); - JsonWriter jsonWriter = new JsonWriter(stringWriter, settings); - ByteBuf duplicate = byteBuf.duplicate(); - try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(duplicate))) { - jsonWriter.pipe(reader); - return stringWriter.toString(); - } finally { - duplicate.release(); + public boolean isEmpty() { + ensureOpen(); + if (cachedDocument != null) { + return cachedDocument.isEmpty(); } + return !hasBodyFields() && sequenceFields.isEmpty(); } + // ==================== Key/Value Lookups ==================== + @Override - public BsonBinaryReader asBsonReader() { - return new BsonBinaryReader(new ByteBufferBsonInput(byteBuf.duplicate())); + public boolean containsKey(final Object key) { + ensureOpen(); + if (cachedDocument != null) { + return cachedDocument.containsKey(key); + } + if (key == null) { + throw new IllegalArgumentException("key can not be null"); + } + // Check sequence fields first (fast HashMap lookup), then scan body + if (sequenceFields.containsKey(key)) { + return true; + } + return findKeyInBody((String) key); } - @SuppressWarnings("MethodDoesntCallSuperMethod") @Override - public BsonDocument clone() { - byte[] clonedBytes = new byte[byteBuf.remaining()]; - byteBuf.get(byteBuf.position(), clonedBytes); - return new RawBsonDocument(clonedBytes); + public boolean containsValue(final Object value) { + ensureOpen(); + if (!(value instanceof BsonValue)) { + return false; + } + + if (cachedDocument != null) { + return cachedDocument.containsValue(value); + } + + // Search body fields first, then sequence fields + if (findValueInBody((BsonValue) value)) { + return true; + } + for (SequenceField field : sequenceFields.values()) { + if (field.containsValue(value)) { + return true; + } + } + return false; } + /** + * {@inheritDoc} + * + *

For sequence fields (OP_MSG document sequences), returns a {@link BsonArray} containing + * {@code ByteBufBsonDocument} instances for each document in the sequence.

+ */ @Nullable - T findInDocument(final Finder finder) { - ByteBuf duplicateByteBuf = byteBuf.duplicate(); - try (BsonBinaryReader bsonReader = new BsonBinaryReader(new ByteBufferBsonInput(duplicateByteBuf))) { - bsonReader.readStartDocument(); - while (bsonReader.readBsonType() != BsonType.END_OF_DOCUMENT) { - T found = finder.find(duplicateByteBuf, bsonReader); - if (found != null) { - return found; - } + @Override + public BsonValue get(final Object key) { + ensureOpen(); + notNull("key", key); + + if (!(key instanceof String)) { + return null; + } + if (cachedDocument != null) { + return cachedDocument.get(key); + } + + // Check sequence fields first, then body + if (sequenceFields.containsKey(key)) { + return sequenceFields.get(key).asArray(); + } + return getValueFromBody((String) key); + } + + @Override + public String getFirstKey() { + ensureOpen(); + if (cachedDocument != null) { + return cachedDocument.getFirstKey(); + } + if (cachedFirstKey != null) { + return cachedFirstKey; + } + cachedFirstKey = getFirstKeyFromBody(); + return assertNotNull(cachedFirstKey); + } + + // ==================== Collection Views ==================== + // These return lazy views that iterate over both body and sequence fields + + @Override + public Set> entrySet() { + ensureOpen(); + if (cachedDocument != null) { + return cachedDocument.entrySet(); + } + return new AbstractSet>() { + @Override + public Iterator> iterator() { + // Combine body entries with sequence entries + return new CombinedIterator<>(createBodyIterator(IteratorMode.ENTRIES), createSequenceEntryIterator()); } - bsonReader.readEndDocument(); - } finally { - duplicateByteBuf.release(); + + @Override + public int size() { + return ByteBufBsonDocument.this.size(); + } + }; + } + + @Override + public Collection values() { + ensureOpen(); + if (cachedDocument != null) { + return cachedDocument.values(); } + return new AbstractCollection() { + @Override + public Iterator iterator() { + return new CombinedIterator<>(createBodyIterator(IteratorMode.VALUES), createSequenceValueIterator()); + } - return finder.notFound(); + @Override + public int size() { + return ByteBufBsonDocument.this.size(); + } + }; } - BsonDocument toBaseBsonDocument() { - ByteBuf duplicateByteBuf = byteBuf.duplicate(); - try (BsonBinaryReader bsonReader = new BsonBinaryReader(new ByteBufferBsonInput(duplicateByteBuf))) { - return new BsonDocumentCodec().decode(bsonReader, DecoderContext.builder().build()); - } finally { - duplicateByteBuf.release(); + @Override + public Set keySet() { + ensureOpen(); + if (cachedDocument != null) { + return cachedDocument.keySet(); + } + return new AbstractSet() { + @Override + public Iterator iterator() { + return new CombinedIterator<>(createBodyIterator(IteratorMode.KEYS), sequenceFields.keySet().iterator()); + } + + @Override + public int size() { + return ByteBufBsonDocument.this.size(); + } + }; + } + + // ==================== Conversion Methods ==================== + + @Override + public BsonReader asBsonReader() { + ensureOpen(); + // Must hydrate first since we need to include sequence fields + return toBsonDocument().asBsonReader(); + } + + /** + * Converts this document to a regular {@link BsonDocument}, fully deserializing all data. + * + *

After this method is called:

+ *
    + *
  • The result is cached for future calls
  • + *
  • All underlying byte buffers are released
  • + *
  • Sequence field documents are hydrated to regular {@code BsonDocument} instances
  • + *
  • All subsequent read operations use the cached document
  • + *
+ * + * @return A fully materialized {@link BsonDocument} containing all fields + */ + @Override + public BsonDocument toBsonDocument() { + ensureOpen(); + if (cachedDocument == null) { + ByteBuf dup = bodyByteBuf.duplicate(); + try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(dup))) { + // Decode body document + BsonDocument doc = new BsonDocumentCodec().decode(reader, DecoderContext.builder().build()); + // Add hydrated sequence fields + for (Map.Entry entry : sequenceFields.entrySet()) { + doc.put(entry.getKey(), entry.getValue().toHydratedArray()); + } + cachedDocument = doc; + // Release buffers since we no longer need them + releaseResources(); + } finally { + dup.release(); + } } + return cachedDocument; } - ByteBufBsonDocument(final ByteBuf byteBuf) { - this.byteBuf = byteBuf; + @Override + public String toJson() { + return toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build()); } @Override - public void clear() { - throw new UnsupportedOperationException("ByteBufBsonDocument instances are immutable"); + public String toJson(final JsonWriterSettings settings) { + ensureOpen(); + return toBsonDocument().toJson(settings); } + @Override + public String toString() { + ensureOpen(); + return toBsonDocument().toString(); + } + + @SuppressWarnings("MethodDoesntCallSuperMethod") + @Override + public BsonDocument clone() { + ensureOpen(); + return toBsonDocument().clone(); + } + + @SuppressWarnings("EqualsDoesntCheckParameterClass") + @Override + public boolean equals(final Object o) { + ensureOpen(); + return toBsonDocument().equals(o); + } + + @Override + public int hashCode() { + ensureOpen(); + return toBsonDocument().hashCode(); + } + + // ==================== Resource Management ==================== + + /** + * Releases all resources held by this document. + * + *

This includes:

+ *
    + *
  • Releasing all tracked {@link ByteBuf} instances
  • + *
  • Closing all nested {@code ByteBufBsonDocument} and {@link ByteBufBsonArray} instances
  • + *
  • Clearing internal references
  • + *
+ * + *

After calling this method, any operation on this document will throw + * {@link IllegalStateException}. This method is idempotent.

+ */ + @Override + public void close() { + if (!closed) { + closed = true; + releaseResources(); + } + } + + // ==================== Mutation Methods (Unsupported) ==================== + @Override public BsonValue put(final String key, final BsonValue value) { throw new UnsupportedOperationException("ByteBufBsonDocument instances are immutable"); @@ -170,260 +560,476 @@ public BsonValue remove(final Object key) { } @Override - public boolean isEmpty() { - return assertNotNull(findInDocument(new Finder() { - @Override - public Boolean find(final ByteBuf byteBuf, final BsonBinaryReader bsonReader) { - return false; - } - - @Override - public Boolean notFound() { - return true; - } - })); + public void clear() { + throw new UnsupportedOperationException("ByteBufBsonDocument instances are immutable"); } - @Override - public int size() { - return assertNotNull(findInDocument(new Finder() { - private int size; + // ==================== Private Body Field Operations ==================== + // These methods read from bodyByteBuf using a temporary duplicate buffer - @Override - @Nullable - public Integer find(final ByteBuf byteBuf, final BsonBinaryReader bsonReader) { - size++; - bsonReader.readName(); - bsonReader.skipValue(); - return null; + /** + * Searches the body for a field with the given key. + * Uses a duplicated buffer to avoid modifying the original position. + */ + private boolean findKeyInBody(final String key) { + ByteBuf dup = bodyByteBuf.duplicate(); + try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(dup))) { + reader.readStartDocument(); + while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { + if (reader.readName().equals(key)) { + return true; + } + reader.skipValue(); } + return false; + } finally { + dup.release(); + } + } - @Override - public Integer notFound() { - return size; + /** + * Searches the body for a field with the given value. + * Creates ByteBufBsonDocument/ByteBufBsonArray for nested structures during comparison or vanilla BsonValues. + * Uses temporary tracking list to avoid polluting the main trackedResources with short-lived objects. + */ + private boolean findValueInBody(final BsonValue targetValue) { + ByteBuf dup = bodyByteBuf.duplicate(); + List tempTrackedResources = new ArrayList<>(); + try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(dup))) { + reader.readStartDocument(); + while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { + reader.skipName(); + if (readBsonValue(dup, reader, tempTrackedResources).equals(targetValue)) { + return true; + } } - })); + return false; + } finally { + // Release temporary resources created during comparison + for (Closeable resource : tempTrackedResources) { + try { + resource.close(); + } catch (Exception e) { + // Continue closing other resources + } + } + dup.release(); + } } - @Override - public Set> entrySet() { - return new ByteBufBsonDocumentEntrySet(); + /** + * Retrieves a value from the body by key. + * Returns null if the key is not found in the body. + */ + @Nullable + private BsonValue getValueFromBody(final String key) { + ByteBuf dup = bodyByteBuf.duplicate(); + try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(dup))) { + reader.readStartDocument(); + while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { + if (reader.readName().equals(key)) { + return readBsonValue(dup, reader, trackedResources); + } + reader.skipValue(); + } + return null; + } finally { + dup.release(); + } } - @Override - public Collection values() { - return new ByteBufBsonDocumentValuesCollection(); + /** + * Gets the first key from the body, or from sequence fields if body is empty. + * Throws NoSuchElementException if the document is completely empty. + */ + private String getFirstKeyFromBody() { + ByteBuf dup = bodyByteBuf.duplicate(); + try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(dup))) { + reader.readStartDocument(); + if (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { + return reader.readName(); + } + // Body is empty, try sequence fields + if (!sequenceFields.isEmpty()) { + return sequenceFields.keySet().iterator().next(); + } + throw new NoSuchElementException(); + } finally { + dup.release(); + } } - @Override - public Set keySet() { - return new ByteBufBsonDocumentKeySet(); + /** + * Checks if the body contains at least one field. + */ + private boolean hasBodyFields() { + ByteBuf dup = bodyByteBuf.duplicate(); + try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(dup))) { + reader.readStartDocument(); + return reader.readBsonType() != BsonType.END_OF_DOCUMENT; + } finally { + dup.release(); + } } - @Override - public boolean containsKey(final Object key) { - if (key == null) { - throw new IllegalArgumentException("key can not be null"); + /** + * Counts the number of fields in the body document. + */ + private int countBodyFields() { + ByteBuf dup = bodyByteBuf.duplicate(); + try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(dup))) { + reader.readStartDocument(); + int count = 0; + while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { + count++; + reader.skipName(); + reader.skipValue(); + } + return count; + } finally { + dup.release(); } + } + + // ==================== Iterator Support ==================== + + /** + * Mode for the body iterator, determining what type of elements it produces. + */ + private enum IteratorMode { ENTRIES, KEYS, VALUES } + + /** + * Creates an iterator over the body document fields. + * + *

The iterator creates a duplicated ByteBuf that is temporarily tracked for safety. + * When iteration completes normally, the buffer is released immediately and removed from tracking. + * This prevents accumulation of finished iterator buffers while ensuring cleanup if the parent + * document is closed before iteration completes.

+ * + * @param mode Determines whether to return entries, keys, or values + * @return An iterator of the appropriate type + */ + @SuppressWarnings("unchecked") + private Iterator createBodyIterator(final IteratorMode mode) { + return new Iterator() { + private ByteBuf duplicatedByteBuf; + private BsonBinaryReader reader; + private Closeable resourceHandle; + private boolean started; + private boolean finished; + + { + // Create duplicate buffer for iteration and track it temporarily + duplicatedByteBuf = bodyByteBuf.duplicate(); + resourceHandle = () -> { + if (duplicatedByteBuf != null) { + try { + if (reader != null) { + reader.close(); + } + } catch (Exception e) { + // Ignore + } + duplicatedByteBuf.release(); + duplicatedByteBuf = null; + reader = null; + } + }; + trackedResources.add(resourceHandle); + reader = new BsonBinaryReader(new ByteBufferBsonInput(duplicatedByteBuf)); + } - Boolean containsKey = findInDocument(new Finder() { @Override - public Boolean find(final ByteBuf byteBuf, final BsonBinaryReader bsonReader) { - if (bsonReader.readName().equals(key)) { - return true; + public boolean hasNext() { + if (finished) { + return false; } - bsonReader.skipValue(); - return null; + if (!started) { + reader.readStartDocument(); + reader.readBsonType(); + started = true; + } + boolean hasNext = reader.getCurrentBsonType() != BsonType.END_OF_DOCUMENT; + if (!hasNext) { + cleanup(); + } + return hasNext; } @Override - public Boolean notFound() { - return false; + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + String key = reader.readName(); + BsonValue value = readBsonValue(duplicatedByteBuf, reader, trackedResources); + reader.readBsonType(); + + switch (mode) { + case ENTRIES: + return (T) new AbstractMap.SimpleImmutableEntry<>(key, value); + case KEYS: + return (T) key; + case VALUES: + return (T) value; + default: + throw new IllegalStateException("Unknown iterator mode: " + mode); + } } - }); - return containsKey != null ? containsKey : false; + + private void cleanup() { + if (!finished) { + finished = true; + // Remove from tracked resources since we're cleaning up immediately + trackedResources.remove(resourceHandle); + try { + resourceHandle.close(); + } catch (Exception e) { + // Ignore + } + } + } + }; } - @Override - public boolean containsValue(final Object value) { - Boolean containsValue = findInDocument(new Finder() { + /** + * Creates an iterator over sequence fields as map entries. + * Each entry contains the field name and its array value. + */ + private Iterator> createSequenceEntryIterator() { + Iterator> iter = sequenceFields.entrySet().iterator(); + return new Iterator>() { @Override - public Boolean find(final ByteBuf byteBuf, final BsonBinaryReader bsonReader) { - bsonReader.skipName(); - if (readBsonValue(byteBuf, bsonReader).equals(value)) { - return true; - } - return null; + public boolean hasNext() { + return iter.hasNext(); } @Override - public Boolean notFound() { - return false; + public Entry next() { + Map.Entry entry = iter.next(); + return new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().asArray()); } - }); - return containsValue != null ? containsValue : false; + }; } - @Nullable - @Override - public BsonValue get(final Object key) { - notNull("key", key); - return findInDocument(new Finder() { + /** + * Creates an iterator over sequence field values (arrays). + */ + private Iterator createSequenceValueIterator() { + Iterator iter = sequenceFields.values().iterator(); + return new Iterator() { @Override - public BsonValue find(final ByteBuf byteBuf, final BsonBinaryReader bsonReader) { - if (bsonReader.readName().equals(key)) { - return readBsonValue(byteBuf, bsonReader); - } - bsonReader.skipValue(); - return null; + public boolean hasNext() { + return iter.hasNext(); } - @Nullable @Override - public BsonValue notFound() { - return null; + public BsonValue next() { + return iter.next().asArray(); } - }); + }; } + // ==================== Resource Management Helpers ==================== + /** - * Gets the first key in this document. + * Releases all tracked resources and clears internal state. * - * @return the first key in this document - * @throws java.util.NoSuchElementException if the document is empty + *

Called by {@link #close()} and after {@link #toBsonDocument()} caches the result. + * Resources include ByteBuf instances and nested ByteBufBsonDocument/ByteBufBsonArray.

*/ - public String getFirstKey() { - return assertNotNull(findInDocument(new Finder() { - @Override - public String find(final ByteBuf byteBuf, final BsonBinaryReader bsonReader) { - return bsonReader.readName(); + private void releaseResources() { + for (Closeable resource : trackedResources) { + try { + resource.close(); + } catch (Exception e) { + // Log and continue closing other resources } + } - @Override - public String notFound() { - throw new NoSuchElementException(); - } - })); + assertTrue(bodyByteBuf == null || bodyByteBuf.getReferenceCount() == 0, "Failed to release all `bodyByteBuf` resources"); + assertTrue(sequenceFields.values().stream().allMatch(b -> b.sequenceByteBuf.getReferenceCount() == 0), + "Failed to release all `sequenceField` resources"); + + trackedResources.clear(); + sequenceFields = emptyMap(); + bodyByteBuf = null; + cachedFirstKey = null; } - private interface Finder { - @Nullable - T find(ByteBuf byteBuf, BsonBinaryReader bsonReader); - @Nullable - T notFound(); + /** + * Throws IllegalStateException if this document has been closed. + */ + private void ensureOpen() { + if (closed) { + throw new IllegalStateException("The BsonDocument resources have been released."); + } + } + + // ==================== Utility Methods ==================== + + /** + * Reads a null-terminated C-string from the buffer. + * Used for parsing OP_MSG sequence identifiers. + */ + private static String readCString(final ByteBuf byteBuf) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + byte b = byteBuf.get(); + while (b != 0) { + bytes.write(b); + b = byteBuf.get(); + } + try { + return bytes.toString(StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + throw new MongoInternalException("Unexpected exception", e); + } } - // see https://docs.oracle.com/javase/6/docs/platform/serialization/spec/output.html + /** + * Serialization support: converts to a regular BsonDocument before serialization. + */ private Object writeReplace() { - return toBaseBsonDocument(); + ensureOpen(); + return toBsonDocument(); } - // see https://docs.oracle.com/javase/6/docs/platform/serialization/spec/input.html private void readObject(final ObjectInputStream stream) throws InvalidObjectException { throw new InvalidObjectException("Proxy required"); } - private class ByteBufBsonDocumentEntrySet extends AbstractSet> { - @Override - public Iterator> iterator() { - return new Iterator>() { - private final ByteBuf duplicatedByteBuf = byteBuf.duplicate(); - private final BsonBinaryReader bsonReader; - - { - bsonReader = new BsonBinaryReader(new ByteBufferBsonInput(duplicatedByteBuf)); - bsonReader.readStartDocument(); - bsonReader.readBsonType(); - } - - @Override - public boolean hasNext() { - return bsonReader.getCurrentBsonType() != BsonType.END_OF_DOCUMENT; - } + // ==================== Inner Classes ==================== - @Override - public Entry next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - String key = bsonReader.readName(); - BsonValue value = readBsonValue(duplicatedByteBuf, bsonReader); - bsonReader.readBsonType(); - return new AbstractMap.SimpleEntry<>(key, value); - } + /** + * Represents an OP_MSG Type 1 document sequence section. + * + *

A sequence field contains a contiguous series of BSON documents in the buffer. + * When accessed via {@link #asArray()}, it returns a {@link BsonArray} containing + * {@link ByteBufBsonDocument} instances for each document.

+ * + *

The documents are lazily parsed on first access and cached for subsequent calls.

+ */ + private static final class SequenceField { + /** Buffer containing the sequence of BSON documents */ + private final ByteBuf sequenceByteBuf; - }; - } + /** Reference to parent's tracked resources for registering created documents */ + private final List trackedResources; - @Override - public boolean isEmpty() { - return !iterator().hasNext(); - } + /** Cached list of parsed documents, populated on first access */ + private List documents; - @Override - public int size() { - return ByteBufBsonDocument.this.size(); + SequenceField(final ByteBuf sequenceByteBuf, final List trackedResources) { + this.sequenceByteBuf = sequenceByteBuf; + this.trackedResources = trackedResources; + trackedResources.add(sequenceByteBuf::release); } - } - private class ByteBufBsonDocumentKeySet extends AbstractSet { - @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") - private final Set> entrySet = new ByteBufBsonDocumentEntrySet(); - - @Override - public Iterator iterator() { - final Iterator> entrySetIterator = entrySet.iterator(); - return new Iterator() { - @Override - public boolean hasNext() { - return entrySetIterator.hasNext(); - } + /** + * Returns this sequence as a BsonArray of ByteBufBsonDocument instances. + * + *

On first call, parses the buffer to create ByteBufBsonDocument for each + * document and registers them with the parent's tracked resources.

+ * + * @return A BsonArray containing the sequence documents + */ + BsonValue asArray() { + if (documents == null) { + documents = new ArrayList<>(); + ByteBuf dup = sequenceByteBuf.duplicate(); + try { + while (dup.hasRemaining()) { + // Read document size to determine bounds + int docStart = dup.position(); + int docSize = dup.getInt(); + int docEnd = docStart + docSize; - @Override - public String next() { - return entrySetIterator.next().getKey(); + // Create a view of just this document's bytes + ByteBuf docBuf = sequenceByteBuf.duplicate().position(docStart).limit(docEnd); + ByteBufBsonDocument doc = new ByteBufBsonDocument(docBuf); + // Track for cleanup when parent is closed + trackedResources.add(doc); + documents.add(doc); + dup.position(docEnd); + } + } finally { + dup.release(); } - }; + } + // Return a new array each time to prevent external modification of cached list + return new BsonArray(new ArrayList<>(documents)); } - @Override - public boolean isEmpty() { - return entrySet.isEmpty(); + /** + * Checks if this sequence contains the given value. + */ + boolean containsValue(final Object value) { + return value instanceof BsonValue && asArray().asArray().contains(value); } - @Override - public int size() { - return entrySet.size(); + /** + * Converts this sequence to a BsonArray of regular BsonDocument instances. + * + *

Used by {@link ByteBufBsonDocument#toBsonDocument()} to fully hydrate the document. + * Unlike {@link #asArray()}, this creates regular BsonDocument instances, not + * ByteBufBsonDocument wrappers.

+ * + * @return A BsonArray containing fully deserialized BsonDocument instances + */ + BsonArray toHydratedArray() { + ByteBuf dup = sequenceByteBuf.duplicate(); + try { + List hydratedDocs = new ArrayList<>(); + while (dup.hasRemaining()) { + int docStart = dup.position(); + int docSize = dup.getInt(); + int docEnd = docStart + docSize; + ByteBuf docBuf = dup.duplicate().position(docStart).limit(docEnd); + try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(docBuf))) { + hydratedDocs.add(new BsonDocumentCodec().decode(reader, DecoderContext.builder().build())); + } finally { + docBuf.release(); + } + dup.position(docEnd); + } + return new BsonArray(hydratedDocs); + } finally { + dup.release(); + } } } - private class ByteBufBsonDocumentValuesCollection extends AbstractCollection { - @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") - private final Set> entrySet = new ByteBufBsonDocumentEntrySet(); - - @Override - public Iterator iterator() { - final Iterator> entrySetIterator = entrySet.iterator(); - return new Iterator() { - @Override - public boolean hasNext() { - return entrySetIterator.hasNext(); - } + /** + * An iterator that combines two iterators sequentially. + * + *

Used to merge body field iteration with sequence field iteration, + * presenting a unified view of all document fields.

+ * + * @param The type of elements returned by the iterator + */ + private static final class CombinedIterator implements Iterator { + private final Iterator primary; + private final Iterator secondary; - @Override - public BsonValue next() { - return entrySetIterator.next().getValue(); - } - }; + CombinedIterator(final Iterator primary, final Iterator secondary) { + this.primary = primary; + this.secondary = secondary; } @Override - public boolean isEmpty() { - return entrySet.isEmpty(); + public boolean hasNext() { + return primary.hasNext() || secondary.hasNext(); } + @Override - public int size() { - return entrySet.size(); + public T next() { + if (primary.hasNext()) { + return primary.next(); + } + if (secondary.hasNext()) { + return secondary.next(); + } + throw new NoSuchElementException(); } } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonHelper.java b/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonHelper.java index 55054112bf2..4d4d4846afa 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonHelper.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonHelper.java @@ -38,18 +38,25 @@ import org.bson.codecs.BsonDocumentCodec; import org.bson.codecs.DecoderContext; +import java.io.Closeable; +import java.util.List; + final class ByteBufBsonHelper { - static BsonValue readBsonValue(final ByteBuf byteBuf, final BsonBinaryReader bsonReader) { + static BsonValue readBsonValue(final ByteBuf byteBuf, final BsonBinaryReader bsonReader, final List trackedResources) { BsonValue value; switch (bsonReader.getCurrentBsonType()) { case DOCUMENT: ByteBuf documentByteBuf = byteBuf.duplicate(); - value = new ByteBufBsonDocument(documentByteBuf); + ByteBufBsonDocument document = new ByteBufBsonDocument(documentByteBuf); + trackedResources.add(document); + value = document; bsonReader.skipValue(); break; case ARRAY: ByteBuf arrayByteBuf = byteBuf.duplicate(); - value = new ByteBufBsonArray(arrayByteBuf); + ByteBufBsonArray array = new ByteBufBsonArray(arrayByteBuf); + trackedResources.add(array); + value = array; bsonReader.skipValue(); break; case INT32: diff --git a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java index 348349fd18c..6f300dc226b 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java +++ b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java @@ -17,17 +17,16 @@ package com.mongodb.internal.connection; import com.mongodb.MongoClientException; -import com.mongodb.MongoInternalException; import com.mongodb.MongoNamespace; import com.mongodb.ReadPreference; import com.mongodb.ServerApi; import com.mongodb.connection.ClusterConnectionMode; import com.mongodb.internal.MongoNamespaceHelper; +import com.mongodb.internal.ResourceUtil; import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.connection.MessageSequences.EmptyMessageSequences; import com.mongodb.internal.session.SessionContext; import com.mongodb.lang.Nullable; -import org.bson.BsonArray; import org.bson.BsonBinaryWriter; import org.bson.BsonBoolean; import org.bson.BsonDocument; @@ -38,9 +37,6 @@ import org.bson.FieldNameValidator; import org.bson.io.BsonOutput; -import java.io.ByteArrayOutputStream; -import java.io.UnsupportedEncodingException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -61,8 +57,6 @@ import static com.mongodb.internal.connection.BsonWriterHelper.encodeUsingRegistry; import static com.mongodb.internal.connection.BsonWriterHelper.writeDocumentsOfDualMessageSequences; import static com.mongodb.internal.connection.BsonWriterHelper.writePayload; -import static com.mongodb.internal.connection.ByteBufBsonDocument.createList; -import static com.mongodb.internal.connection.ByteBufBsonDocument.createOne; import static com.mongodb.internal.connection.ReadConcernHelper.getReadConcernDocument; import static com.mongodb.internal.operation.ServerVersionHelper.UNKNOWN_WIRE_VERSION; @@ -143,74 +137,26 @@ public final class CommandMessage extends RequestMessage { } /** - * Create a BsonDocument representing the logical document encoded by an OP_MSG. + * Create a ByteBufBsonDocument representing the logical document encoded by an OP_MSG. *

* The returned document will contain all the fields from the `PAYLOAD_TYPE_0_DOCUMENT` section, as well as all fields represented by * `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` sections. + * + *

Note: This document MUST be closed after use, otherwise when using Netty it could report the leaking of resources when the + * underlying {@code byteBuf's} are garbage collected */ - BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) { + ByteBufBsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) { List byteBuffers = bsonOutput.getByteBuffers(); try { - CompositeByteBuf byteBuf = new CompositeByteBuf(byteBuffers); + CompositeByteBuf compositeByteBuf = new CompositeByteBuf(byteBuffers); try { - byteBuf.position(firstDocumentPosition); - ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf); - - // If true, it means there is at least one `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` section in the OP_MSG - if (byteBuf.hasRemaining()) { - BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument(); - - // Each loop iteration processes one Document Sequence - // When there are no more bytes remaining, there are no more Document Sequences - while (byteBuf.hasRemaining()) { - // skip reading the payload type, we know it is `PAYLOAD_TYPE_1` - byteBuf.position(byteBuf.position() + 1); - int sequenceStart = byteBuf.position(); - int sequenceSizeInBytes = byteBuf.getInt(); - int sectionEnd = sequenceStart + sequenceSizeInBytes; - - String fieldName = getSequenceIdentifier(byteBuf); - // If this assertion fires, it means that the driver has started using document sequences for nested fields. If - // so, this method will need to change in order to append the value to the correct nested document. - assertFalse(fieldName.contains(".")); - - ByteBuf documentsByteBufSlice = byteBuf.duplicate().limit(sectionEnd); - try { - commandBsonDocument.append(fieldName, new BsonArray(createList(documentsByteBufSlice))); - } finally { - documentsByteBufSlice.release(); - } - byteBuf.position(sectionEnd); - } - return commandBsonDocument; - } else { - return byteBufBsonDocument; - } + compositeByteBuf.position(firstDocumentPosition); + return ByteBufBsonDocument.createCommandMessage(compositeByteBuf); } finally { - byteBuf.release(); + compositeByteBuf.release(); } } finally { - byteBuffers.forEach(ByteBuf::release); - } - } - - /** - * Get the field name from a buffer positioned at the start of the document sequence identifier of an OP_MSG Section of type - * `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE`. - *

- * Upon normal completion of the method, the buffer will be positioned at the start of the first BSON object in the sequence. - */ - private String getSequenceIdentifier(final ByteBuf byteBuf) { - ByteArrayOutputStream sequenceIdentifierBytes = new ByteArrayOutputStream(); - byte curByte = byteBuf.get(); - while (curByte != 0) { - sequenceIdentifierBytes.write(curByte); - curByte = byteBuf.get(); - } - try { - return sequenceIdentifierBytes.toString(StandardCharsets.UTF_8.name()); - } catch (UnsupportedEncodingException e) { - throw new MongoInternalException("Unexpected exception", e); + ResourceUtil.release(byteBuffers); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 6b20c467191..aeef4e0a6a1 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -53,12 +53,10 @@ import com.mongodb.internal.session.SessionContext; import com.mongodb.internal.time.Timeout; import com.mongodb.lang.Nullable; -import org.bson.BsonBinaryReader; import org.bson.BsonDocument; import org.bson.ByteBuf; import org.bson.codecs.BsonDocumentCodec; import org.bson.codecs.Decoder; -import org.bson.io.ByteBufferBsonInput; import java.io.IOException; import java.net.SocketTimeoutException; @@ -444,46 +442,44 @@ private T sendAndReceiveInternal(final CommandMessage message, final Decoder Span tracingSpan; try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this)) { message.encode(bsonOutput, operationContext); - tracingSpan = operationContext - .getTracingManager() - .createTracingSpan(message, - operationContext, - () -> message.getCommandDocument(bsonOutput), - cmdName -> SECURITY_SENSITIVE_COMMANDS.contains(cmdName) - || SECURITY_SENSITIVE_HELLO_COMMANDS.contains(cmdName), - () -> getDescription().getServerAddress(), - () -> getDescription().getConnectionId() - ); - - boolean isLoggingCommandNeeded = isLoggingCommandNeeded(); - boolean isTracingCommandPayloadNeeded = tracingSpan != null && operationContext.getTracingManager().isCommandPayloadEnabled(); - - // Only hydrate the command document if necessary - BsonDocument commandDocument = null; - if (isLoggingCommandNeeded || isTracingCommandPayloadNeeded) { - commandDocument = message.getCommandDocument(bsonOutput); - } - if (isLoggingCommandNeeded) { - commandEventSender = new LoggingCommandEventSender( - SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener, - operationContext, message, commandDocument, - COMMAND_PROTOCOL_LOGGER, loggerSettings); - commandEventSender.sendStartedEvent(); - } else { - commandEventSender = new NoOpCommandEventSender(); - } - if (isTracingCommandPayloadNeeded) { - tracingSpan.tagHighCardinality(QUERY_TEXT.asString(), commandDocument); - } + try (ByteBufBsonDocument commandDocument = message.getCommandDocument(bsonOutput)) { + tracingSpan = operationContext + .getTracingManager() + .createTracingSpan(message, + operationContext, + commandDocument, + cmdName -> SECURITY_SENSITIVE_COMMANDS.contains(cmdName) + || SECURITY_SENSITIVE_HELLO_COMMANDS.contains(cmdName), + () -> getDescription().getServerAddress(), + () -> getDescription().getConnectionId() + ); + + boolean isLoggingCommandNeeded = isLoggingCommandNeeded(); + + if (isLoggingCommandNeeded) { + commandEventSender = new LoggingCommandEventSender( + SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener, + operationContext, message, commandDocument, + COMMAND_PROTOCOL_LOGGER, loggerSettings); + commandEventSender.sendStartedEvent(); + } else { + commandEventSender = new NoOpCommandEventSender(); + } - try { - sendCommandMessage(message, bsonOutput, operationContext); - } catch (Exception e) { - if (tracingSpan != null) { - tracingSpan.error(e); + boolean isTracingCommandPayloadNeeded = tracingSpan != null && operationContext.getTracingManager().isCommandPayloadEnabled(); + if (isTracingCommandPayloadNeeded) { + tracingSpan.tagHighCardinality(QUERY_TEXT.asString(), commandDocument); + } + + try { + sendCommandMessage(commandDocument.getFirstKey(), message, bsonOutput, operationContext); + } catch (Exception e) { + if (tracingSpan != null) { + tracingSpan.error(e); + } + commandEventSender.sendFailedEvent(e); + throw e; } - commandEventSender.sendFailedEvent(e); - throw e; } } @@ -502,7 +498,9 @@ private T sendAndReceiveInternal(final CommandMessage message, final Decoder public void send(final CommandMessage message, final Decoder decoder, final OperationContext operationContext) { try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this)) { message.encode(bsonOutput, operationContext); - sendCommandMessage(message, bsonOutput, operationContext); + try (ByteBufBsonDocument commandDocument = message.getCommandDocument(bsonOutput)) { + sendCommandMessage(commandDocument.getFirstKey(), message, bsonOutput, operationContext); + } if (message.isResponseExpected()) { hasMoreToCome = true; } @@ -520,27 +518,41 @@ public boolean hasMoreToCome() { return hasMoreToCome; } - private void sendCommandMessage(final CommandMessage message, final ByteBufferBsonOutput bsonOutput, - final OperationContext operationContext) { + private void sendCommandMessage(final String commandName, final CommandMessage message, + final ByteBufferBsonOutput bsonOutput, final OperationContext operationContext) { + List messageByteBuffers = getMessageByteBuffers(commandName, message, bsonOutput, operationContext); + try { + Timeout.onExistsAndExpired(operationContext.getTimeoutContext().timeoutIncludingRoundTrip(), () -> { + throw TimeoutContext.createMongoRoundTripTimeoutException(); + }); + sendMessage(messageByteBuffers, message.getId(), operationContext); + } finally { + ResourceUtil.release(messageByteBuffers); + } + responseTo = message.getId(); + } + private List getMessageByteBuffers(final String commandName, final CommandMessage message, + final ByteBufferBsonOutput bsonOutput, final OperationContext operationContext) { Compressor localSendCompressor = sendCompressor; - if (localSendCompressor == null || SECURITY_SENSITIVE_COMMANDS.contains(message.getCommandDocument(bsonOutput).getFirstKey())) { - trySendMessage(message, bsonOutput, operationContext); + List messageByteBuffers; + // Check if compressed + if (localSendCompressor == null || SECURITY_SENSITIVE_COMMANDS.contains(commandName)) { + messageByteBuffers = bsonOutput.getByteBuffers(); } else { - ByteBufferBsonOutput compressedBsonOutput; List byteBuffers = bsonOutput.getByteBuffers(); try { CompressedMessage compressedMessage = new CompressedMessage(message.getOpCode(), byteBuffers, localSendCompressor, getMessageSettings(description, initialServerDescription)); - compressedBsonOutput = new ByteBufferBsonOutput(this); - compressedMessage.encode(compressedBsonOutput, operationContext); + try (ByteBufferBsonOutput compressedBsonOutput = new ByteBufferBsonOutput(this)) { + compressedMessage.encode(compressedBsonOutput, operationContext); + messageByteBuffers = compressedBsonOutput.getByteBuffers(); + } } finally { ResourceUtil.release(byteBuffers); - bsonOutput.close(); } - trySendMessage(message, compressedBsonOutput, operationContext); } - responseTo = message.getId(); + return messageByteBuffers; } private void trySendMessage(final CommandMessage message, final ByteBufferBsonOutput bsonOutput, @@ -598,60 +610,54 @@ private T receiveCommandMessageResponse(final Decoder decoder, final Comm } private void sendAndReceiveAsyncInternal(final CommandMessage message, final Decoder decoder, - final OperationContext operationContext, final SingleResultCallback callback) { + final OperationContext operationContext, final SingleResultCallback callback) { if (isClosed()) { callback.onResult(null, new MongoSocketClosedException("Can not read from a closed socket", getServerAddress())); return; } + // Async try with resources release after the write ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this); - ByteBufferBsonOutput compressedBsonOutput = new ByteBufferBsonOutput(this); - try { message.encode(bsonOutput, operationContext); + String commandName; CommandEventSender commandEventSender; - if (isLoggingCommandNeeded()) { - BsonDocument commandDocument = message.getCommandDocument(bsonOutput); - commandEventSender = new LoggingCommandEventSender( - SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener, - operationContext, message, commandDocument, - COMMAND_PROTOCOL_LOGGER, loggerSettings); - } else { - commandEventSender = new NoOpCommandEventSender(); - } - - commandEventSender.sendStartedEvent(); - Compressor localSendCompressor = sendCompressor; - if (localSendCompressor == null || SECURITY_SENSITIVE_COMMANDS.contains(message.getCommandDocument(bsonOutput).getFirstKey())) { - sendCommandMessageAsync(message.getId(), decoder, operationContext, callback, bsonOutput, commandEventSender, - message.isResponseExpected()); - } else { - List byteBuffers = bsonOutput.getByteBuffers(); - try { - CompressedMessage compressedMessage = new CompressedMessage(message.getOpCode(), byteBuffers, localSendCompressor, - getMessageSettings(description, initialServerDescription)); - compressedMessage.encode(compressedBsonOutput, operationContext); - } finally { - ResourceUtil.release(byteBuffers); - bsonOutput.close(); + try (ByteBufBsonDocument commandDocument = message.getCommandDocument(bsonOutput)) { + commandName = commandDocument.getFirstKey(); + if (isLoggingCommandNeeded()) { + commandEventSender = new LoggingCommandEventSender( + SECURITY_SENSITIVE_COMMANDS, SECURITY_SENSITIVE_HELLO_COMMANDS, description, commandListener, + operationContext, message, commandDocument, + COMMAND_PROTOCOL_LOGGER, loggerSettings); + } else { + commandEventSender = new NoOpCommandEventSender(); } - sendCommandMessageAsync(message.getId(), decoder, operationContext, callback, compressedBsonOutput, commandEventSender, - message.isResponseExpected()); + commandEventSender.sendStartedEvent(); } + + List messageByteBuffers = getMessageByteBuffers(commandName, message, bsonOutput, operationContext); + sendCommandMessageAsync(messageByteBuffers, message.getId(), decoder, operationContext, + commandEventSender, message.isResponseExpected(), (r, t) -> { + ResourceUtil.release(messageByteBuffers); + bsonOutput.close(); // Close AFTER async write completes + if (t != null) { + callback.onResult(null, t); + } else { + callback.onResult(r, null); + } + }); } catch (Throwable t) { bsonOutput.close(); - compressedBsonOutput.close(); callback.onResult(null, t); } } - private void sendCommandMessageAsync(final int messageId, final Decoder decoder, final OperationContext operationContext, - final SingleResultCallback callback, final ByteBufferBsonOutput bsonOutput, - final CommandEventSender commandEventSender, final boolean responseExpected) { + private void sendCommandMessageAsync(final List messageByteBuffers, final int messageId, final Decoder decoder, + final OperationContext operationContext, final CommandEventSender commandEventSender, + final boolean responseExpected, final SingleResultCallback callback) { boolean[] shouldReturn = {false}; Timeout.onExistsAndExpired(operationContext.getTimeoutContext().timeoutIncludingRoundTrip(), () -> { - bsonOutput.close(); MongoOperationTimeoutException operationTimeoutException = TimeoutContext.createMongoRoundTripTimeoutException(); commandEventSender.sendFailedEvent(operationTimeoutException); callback.onResult(null, operationTimeoutException); @@ -661,10 +667,7 @@ private void sendCommandMessageAsync(final int messageId, final Decoder d return; } - List byteBuffers = bsonOutput.getByteBuffers(); - sendMessageAsync(byteBuffers, messageId, operationContext, (result, t) -> { - ResourceUtil.release(byteBuffers); - bsonOutput.close(); + sendMessageAsync(messageByteBuffers, messageId, operationContext, (result, t) -> { if (t != null) { commandEventSender.sendFailedEvent(t); callback.onResult(null, t); @@ -682,18 +685,16 @@ private void sendCommandMessageAsync(final int messageId, final Decoder d T commandResult; try { updateSessionContext(operationContext.getSessionContext(), responseBuffers); - boolean commandOk = - isCommandOk(new BsonBinaryReader(new ByteBufferBsonInput(responseBuffers.getBodyByteBuffer()))); - responseBuffers.reset(); - if (!commandOk) { + + if (!isCommandOk(responseBuffers)) { MongoException commandFailureException = getCommandFailureException( responseBuffers.getResponseDocument(messageId, new BsonDocumentCodec()), description.getServerAddress(), operationContext.getTimeoutContext()); commandEventSender.sendFailedEvent(commandFailureException); throw commandFailureException; } - commandEventSender.sendSucceededEvent(responseBuffers); + commandEventSender.sendSucceededEvent(responseBuffers); commandResult = getCommandResult(decoder, responseBuffers, messageId, operationContext.getTimeoutContext()); } catch (Throwable localThrowable) { callback.onResult(null, localThrowable); diff --git a/driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java b/driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java index 2dadd11efec..b26cb396e7b 100644 --- a/driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java +++ b/driver-core/src/main/com/mongodb/internal/observability/micrometer/TracingManager.java @@ -37,6 +37,7 @@ import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.CLIENT_CONNECTION_ID; import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.COLLECTION; import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.COMMAND_NAME; +import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.CURSOR_ID; import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.NAMESPACE; import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.NETWORK_TRANSPORT; import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.QUERY_SUMMARY; @@ -46,7 +47,6 @@ import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.SESSION_ID; import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.SYSTEM; import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.TRANSACTION_NUMBER; -import static com.mongodb.internal.observability.micrometer.MongodbObservation.LowCardinalityKeyNames.CURSOR_ID; import static java.lang.System.getenv; /** @@ -178,7 +178,7 @@ public boolean isCommandPayloadEnabled() { * * @param message the command message to trace * @param operationContext the operation context containing tracing and session information - * @param commandDocumentSupplier a supplier that provides the command document when needed + * @param commandDocument the command document, note this is an internally managed resource * @param isSensitiveCommand a predicate that determines if a command is security-sensitive based on its name * @param serverAddressSupplier a supplier that provides the server address when needed * @param connectionIdSupplier a supplier that provides the connection ID when needed @@ -187,26 +187,26 @@ public boolean isCommandPayloadEnabled() { @Nullable public Span createTracingSpan(final CommandMessage message, final OperationContext operationContext, - final Supplier commandDocumentSupplier, + final BsonDocument commandDocument, final Predicate isSensitiveCommand, final Supplier serverAddressSupplier, final Supplier connectionIdSupplier ) { - if (!isEnabled()) { + if (!isEnabled()) { return null; } - BsonDocument command = commandDocumentSupplier.get(); - String commandName = command.getFirstKey(); + + String commandName = commandDocument.getFirstKey(); if (isSensitiveCommand.test(commandName)) { return null; } Span operationSpan = operationContext.getTracingSpan(); - Span span = addSpan(commandName, operationSpan != null ? operationSpan.context() : null); + Span span = addSpan(commandName, operationSpan != null ? operationSpan.context() : null); - if (command.containsKey("getMore")) { - long cursorId = command.getInt64("getMore").longValue(); + if (commandDocument.containsKey("getMore")) { + long cursorId = commandDocument.getInt64("getMore").longValue(); span.tagLowCardinality(CURSOR_ID.withValue(String.valueOf(cursorId))); if (operationSpan != null) { operationSpan.tagLowCardinality(CURSOR_ID.withValue(String.valueOf(cursorId))); diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufBsonDocumentSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufBsonDocumentSpecification.groovy deleted file mode 100644 index 8dc599706a9..00000000000 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufBsonDocumentSpecification.groovy +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.mongodb.internal.connection - -import org.bson.BsonArray -import org.bson.BsonBinaryWriter -import org.bson.BsonBoolean -import org.bson.BsonDocument -import org.bson.BsonInt32 -import org.bson.BsonNull -import org.bson.BsonValue -import org.bson.ByteBuf -import org.bson.ByteBufNIO -import org.bson.codecs.BsonDocumentCodec -import org.bson.codecs.DecoderContext -import org.bson.codecs.EncoderContext -import org.bson.io.BasicOutputBuffer -import org.bson.json.JsonMode -import org.bson.json.JsonWriterSettings -import spock.lang.Specification - -import java.nio.ByteBuffer - -import static java.util.Arrays.asList - -class ByteBufBsonDocumentSpecification extends Specification { - def emptyDocumentByteBuf = new ByteBufNIO(ByteBuffer.wrap([5, 0, 0, 0, 0] as byte[])) - ByteBuf documentByteBuf - ByteBufBsonDocument emptyByteBufDocument = new ByteBufBsonDocument(emptyDocumentByteBuf) - def document = new BsonDocument() - .append('a', new BsonInt32(1)) - .append('b', new BsonInt32(2)) - .append('c', new BsonDocument('x', BsonBoolean.TRUE)) - .append('d', new BsonArray(asList(new BsonDocument('y', BsonBoolean.FALSE), new BsonInt32(1)))) - - ByteBufBsonDocument byteBufDocument - - def setup() { - def buffer = new BasicOutputBuffer() - new BsonDocumentCodec().encode(new BsonBinaryWriter(buffer), document, EncoderContext.builder().build()) - ByteArrayOutputStream baos = new ByteArrayOutputStream() - buffer.pipe(baos) - documentByteBuf = new ByteBufNIO(ByteBuffer.wrap(baos.toByteArray())) - byteBufDocument = new ByteBufBsonDocument(documentByteBuf) - } - - def 'get should get the value of the given key'() { - expect: - emptyByteBufDocument.get('a') == null - byteBufDocument.get('z') == null - byteBufDocument.get('a') == new BsonInt32(1) - byteBufDocument.get('b') == new BsonInt32(2) - } - - def 'get should throw if the key is null'() { - when: - byteBufDocument.get(null) - - then: - thrown(IllegalArgumentException) - documentByteBuf.referenceCount == 1 - } - - def 'containKey should throw if the key name is null'() { - when: - byteBufDocument.containsKey(null) - - then: - thrown(IllegalArgumentException) - documentByteBuf.referenceCount == 1 - } - - def 'containsKey should find an existing key'() { - expect: - byteBufDocument.containsKey('a') - byteBufDocument.containsKey('b') - byteBufDocument.containsKey('c') - byteBufDocument.containsKey('d') - documentByteBuf.referenceCount == 1 - } - - def 'containsKey should not find a non-existing key'() { - expect: - !byteBufDocument.containsKey('e') - !byteBufDocument.containsKey('x') - !byteBufDocument.containsKey('y') - documentByteBuf.referenceCount == 1 - } - - def 'containValue should find an existing value'() { - expect: - byteBufDocument.containsValue(document.get('a')) - byteBufDocument.containsValue(document.get('b')) - byteBufDocument.containsValue(document.get('c')) - byteBufDocument.containsValue(document.get('d')) - documentByteBuf.referenceCount == 1 - } - - def 'containValue should not find a non-existing value'() { - expect: - !byteBufDocument.containsValue(new BsonInt32(3)) - !byteBufDocument.containsValue(new BsonDocument('e', BsonBoolean.FALSE)) - !byteBufDocument.containsValue(new BsonArray(asList(new BsonInt32(2), new BsonInt32(4)))) - documentByteBuf.referenceCount == 1 - } - - def 'isEmpty should return false when the document is not empty'() { - expect: - !byteBufDocument.isEmpty() - documentByteBuf.referenceCount == 1 - } - - def 'isEmpty should return true when the document is empty'() { - expect: - emptyByteBufDocument.isEmpty() - emptyDocumentByteBuf.referenceCount == 1 - } - - def 'should get correct size'() { - expect: - emptyByteBufDocument.size() == 0 - byteBufDocument.size() == 4 - documentByteBuf.referenceCount == 1 - emptyDocumentByteBuf.referenceCount == 1 - } - - def 'should get correct key set'() { - expect: - emptyByteBufDocument.keySet().isEmpty() - byteBufDocument.keySet() == ['a', 'b', 'c', 'd'] as Set - documentByteBuf.referenceCount == 1 - emptyDocumentByteBuf.referenceCount == 1 - } - - def 'should get correct values set'() { - expect: - emptyByteBufDocument.values().isEmpty() - byteBufDocument.values() as Set == [document.get('a'), document.get('b'), document.get('c'), document.get('d')] as Set - documentByteBuf.referenceCount == 1 - emptyDocumentByteBuf.referenceCount == 1 - } - - def 'should get correct entry set'() { - expect: - emptyByteBufDocument.entrySet().isEmpty() - byteBufDocument.entrySet() == [new TestEntry('a', document.get('a')), - new TestEntry('b', document.get('b')), - new TestEntry('c', document.get('c')), - new TestEntry('d', document.get('d'))] as Set - documentByteBuf.referenceCount == 1 - emptyDocumentByteBuf.referenceCount == 1 - } - - def 'all write methods should throw UnsupportedOperationException'() { - when: - byteBufDocument.clear() - - then: - thrown(UnsupportedOperationException) - - when: - byteBufDocument.put('x', BsonNull.VALUE) - - then: - thrown(UnsupportedOperationException) - - when: - byteBufDocument.append('x', BsonNull.VALUE) - - then: - thrown(UnsupportedOperationException) - - when: - byteBufDocument.putAll(new BsonDocument('x', BsonNull.VALUE)) - - then: - thrown(UnsupportedOperationException) - - when: - byteBufDocument.remove(BsonNull.VALUE) - - then: - thrown(UnsupportedOperationException) - } - - def 'should get first key'() { - expect: - byteBufDocument.getFirstKey() == document.keySet().iterator().next() - documentByteBuf.referenceCount == 1 - } - - def 'getFirstKey should throw NoSuchElementException if the document is empty'() { - when: - emptyByteBufDocument.getFirstKey() - - then: - thrown(NoSuchElementException) - emptyDocumentByteBuf.referenceCount == 1 - } - - def 'should create BsonReader'() { - when: - def reader = document.asBsonReader() - - then: - new BsonDocumentCodec().decode(reader, DecoderContext.builder().build()) == document - - cleanup: - reader.close() - } - - def 'clone should make a deep copy'() { - when: - BsonDocument cloned = byteBufDocument.clone() - - then: - cloned == byteBufDocument - documentByteBuf.referenceCount == 1 - } - - def 'should serialize and deserialize'() { - given: - def baos = new ByteArrayOutputStream() - def oos = new ObjectOutputStream(baos) - - when: - oos.writeObject(byteBufDocument) - def bais = new ByteArrayInputStream(baos.toByteArray()) - def ois = new ObjectInputStream(bais) - def deserializedDocument = ois.readObject() - - then: - byteBufDocument == deserializedDocument - documentByteBuf.referenceCount == 1 - } - - def 'toJson should return equivalent'() { - expect: - document.toJson() == byteBufDocument.toJson() - documentByteBuf.referenceCount == 1 - } - - def 'toJson should be callable multiple times'() { - expect: - byteBufDocument.toJson() - byteBufDocument.toJson() - documentByteBuf.referenceCount == 1 - } - - def 'size should be callable multiple times'() { - expect: - byteBufDocument.size() - byteBufDocument.size() - documentByteBuf.referenceCount == 1 - } - - def 'toJson should respect JsonWriteSettings'() { - given: - def settings = JsonWriterSettings.builder().outputMode(JsonMode.SHELL).build() - - expect: - document.toJson(settings) == byteBufDocument.toJson(settings) - } - - def 'toJson should return equivalent when a ByteBufBsonDocument is nested in a BsonDocument'() { - given: - def topLevel = new BsonDocument('nested', byteBufDocument) - - expect: - new BsonDocument('nested', document).toJson() == topLevel.toJson() - } - - class TestEntry implements Map.Entry { - - private final String key - private BsonValue value - - TestEntry(String key, BsonValue value) { - this.key = key - this.value = value - } - - @Override - String getKey() { - key - } - - @Override - BsonValue getValue() { - value - } - - @Override - BsonValue setValue(final BsonValue value) { - this.value = value - } - } - -} diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufBsonDocumentTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufBsonDocumentTest.java new file mode 100644 index 00000000000..e12c091b01f --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufBsonDocumentTest.java @@ -0,0 +1,706 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.connection; + +import org.bson.BsonArray; +import org.bson.BsonBinaryWriter; +import org.bson.BsonBoolean; +import org.bson.BsonDocument; +import org.bson.BsonInt32; +import org.bson.BsonReader; +import org.bson.BsonString; +import org.bson.BsonValue; +import org.bson.ByteBuf; +import org.bson.ByteBufNIO; +import org.bson.RawBsonDocument; +import org.bson.codecs.BsonDocumentCodec; +import org.bson.codecs.DecoderContext; +import org.bson.codecs.EncoderContext; +import org.bson.io.BasicOutputBuffer; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +import static java.util.Arrays.asList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@DisplayName("ByteBufBsonDocument") +class ByteBufBsonDocumentTest { + private ByteBuf emptyDocumentByteBuf; + private ByteBuf documentByteBuf; + private ByteBufBsonDocument emptyByteBufDocument; + private ByteBufBsonDocument byteBufDocument; + private BsonDocument document; + + @BeforeEach + void setUp() { + emptyDocumentByteBuf = new ByteBufNIO(ByteBuffer.wrap(new byte[]{5, 0, 0, 0, 0})); + emptyByteBufDocument = new ByteBufBsonDocument(emptyDocumentByteBuf); + + document = new BsonDocument() + .append("a", new BsonInt32(1)) + .append("b", new BsonInt32(2)) + .append("c", new BsonDocument("x", BsonBoolean.TRUE)) + .append("d", new BsonArray(asList( + new BsonDocument("y", BsonBoolean.FALSE), + new BsonInt32(1) + ))); + + RawBsonDocument rawBsonDocument = RawBsonDocument.parse(document.toString()); + documentByteBuf = rawBsonDocument.getByteBuffer(); + byteBufDocument = new ByteBufBsonDocument(documentByteBuf); + } + + @AfterEach + void tearDown() { + if (byteBufDocument != null) { + byteBufDocument.close(); + } + if (emptyByteBufDocument != null) { + emptyByteBufDocument.close(); + } + } + + // Basic Operations + + @Test + @DisplayName("get() returns value for existing key, null for missing key") + void getShouldReturnCorrectValue() { + assertNull(emptyByteBufDocument.get("a")); + assertNull(byteBufDocument.get("z")); + assertEquals(new BsonInt32(1), byteBufDocument.get("a")); + assertEquals(new BsonInt32(2), byteBufDocument.get("b")); + } + + @Test + @DisplayName("get() throws IllegalArgumentException for null key") + void getShouldThrowForNullKey() { + assertThrows(IllegalArgumentException.class, () -> byteBufDocument.get(null)); + assertEquals(1, documentByteBuf.getReferenceCount()); + } + + @Test + @DisplayName("containsKey() finds existing keys and rejects missing keys") + void containsKeyShouldWork() { + assertThrows(IllegalArgumentException.class, () -> byteBufDocument.containsKey(null)); + assertTrue(byteBufDocument.containsKey("a")); + assertTrue(byteBufDocument.containsKey("d")); + assertFalse(byteBufDocument.containsKey("z")); + assertEquals(1, documentByteBuf.getReferenceCount()); + } + + @Test + @DisplayName("containsValue() finds existing values and rejects missing values") + void containsValueShouldWork() { + assertTrue(byteBufDocument.containsValue(document.get("a"))); + assertTrue(byteBufDocument.containsValue(document.get("c"))); + assertFalse(byteBufDocument.containsValue(new BsonInt32(999))); + assertEquals(1, documentByteBuf.getReferenceCount()); + } + + @Test + @DisplayName("isEmpty() returns correct result") + void isEmptyShouldWork() { + assertTrue(emptyByteBufDocument.isEmpty()); + assertFalse(byteBufDocument.isEmpty()); + } + + @Test + @DisplayName("size() returns correct count") + void sizeShouldWork() { + assertEquals(0, emptyByteBufDocument.size()); + assertEquals(4, byteBufDocument.size()); + assertEquals(4, byteBufDocument.size()); // Verify caching works + } + + @Test + @DisplayName("getFirstKey() returns first key or throws for empty document") + void getFirstKeyShouldWork() { + assertEquals("a", byteBufDocument.getFirstKey()); + assertThrows(NoSuchElementException.class, () -> emptyByteBufDocument.getFirstKey()); + } + + // Collection Views + + @Test + @DisplayName("keySet() returns all keys") + void keySetShouldWork() { + assertTrue(emptyByteBufDocument.keySet().isEmpty()); + assertEquals(new HashSet<>(asList("a", "b", "c", "d")), byteBufDocument.keySet()); + } + + @Test + @DisplayName("values() returns all values") + void valuesShouldWork() { + assertTrue(emptyByteBufDocument.values().isEmpty()); + Set expected = new HashSet<>(asList( + document.get("a"), document.get("b"), document.get("c"), document.get("d") + )); + assertEquals(expected, new HashSet<>(byteBufDocument.values())); + } + + @Test + @DisplayName("entrySet() returns all entries") + void entrySetShouldWork() { + assertTrue(emptyByteBufDocument.entrySet().isEmpty()); + Set> expected = new HashSet<>(asList( + new AbstractMap.SimpleImmutableEntry<>("a", document.get("a")), + new AbstractMap.SimpleImmutableEntry<>("b", document.get("b")), + new AbstractMap.SimpleImmutableEntry<>("c", document.get("c")), + new AbstractMap.SimpleImmutableEntry<>("d", document.get("d")) + )); + assertEquals(expected, byteBufDocument.entrySet()); + } + + // Type-Specific Accessors + + @Test + @DisplayName("getDocument() returns nested document") + void getDocumentShouldWork() { + BsonDocument nested = byteBufDocument.getDocument("c"); + assertNotNull(nested); + assertEquals(BsonBoolean.TRUE, nested.get("x")); + } + + @Test + @DisplayName("getArray() returns array") + void getArrayShouldWork() { + BsonArray array = byteBufDocument.getArray("d"); + assertNotNull(array); + assertEquals(2, array.size()); + } + + @Test + @DisplayName("get() with default value works correctly") + void getWithDefaultShouldWork() { + assertEquals(new BsonInt32(1), byteBufDocument.get("a", new BsonInt32(999))); + assertEquals(new BsonInt32(999), byteBufDocument.get("missing", new BsonInt32(999))); + } + + @Test + @DisplayName("Type check methods return correct results") + void typeChecksShouldWork() { + assertTrue(byteBufDocument.isNumber("a")); + assertTrue(byteBufDocument.isInt32("a")); + assertTrue(byteBufDocument.isDocument("c")); + assertTrue(byteBufDocument.isArray("d")); + assertFalse(byteBufDocument.isDocument("a")); + } + + // Immutability + + @Test + @DisplayName("All write methods throw UnsupportedOperationException") + void writeMethodsShouldThrow() { + assertThrows(UnsupportedOperationException.class, () -> byteBufDocument.clear()); + assertThrows(UnsupportedOperationException.class, () -> byteBufDocument.put("x", new BsonInt32(1))); + assertThrows(UnsupportedOperationException.class, () -> byteBufDocument.append("x", new BsonInt32(1))); + assertThrows(UnsupportedOperationException.class, () -> byteBufDocument.putAll(new BsonDocument())); + assertThrows(UnsupportedOperationException.class, () -> byteBufDocument.remove("a")); + } + + // Conversion and Serialization + + @Test + @DisplayName("toBsonDocument() returns equivalent document and caches result") + void toBsonDocumentShouldWork() { + assertEquals(document, byteBufDocument.toBsonDocument()); + BsonDocument first = byteBufDocument.toBsonDocument(); + BsonDocument second = byteBufDocument.toBsonDocument(); + assertEquals(first, second); + } + + @Test + @DisplayName("asBsonReader() creates valid reader") + void asBsonReaderShouldWork() { + try (BsonReader reader = byteBufDocument.asBsonReader()) { + BsonDocument decoded = new BsonDocumentCodec().decode(reader, DecoderContext.builder().build()); + assertEquals(document, decoded); + } + } + + @Test + @DisplayName("toJson() returns correct JSON with different settings") + void toJsonShouldWork() { + assertEquals(document.toJson(), byteBufDocument.toJson()); + assertNotNull(byteBufDocument.toJson()); // Verify caching + + JsonWriterSettings shellSettings = JsonWriterSettings.builder().outputMode(JsonMode.SHELL).build(); + assertEquals(document.toJson(shellSettings), byteBufDocument.toJson(shellSettings)); + } + + @Test + @DisplayName("toString() returns equivalent string") + void toStringShouldWork() { + assertEquals(document.toString(), byteBufDocument.toString()); + } + + @Test + @DisplayName("clone() creates deep copy") + void cloneShouldWork() { + BsonDocument cloned = byteBufDocument.clone(); + assertEquals(byteBufDocument, cloned); + } + + @Test + @DisplayName("Java serialization works correctly") + void serializationShouldWork() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + new ObjectOutputStream(baos).writeObject(byteBufDocument); + Object deserialized = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray())).readObject(); + assertEquals(byteBufDocument, deserialized); + } + + // Equality and HashCode + + @Test + @DisplayName("equals() and hashCode() work correctly") + void equalsAndHashCodeShouldWork() { + assertEquals(document, byteBufDocument); + assertEquals(byteBufDocument, document); + assertEquals(document.hashCode(), byteBufDocument.hashCode()); + assertNotEquals(byteBufDocument, new BsonDocument("x", new BsonInt32(99))); + } + + // Resource Management + + @Test + @DisplayName("Closed document throws IllegalStateException on all operations") + void closedDocumentShouldThrow() { + byteBufDocument.close(); + assertThrows(IllegalStateException.class, () -> byteBufDocument.size()); + assertThrows(IllegalStateException.class, () -> byteBufDocument.isEmpty()); + assertThrows(IllegalStateException.class, () -> byteBufDocument.containsKey("a")); + assertThrows(IllegalStateException.class, () -> byteBufDocument.get("a")); + assertThrows(IllegalStateException.class, () -> byteBufDocument.keySet()); + assertThrows(IllegalStateException.class, () -> byteBufDocument.values()); + assertThrows(IllegalStateException.class, () -> byteBufDocument.entrySet()); + assertThrows(IllegalStateException.class, () -> byteBufDocument.getFirstKey()); + assertThrows(IllegalStateException.class, () -> byteBufDocument.toBsonDocument()); + assertThrows(IllegalStateException.class, () -> byteBufDocument.toJson()); + } + + @Test + @DisplayName("close() can be called multiple times safely") + void closeIsIdempotent() { + byteBufDocument.close(); + byteBufDocument.close(); // Should not throw + } + + @Test + @DisplayName("Nested documents are closed when parent is closed") + void nestedDocumentsClosedWithParent() { + BsonDocument doc = new BsonDocument("outer", new BsonDocument("inner", new BsonInt32(42))); + ByteBuf buf = createByteBufFromDocument(doc); + ByteBufBsonDocument byteBufDoc = new ByteBufBsonDocument(buf); + + BsonDocument retrieved = byteBufDoc.getDocument("outer"); + byteBufDoc.close(); + + assertThrows(IllegalStateException.class, byteBufDoc::size); + if (retrieved instanceof ByteBufBsonDocument) { + assertThrows(IllegalStateException.class, retrieved::size); + } + } + + @Test + @DisplayName("Nested arrays are closed when parent is closed") + void nestedArraysClosedWithParent() { + BsonDocument doc = new BsonDocument("arr", new BsonArray(asList( + new BsonInt32(1), new BsonDocument("x", new BsonInt32(2)) + ))); + ByteBuf buf = createByteBufFromDocument(doc); + ByteBufBsonDocument byteBufDoc = new ByteBufBsonDocument(buf); + + BsonArray retrieved = byteBufDoc.getArray("arr"); + byteBufDoc.close(); + + assertThrows(IllegalStateException.class, byteBufDoc::size); + if (retrieved instanceof ByteBufBsonArray) { + assertThrows(IllegalStateException.class, retrieved::size); + } + } + + @Test + @DisplayName("Deeply nested structures are closed recursively") + void deeplyNestedClosedRecursively() { + BsonDocument doc = new BsonDocument() + .append("level1", new BsonArray(asList( + new BsonDocument("level2", new BsonDocument("level3", new BsonInt32(999))), + new BsonInt32(1) + ))) + .append("sibling", new BsonDocument("key", new BsonString("value"))); + + ByteBuf buf = createByteBufFromDocument(doc); + ByteBufBsonDocument byteBufDoc = new ByteBufBsonDocument(buf); + + BsonArray level1 = byteBufDoc.getArray("level1"); + byteBufDoc.getDocument("sibling"); + + if (level1.get(0).isDocument()) { + BsonDocument level2Doc = level1.get(0).asDocument(); + if (level2Doc.containsKey("level2")) { + assertEquals(new BsonInt32(999), level2Doc.getDocument("level2").get("level3")); + } + } + + byteBufDoc.close(); + assertThrows(IllegalStateException.class, byteBufDoc::size); + } + + @Test + @DisplayName("Iteration tracks resources correctly") + void iterationTracksResources() { + BsonDocument doc = new BsonDocument() + .append("doc1", new BsonDocument("a", new BsonInt32(1))) + .append("arr1", new BsonArray(asList(new BsonInt32(2), new BsonInt32(3)))) + .append("primitive", new BsonString("test")); + + ByteBuf buf = createByteBufFromDocument(doc); + ByteBufBsonDocument byteBufDoc = new ByteBufBsonDocument(buf); + + int count = 0; + for (Map.Entry entry : byteBufDoc.entrySet()) { + assertNotNull(entry.getKey()); + assertNotNull(entry.getValue()); + count++; + } + assertEquals(3, count); + + byteBufDoc.close(); + assertThrows(IllegalStateException.class, byteBufDoc::size); + } + + @Test + @DisplayName("toBsonDocument() handles nested structures and allows close") + void toBsonDocumentHandlesNestedStructures() { + BsonDocument complexDoc = new BsonDocument() + .append("doc", new BsonDocument("x", new BsonInt32(1))) + .append("arr", new BsonArray(asList(new BsonDocument("y", new BsonInt32(2)), new BsonInt32(3)))); + + ByteBuf buf = createByteBufFromDocument(complexDoc); + ByteBufBsonDocument byteBufDoc = new ByteBufBsonDocument(buf); + + BsonDocument hydrated = byteBufDoc.toBsonDocument(); + assertEquals(complexDoc, hydrated); + + byteBufDoc.close(); + assertThrows(IllegalStateException.class, byteBufDoc::size); + } + + // Sequence Fields (OP_MSG) + + @Test + @DisplayName("Sequence field is accessible as array of ByteBufBsonDocuments") + void sequenceFieldAccessibleAsArray() { + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider()); + ByteBufBsonDocument commandDoc = createCommandMessageDocument(output, 3)) { + + BsonValue documentsValue = commandDoc.get("documents"); + assertNotNull(documentsValue); + assertTrue(documentsValue.isArray()); + + BsonArray documents = documentsValue.asArray(); + assertEquals(3, documents.size()); + + for (int i = 0; i < 3; i++) { + BsonValue doc = documents.get(i); + assertInstanceOf(ByteBufBsonDocument.class, doc); + assertEquals(new BsonInt32(i), doc.asDocument().get("_id")); + assertEquals(new BsonString("doc" + i), doc.asDocument().get("name")); + } + } + } + + @Test + @DisplayName("Sequence field is included in size, keySet, values, and entrySet") + void sequenceFieldIncludedInCollectionViews() { + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider()); + ByteBufBsonDocument commandDoc = createCommandMessageDocument(output, 2)) { + + assertTrue(commandDoc.size() >= 3); + assertTrue(commandDoc.keySet().contains("documents")); + assertTrue(commandDoc.keySet().contains("insert")); + + boolean foundDocumentsArray = false; + for (BsonValue value : commandDoc.values()) { + if (value.isArray() && value.asArray().size() == 2) { + foundDocumentsArray = true; + break; + } + } + assertTrue(foundDocumentsArray); + + boolean foundDocumentsEntry = false; + for (Map.Entry entry : commandDoc.entrySet()) { + if ("documents".equals(entry.getKey())) { + foundDocumentsEntry = true; + assertEquals(2, entry.getValue().asArray().size()); + break; + } + } + assertTrue(foundDocumentsEntry); + } + } + + @Test + @DisplayName("containsKey and containsValue work with sequence fields") + void containsMethodsWorkWithSequenceFields() { + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider()); + ByteBufBsonDocument commandDoc = createCommandMessageDocument(output, 3)) { + + assertTrue(commandDoc.containsKey("documents")); + assertTrue(commandDoc.containsKey("insert")); + assertFalse(commandDoc.containsKey("nonexistent")); + + BsonDocument expectedDoc = new BsonDocument() + .append("_id", new BsonInt32(1)) + .append("name", new BsonString("doc1")); + assertTrue(commandDoc.containsValue(expectedDoc)); + } + } + + @Test + @DisplayName("Sequence field documents are closed when parent is closed") + void sequenceFieldDocumentsClosedWithParent() { + ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider()); + ByteBufBsonDocument commandDoc = createCommandMessageDocument(output, 2); + + BsonArray documents = commandDoc.getArray("documents"); + List docRefs = new ArrayList<>(); + for (BsonValue doc : documents) { + docRefs.add(doc.asDocument()); + } + + commandDoc.close(); + output.close(); + + assertThrows(IllegalStateException.class, commandDoc::size); + for (BsonDocument doc : docRefs) { + if (doc instanceof ByteBufBsonDocument) { + assertThrows(IllegalStateException.class, doc::size); + } + } + } + + @Test + @DisplayName("Sequence field is cached on multiple access") + void sequenceFieldCached() { + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider()); + ByteBufBsonDocument commandDoc = createCommandMessageDocument(output, 2)) { + + BsonArray first = commandDoc.getArray("documents"); + BsonArray second = commandDoc.getArray("documents"); + assertNotNull(first); + assertEquals(first.size(), second.size()); + } + } + + @Test + @DisplayName("toBsonDocument() hydrates sequence fields to regular BsonDocuments") + void toBsonDocumentHydratesSequenceFields() { + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider()); + ByteBufBsonDocument commandDoc = createCommandMessageDocument(output, 2)) { + + BsonDocument hydrated = commandDoc.toBsonDocument(); + assertTrue(hydrated.containsKey("documents")); + + BsonArray documents = hydrated.getArray("documents"); + assertEquals(2, documents.size()); + for (BsonValue doc : documents) { + assertFalse(doc instanceof ByteBufBsonDocument); + } + } + } + + @Test + @DisplayName("Sequence field with nested documents works correctly") + void sequenceFieldWithNestedDocuments() { + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + ByteBufBsonDocument commandDoc = createNestedCommandMessageDocument(output); + + BsonArray documents = commandDoc.getArray("documents"); + assertEquals(2, documents.size()); + + BsonDocument firstDoc = documents.get(0).asDocument(); + BsonDocument nested = firstDoc.getDocument("nested"); + assertEquals(new BsonInt32(0), nested.get("inner")); + + BsonArray array = firstDoc.getArray("array"); + assertEquals(2, array.size()); + + commandDoc.close(); + } + } + + @Test + @DisplayName("Empty sequence field returns empty array") + void emptySequenceField() { + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider()); + ByteBufBsonDocument commandDoc = createCommandMessageDocument(output, 0)) { + + assertTrue(commandDoc.containsKey("insert")); + assertTrue(commandDoc.containsKey("documents")); + assertTrue(commandDoc.getArray("documents").isEmpty()); + } + } + + @Test + @DisplayName("getFirstKey() returns body field, not sequence field") + void getFirstKeyReturnsBodyField() { + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider()); + ByteBufBsonDocument commandDoc = createCommandMessageDocument(output, 2)) { + + assertEquals("insert", commandDoc.getFirstKey()); + } + } + + @Test + @DisplayName("toJson() includes sequence fields") + void toJsonIncludesSequenceFields() { + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider()); + ByteBufBsonDocument commandDoc = createCommandMessageDocument(output, 2)) { + + String json = commandDoc.toJson(); + assertTrue(json.contains("documents")); + assertTrue(json.contains("_id")); + } + } + + @Test + @DisplayName("equals() and hashCode() include sequence fields") + void equalsAndHashCodeIncludeSequenceFields() { + try (ByteBufferBsonOutput output1 = new ByteBufferBsonOutput(new SimpleBufferProvider()); + ByteBufBsonDocument commandDoc1 = createCommandMessageDocument(output1, 2); + ByteBufferBsonOutput output2 = new ByteBufferBsonOutput(new SimpleBufferProvider()); + ByteBufBsonDocument commandDoc2 = createCommandMessageDocument(output2, 2)) { + + assertEquals(commandDoc1.toBsonDocument(), commandDoc2.toBsonDocument()); + assertEquals(commandDoc1.hashCode(), commandDoc2.hashCode()); + } + } + + // --- Helper Methods --- + + private ByteBufBsonDocument createCommandMessageDocument(final ByteBufferBsonOutput output, final int numDocuments) { + BsonDocument bodyDoc = new BsonDocument() + .append("insert", new BsonString("test")) + .append("$db", new BsonString("db")); + + byte[] bodyBytes = encodeBsonDocument(bodyDoc); + List sequenceDocBytes = new ArrayList<>(); + for (int i = 0; i < numDocuments; i++) { + BsonDocument seqDoc = new BsonDocument() + .append("_id", new BsonInt32(i)) + .append("name", new BsonString("doc" + i)); + sequenceDocBytes.add(encodeBsonDocument(seqDoc)); + } + + writeOpMsgFormat(output, bodyBytes, "documents", sequenceDocBytes); + + List buffers = output.getByteBuffers(); + return ByteBufBsonDocument.createCommandMessage(new CompositeByteBuf(buffers)); + } + + private ByteBufBsonDocument createNestedCommandMessageDocument(final ByteBufferBsonOutput output) { + BsonDocument bodyDoc = new BsonDocument() + .append("insert", new BsonString("test")) + .append("$db", new BsonString("db")); + + byte[] bodyBytes = encodeBsonDocument(bodyDoc); + List sequenceDocBytes = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + BsonDocument seqDoc = new BsonDocument() + .append("_id", new BsonInt32(i)) + .append("nested", new BsonDocument("inner", new BsonInt32(i * 10))) + .append("array", new BsonArray(asList( + new BsonInt32(i), + new BsonDocument("arrayNested", new BsonString("value" + i)) + ))); + sequenceDocBytes.add(encodeBsonDocument(seqDoc)); + } + + writeOpMsgFormat(output, bodyBytes, "documents", sequenceDocBytes); + return ByteBufBsonDocument.createCommandMessage(new CompositeByteBuf(output.getByteBuffers())); + } + + private void writeOpMsgFormat(final ByteBufferBsonOutput output, final byte[] bodyBytes, + final String sequenceIdentifier, final List sequenceDocBytes) { + output.writeBytes(bodyBytes, 0, bodyBytes.length); + + int sequencePayloadSize = sequenceDocBytes.stream().mapToInt(b -> b.length).sum(); + int sequenceSectionSize = 4 + sequenceIdentifier.length() + 1 + sequencePayloadSize; + + output.writeByte(1); + output.writeInt32(sequenceSectionSize); + output.writeCString(sequenceIdentifier); + for (byte[] docBytes : sequenceDocBytes) { + output.writeBytes(docBytes, 0, docBytes.length); + } + } + + private static byte[] encodeBsonDocument(final BsonDocument doc) { + try { + BasicOutputBuffer buffer = new BasicOutputBuffer(); + new BsonDocumentCodec().encode(new BsonBinaryWriter(buffer), doc, EncoderContext.builder().build()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + buffer.pipe(baos); + return baos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static ByteBuf createByteBufFromDocument(final BsonDocument doc) { + return new ByteBufNIO(ByteBuffer.wrap(encodeBsonDocument(doc))); + } + + private static class SimpleBufferProvider implements BufferProvider { + @NotNull + @Override + public ByteBuf getBuffer(final int size) { + return new ByteBufNIO(ByteBuffer.allocate(size).order(ByteOrder.LITTLE_ENDIAN)); + } + } +} diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy deleted file mode 100644 index 77bdd5e2045..00000000000 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy +++ /dev/null @@ -1,365 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.mongodb.internal.connection - - -import com.mongodb.MongoNamespace -import com.mongodb.ReadConcern -import com.mongodb.ReadPreference -import com.mongodb.connection.ClusterConnectionMode -import com.mongodb.connection.ServerType -import com.mongodb.internal.IgnorableRequestContext -import com.mongodb.internal.TimeoutContext -import com.mongodb.internal.bulk.InsertRequest -import com.mongodb.internal.bulk.WriteRequestWithIndex -import com.mongodb.internal.session.SessionContext -import com.mongodb.internal.validator.NoOpFieldNameValidator -import org.bson.BsonArray -import org.bson.BsonBinary -import org.bson.BsonDocument -import org.bson.BsonInt32 -import org.bson.BsonMaximumSizeExceededException -import org.bson.BsonString -import org.bson.BsonTimestamp -import org.bson.ByteBuf -import org.bson.ByteBufNIO -import org.bson.codecs.BsonDocumentCodec -import spock.lang.Specification - -import java.nio.ByteBuffer - -import static com.mongodb.internal.connection.SplittablePayload.Type.INSERT -import static com.mongodb.internal.operation.ServerVersionHelper.LATEST_WIRE_VERSION - -/** - * New tests must be added to {@link CommandMessageTest}. - */ -class CommandMessageSpecification extends Specification { - - def namespace = new MongoNamespace('db.test') - def command = new BsonDocument('find', new BsonString(namespace.collectionName)) - def fieldNameValidator = NoOpFieldNameValidator.INSTANCE - - def 'should encode command message with OP_MSG when server version is >= 3.6'() { - given: - def message = new CommandMessage(namespace.getDatabaseName(), command, fieldNameValidator, readPreference, - MessageSettings.builder() - .maxWireVersion(LATEST_WIRE_VERSION) - .serverType(serverType as ServerType) - .sessionSupported(true) - .build(), - responseExpected, MessageSequences.EmptyMessageSequences.INSTANCE, clusterConnectionMode, null) - def output = new ByteBufferBsonOutput(new SimpleBufferProvider()) - - when: - message.encode(output, operationContext) - - then: - def byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray())) - def messageHeader = new MessageHeader(byteBuf, 512) - def replyHeader = new ReplyHeader(byteBuf, messageHeader) - messageHeader.opCode == OpCode.OP_MSG.value - replyHeader.requestId < RequestMessage.currentGlobalId - replyHeader.responseTo == 0 - replyHeader.hasMoreToCome() != responseExpected - - def expectedCommandDocument = command.clone() - .append('$db', new BsonString(namespace.databaseName)) - - if (operationContext.getSessionContext().clusterTime != null) { - expectedCommandDocument.append('$clusterTime', operationContext.getSessionContext().clusterTime) - } - if (operationContext.getSessionContext().hasSession() && responseExpected) { - expectedCommandDocument.append('lsid', operationContext.getSessionContext().sessionId) - } - - if (readPreference != ReadPreference.primary()) { - expectedCommandDocument.append('$readPreference', readPreference.toDocument()) - } else if (clusterConnectionMode == ClusterConnectionMode.SINGLE && serverType != ServerType.SHARD_ROUTER) { - expectedCommandDocument.append('$readPreference', ReadPreference.primaryPreferred().toDocument()) - } - getCommandDocument(byteBuf, replyHeader) == expectedCommandDocument - - cleanup: - output.close() - - where: - [readPreference, serverType, clusterConnectionMode, operationContext, responseExpected, isCryptd] << [ - [ReadPreference.primary(), ReadPreference.secondary()], - [ServerType.REPLICA_SET_PRIMARY, ServerType.SHARD_ROUTER], - [ClusterConnectionMode.SINGLE, ClusterConnectionMode.MULTIPLE], - [ - new OperationContext( - IgnorableRequestContext.INSTANCE, - Stub(SessionContext) { - hasSession() >> false - getClusterTime() >> null - getSessionId() >> new BsonDocument('id', new BsonBinary([1, 2, 3] as byte[])) - getReadConcern() >> ReadConcern.DEFAULT - }, Stub(TimeoutContext), null), - new OperationContext( - IgnorableRequestContext.INSTANCE, - Stub(SessionContext) { - hasSession() >> false - getClusterTime() >> new BsonDocument('clusterTime', new BsonTimestamp(42, 1)) - getReadConcern() >> ReadConcern.DEFAULT - }, Stub(TimeoutContext), null), - new OperationContext( - IgnorableRequestContext.INSTANCE, - Stub(SessionContext) { - hasSession() >> true - getClusterTime() >> null - getSessionId() >> new BsonDocument('id', new BsonBinary([1, 2, 3] as byte[])) - getReadConcern() >> ReadConcern.DEFAULT - }, Stub(TimeoutContext), null), - new OperationContext( - IgnorableRequestContext.INSTANCE, - Stub(SessionContext) { - hasSession() >> true - getClusterTime() >> new BsonDocument('clusterTime', new BsonTimestamp(42, 1)) - getSessionId() >> new BsonDocument('id', new BsonBinary([1, 2, 3] as byte[])) - getReadConcern() >> ReadConcern.DEFAULT - }, Stub(TimeoutContext), null) - ], - [true, false], - [true, false] - ].combinations() - } - - String getString(final ByteBuf byteBuf) { - def byteArrayOutputStream = new ByteArrayOutputStream() - def cur = byteBuf.get() - while (cur != 0) { - byteArrayOutputStream.write(cur) - cur = byteBuf.get() - } - new String(byteArrayOutputStream.toByteArray(), 'UTF-8') - } - - def 'should get command document'() { - given: - def message = new CommandMessage(namespace.getDatabaseName(), originalCommandDocument, fieldNameValidator, - ReadPreference.primary(), MessageSettings.builder().maxWireVersion(maxWireVersion).build(), true, - payload == null ? MessageSequences.EmptyMessageSequences.INSTANCE : payload, - ClusterConnectionMode.MULTIPLE, null) - def output = new ByteBufferBsonOutput(new SimpleBufferProvider()) - message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, - Stub(TimeoutContext), null)) - - when: - def commandDocument = message.getCommandDocument(output) - - def expectedCommandDocument = new BsonDocument('insert', new BsonString('coll')).append('documents', - new BsonArray([new BsonDocument('_id', new BsonInt32(1)), new BsonDocument('_id', new BsonInt32(2))])) - expectedCommandDocument.append('$db', new BsonString(namespace.getDatabaseName())) - then: - commandDocument == expectedCommandDocument - - - where: - [maxWireVersion, originalCommandDocument, payload] << [ - [ - LATEST_WIRE_VERSION, - new BsonDocument('insert', new BsonString('coll')), - new SplittablePayload(INSERT, [new BsonDocument('_id', new BsonInt32(1)), - new BsonDocument('_id', new BsonInt32(2))] - .withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, - true, NoOpFieldNameValidator.INSTANCE), - ], - [ - LATEST_WIRE_VERSION, - new BsonDocument('insert', new BsonString('coll')).append('documents', - new BsonArray([new BsonDocument('_id', new BsonInt32(1)), new BsonDocument('_id', new BsonInt32(2))])), - null - ] - ] - } - - def 'should respect the max message size'() { - given: - def maxMessageSize = 1024 - def messageSettings = MessageSettings.builder().maxMessageSize(maxMessageSize).maxWireVersion(LATEST_WIRE_VERSION).build() - def insertCommand = new BsonDocument('insert', new BsonString(namespace.collectionName)) - def payload = new SplittablePayload(INSERT, [new BsonDocument('_id', new BsonInt32(1)).append('a', new BsonBinary(new byte[913])), - new BsonDocument('_id', new BsonInt32(2)).append('b', new BsonBinary(new byte[441])), - new BsonDocument('_id', new BsonInt32(3)).append('c', new BsonBinary(new byte[450])), - new BsonDocument('_id', new BsonInt32(4)).append('b', new BsonBinary(new byte[441])), - new BsonDocument('_id', new BsonInt32(5)).append('c', new BsonBinary(new byte[451]))] - .withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true, fieldNameValidator) - def message = new CommandMessage(namespace.getDatabaseName(), insertCommand, fieldNameValidator, ReadPreference.primary(), - messageSettings, false, payload, ClusterConnectionMode.MULTIPLE, null) - def output = new ByteBufferBsonOutput(new SimpleBufferProvider()) - def sessionContext = Stub(SessionContext) { - getReadConcern() >> ReadConcern.DEFAULT - } - - when: - message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE, sessionContext, - Stub(TimeoutContext), null)) - def byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray())) - def messageHeader = new MessageHeader(byteBuf, maxMessageSize) - - then: - messageHeader.opCode == OpCode.OP_MSG.value - messageHeader.requestId < RequestMessage.currentGlobalId - messageHeader.responseTo == 0 - messageHeader.messageLength == 1024 - byteBuf.getInt() == 0 - payload.getPosition() == 1 - payload.hasAnotherSplit() - - when: - payload = payload.getNextSplit() - message = new CommandMessage(namespace.getDatabaseName(), insertCommand, fieldNameValidator, ReadPreference.primary(), - messageSettings, false, payload, ClusterConnectionMode.MULTIPLE, null) - output.truncateToPosition(0) - message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE, sessionContext, Stub(TimeoutContext), null)) - byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray())) - messageHeader = new MessageHeader(byteBuf, maxMessageSize) - - then: - messageHeader.opCode == OpCode.OP_MSG.value - messageHeader.requestId < RequestMessage.currentGlobalId - messageHeader.responseTo == 0 - messageHeader.messageLength == 1024 - byteBuf.getInt() == 0 - payload.getPosition() == 2 - payload.hasAnotherSplit() - - when: - payload = payload.getNextSplit() - message = new CommandMessage(namespace.getDatabaseName(), insertCommand, fieldNameValidator, ReadPreference.primary(), - messageSettings, false, payload, ClusterConnectionMode.MULTIPLE, null) - output.truncateToPosition(0) - message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE, sessionContext, Stub(TimeoutContext), null)) - byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray())) - messageHeader = new MessageHeader(byteBuf, maxMessageSize) - - then: - messageHeader.opCode == OpCode.OP_MSG.value - messageHeader.requestId < RequestMessage.currentGlobalId - messageHeader.responseTo == 0 - messageHeader.messageLength == 552 - byteBuf.getInt() == 0 - payload.getPosition() == 1 - payload.hasAnotherSplit() - - when: - payload = payload.getNextSplit() - message = new CommandMessage(namespace.getDatabaseName(), insertCommand, fieldNameValidator, ReadPreference.primary(), - messageSettings, false, payload, ClusterConnectionMode.MULTIPLE, null) - output.truncateToPosition(0) - message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE, - sessionContext, - Stub(TimeoutContext), - null)) - byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray())) - messageHeader = new MessageHeader(byteBuf, maxMessageSize) - - then: - messageHeader.opCode == OpCode.OP_MSG.value - messageHeader.requestId < RequestMessage.currentGlobalId - messageHeader.responseTo == 0 - messageHeader.messageLength == 562 - byteBuf.getInt() == 1 << 1 - payload.getPosition() == 1 - !payload.hasAnotherSplit() - - cleanup: - output.close() - } - - def 'should respect the max batch count'() { - given: - def messageSettings = MessageSettings.builder().maxBatchCount(2).maxWireVersion(LATEST_WIRE_VERSION).build() - def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonBinary(new byte[900])), - new BsonDocument('b', new BsonBinary(new byte[450])), - new BsonDocument('c', new BsonBinary(new byte[450]))] - .withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true, fieldNameValidator) - def message = new CommandMessage(namespace.getDatabaseName(), command, fieldNameValidator, ReadPreference.primary(), - messageSettings, false, payload, ClusterConnectionMode.MULTIPLE, null) - def output = new ByteBufferBsonOutput(new SimpleBufferProvider()) - def sessionContext = Stub(SessionContext) { - getReadConcern() >> ReadConcern.DEFAULT - } - - when: - message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE, sessionContext, - Stub(TimeoutContext), - null)) - def byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray())) - def messageHeader = new MessageHeader(byteBuf, 2048) - - then: - messageHeader.opCode == OpCode.OP_MSG.value - messageHeader.requestId < RequestMessage.currentGlobalId - messageHeader.responseTo == 0 - messageHeader.messageLength == 1497 - byteBuf.getInt() == 0 - payload.getPosition() == 2 - payload.hasAnotherSplit() - - when: - payload = payload.getNextSplit() - message = new CommandMessage(namespace.getDatabaseName(), command, fieldNameValidator, ReadPreference.primary(), messageSettings, - false, payload, ClusterConnectionMode.MULTIPLE, null) - output.truncateToPosition(0) - message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE, sessionContext, - Stub(TimeoutContext), null)) - byteBuf = new ByteBufNIO(ByteBuffer.wrap(output.toByteArray())) - messageHeader = new MessageHeader(byteBuf, 1024) - - then: - messageHeader.opCode == OpCode.OP_MSG.value - messageHeader.requestId < RequestMessage.currentGlobalId - messageHeader.responseTo == 0 - byteBuf.getInt() == 1 << 1 - payload.getPosition() == 1 - !payload.hasAnotherSplit() - - cleanup: - output.close() - } - - def 'should throw if payload document bigger than max document size'() { - given: - def messageSettings = MessageSettings.builder().maxDocumentSize(900) - .maxWireVersion(LATEST_WIRE_VERSION).build() - def payload = new SplittablePayload(INSERT, [new BsonDocument('a', new BsonBinary(new byte[900]))] - .withIndex().collect { doc, i -> new WriteRequestWithIndex(new InsertRequest(doc), i) }, true, fieldNameValidator) - def message = new CommandMessage(namespace.getDatabaseName(), command, fieldNameValidator, ReadPreference.primary(), - messageSettings, false, payload, ClusterConnectionMode.MULTIPLE, null) - def output = new ByteBufferBsonOutput(new SimpleBufferProvider()) - def sessionContext = Stub(SessionContext) { - getReadConcern() >> ReadConcern.DEFAULT - } - - when: - message.encode(output, new OperationContext(IgnorableRequestContext.INSTANCE, sessionContext, - Stub(TimeoutContext), null)) - - then: - thrown(BsonMaximumSizeExceededException) - - cleanup: - output.close() - } - - private static BsonDocument getCommandDocument(ByteBufNIO byteBuf, ReplyHeader replyHeader) { - new ReplyMessage(new ResponseBuffers(replyHeader, byteBuf), new BsonDocumentCodec(), 0).document - } -} diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageTest.java index 091518c715c..e5eab18869b 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageTest.java @@ -27,6 +27,8 @@ import com.mongodb.internal.IgnorableRequestContext; import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.bulk.InsertRequest; +import com.mongodb.internal.bulk.WriteRequestWithIndex; import com.mongodb.internal.client.model.bulk.ConcreteClientBulkWriteOptions; import com.mongodb.internal.connection.MessageSequences.EmptyMessageSequences; import com.mongodb.internal.operation.ClientBulkWriteOperation; @@ -34,11 +36,14 @@ import com.mongodb.internal.session.SessionContext; import com.mongodb.internal.validator.NoOpFieldNameValidator; import org.bson.BsonArray; +import org.bson.BsonBinary; import org.bson.BsonBoolean; import org.bson.BsonDocument; import org.bson.BsonInt32; +import org.bson.BsonMaximumSizeExceededException; import org.bson.BsonString; import org.bson.BsonTimestamp; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import java.util.List; @@ -53,17 +58,20 @@ import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +@DisplayName("CommandMessage") class CommandMessageTest { private static final MongoNamespace NAMESPACE = new MongoNamespace("db.test"); private static final BsonDocument COMMAND = new BsonDocument("find", new BsonString(NAMESPACE.getCollectionName())); @Test + @DisplayName("encode should throw timeout exception when timeout context is called") void encodeShouldThrowTimeoutExceptionWhenTimeoutContextIsCalled() { //given CommandMessage commandMessage = new CommandMessage(NAMESPACE.getDatabaseName(), COMMAND, NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), @@ -91,6 +99,7 @@ void encodeShouldThrowTimeoutExceptionWhenTimeoutContextIsCalled() { } @Test + @DisplayName("encode should not add extra elements from timeout context when connected to mongocryptd") void encodeShouldNotAddExtraElementsFromTimeoutContextWhenConnectedToMongoCrypt() { //given CommandMessage commandMessage = new CommandMessage(NAMESPACE.getDatabaseName(), COMMAND, NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), @@ -126,6 +135,7 @@ void encodeShouldNotAddExtraElementsFromTimeoutContextWhenConnectedToMongoCrypt( } @Test + @DisplayName("get command document from client bulk write operation") void getCommandDocumentFromClientBulkWrite() { MongoNamespace ns = new MongoNamespace("db", "test"); boolean retryWrites = false; @@ -164,8 +174,466 @@ void getCommandDocumentFromClientBulkWrite() { new OperationContext( IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, new TimeoutContext(TimeoutSettings.DEFAULT), null)); - BsonDocument actualCommandDocument = commandMessage.getCommandDocument(output); - assertEquals(expectedCommandDocument, actualCommandDocument); + + try (ByteBufBsonDocument actualCommandDocument = commandMessage.getCommandDocument(output)) { + assertEquals(expectedCommandDocument, actualCommandDocument); + } + } + } + + @Test + @DisplayName("get command document with payload containing documents") + void getCommandDocumentWithPayload() { + // given + BsonDocument originalCommandDocument = new BsonDocument("insert", new BsonString("coll")); + List documents = asList( + new BsonDocument("_id", new BsonInt32(1)), + new BsonDocument("_id", new BsonInt32(2)) + ); + List requestsFromDocs = IntStream.range(0, documents.size()) + .mapToObj(i -> new WriteRequestWithIndex(new InsertRequest(documents.get(i)), i)) + .collect(Collectors.toList()); + + SplittablePayload payload = new SplittablePayload( + SplittablePayload.Type.INSERT, + requestsFromDocs, + true, + NoOpFieldNameValidator.INSTANCE + ); + + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), originalCommandDocument, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), MessageSettings.builder().maxWireVersion(LATEST_WIRE_VERSION).build(), true, + payload, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + message.encode( + output, + new OperationContext(IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null) + ); + + // when + try (ByteBufBsonDocument commandDoc = message.getCommandDocument(output)) { + // then + assertEquals("coll", commandDoc.getString("insert").getValue()); + assertEquals(NAMESPACE.getDatabaseName(), commandDoc.getString("$db").getValue()); + BsonArray docsArray = commandDoc.getArray("documents"); + assertEquals(2, docsArray.size()); + } + } + } + + @Test + @DisplayName("get command document with pre-encoded documents") + void getCommandDocumentWithPreEncodedDocuments() { + // given + BsonDocument originalCommandDocument = new BsonDocument("insert", new BsonString("coll")) + .append("documents", new BsonArray(asList( + new BsonDocument("_id", new BsonInt32(1)), + new BsonDocument("_id", new BsonInt32(2)) + ))); + + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), originalCommandDocument, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), MessageSettings.builder().maxWireVersion(LATEST_WIRE_VERSION).build(), true, + EmptyMessageSequences.INSTANCE, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + message.encode( + output, + new OperationContext(IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null) + ); + + // when + try (ByteBufBsonDocument commandDoc = message.getCommandDocument(output)) { + // then + assertEquals("coll", commandDoc.getString("insert").getValue()); + assertEquals(NAMESPACE.getDatabaseName(), commandDoc.getString("$db").getValue()); + BsonArray docsArray = commandDoc.getArray("documents"); + assertEquals(2, docsArray.size()); + } + } + } + + @Test + @DisplayName("encode respects max message size constraint") + void encodeShouldRespectMaxMessageSize() { + // given + int maxMessageSize = 1024; + MessageSettings messageSettings = MessageSettings.builder() + .maxMessageSize(maxMessageSize) + .maxWireVersion(LATEST_WIRE_VERSION) + .build(); + BsonDocument insertCommand = new BsonDocument("insert", new BsonString(NAMESPACE.getCollectionName())); + + List requests = asList( + new WriteRequestWithIndex( + new InsertRequest(new BsonDocument("_id", new BsonInt32(1)).append("a", new BsonBinary(new byte[913]))), + 0), + new WriteRequestWithIndex( + new InsertRequest(new BsonDocument("_id", new BsonInt32(2)).append("b", new BsonBinary(new byte[441]))), + 1), + new WriteRequestWithIndex( + new InsertRequest(new BsonDocument("_id", new BsonInt32(3)).append("c", new BsonBinary(new byte[450]))), + 2), + new WriteRequestWithIndex( + new InsertRequest(new BsonDocument("_id", new BsonInt32(4)).append("b", new BsonBinary(new byte[441]))), + 3), + new WriteRequestWithIndex( + new InsertRequest(new BsonDocument("_id", new BsonInt32(5)).append("c", new BsonBinary(new byte[451]))), + 4) + ); + + SplittablePayload payload = new SplittablePayload( + SplittablePayload.Type.INSERT, requests, true, NoOpFieldNameValidator.INSTANCE + ); + + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), insertCommand, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), messageSettings, false, payload, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + // when - encode first batch + message.encode(output, new OperationContext( + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null + )); + + // then - first batch respects size constraint + assertTrue(output.size() <= maxMessageSize, "Output size " + output.size() + " should not exceed max " + maxMessageSize); + assertEquals(1, payload.getPosition()); + + // Verify multiple splits were created + assertTrue(payload.hasAnotherSplit()); + } + } + + @Test + @DisplayName("encode respects max batch count constraint") + void encodeShouldRespectMaxBatchCount() { + // given + MessageSettings messageSettings = MessageSettings.builder() + .maxBatchCount(2) + .maxWireVersion(LATEST_WIRE_VERSION) + .build(); + + List requests = asList( + new WriteRequestWithIndex( + new InsertRequest(new BsonDocument("a", new BsonBinary(new byte[900]))), + 0), + new WriteRequestWithIndex( + new InsertRequest(new BsonDocument("b", new BsonBinary(new byte[450]))), + 1), + new WriteRequestWithIndex( + new InsertRequest(new BsonDocument("c", new BsonBinary(new byte[450]))), + 2) + ); + + SplittablePayload payload = new SplittablePayload( + SplittablePayload.Type.INSERT, requests, true, NoOpFieldNameValidator.INSTANCE + ); + + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), COMMAND, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), messageSettings, false, payload, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + // when - encode first batch with max 2 documents + message.encode(output, new OperationContext( + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null + )); + + // then - first batch has 2 documents + assertEquals(2, payload.getPosition()); + assertTrue(payload.hasAnotherSplit()); } } + + @Test + @DisplayName("encode throws exception when payload document exceeds max document size") + void encodeShouldThrowWhenPayloadDocumentExceedsMaxSize() { + // given + MessageSettings messageSettings = MessageSettings.builder() + .maxDocumentSize(900) + .maxWireVersion(LATEST_WIRE_VERSION) + .build(); + + List requests = singletonList( + new WriteRequestWithIndex( + new InsertRequest(new BsonDocument("a", new BsonBinary(new byte[900]))), + 0) + ); + + SplittablePayload payload = new SplittablePayload( + SplittablePayload.Type.INSERT, requests, true, NoOpFieldNameValidator.INSTANCE + ); + + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), COMMAND, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), messageSettings, false, payload, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + // when & then + assertThrows(BsonMaximumSizeExceededException.class, () -> + message.encode(output, new OperationContext( + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null + )) + ); + } + } + + @Test + @DisplayName("encode message with cluster time encodes successfully") + void encodeWithClusterTime() { + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), COMMAND, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), + MessageSettings.builder().maxWireVersion(LATEST_WIRE_VERSION).build(), + true, EmptyMessageSequences.INSTANCE, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + message.encode(output, new OperationContext( + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null + )); + + try (ByteBufBsonDocument commandDoc = message.getCommandDocument(output)) { + assertTrue(output.size() > 0, "Output should contain encoded message"); + assertEquals(NAMESPACE.getDatabaseName(), commandDoc.getString("$db").getValue()); + } + } + } + + @Test + @DisplayName("encode message with active session encodes successfully") + void encodeWithActiveSession() { + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), COMMAND, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), + MessageSettings.builder().maxWireVersion(LATEST_WIRE_VERSION).build(), + true, EmptyMessageSequences.INSTANCE, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + message.encode(output, new OperationContext( + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null + )); + + try (ByteBufBsonDocument commandDoc = message.getCommandDocument(output)) { + assertTrue(output.size() > 0, "Output should contain encoded message"); + assertEquals(NAMESPACE.getDatabaseName(), commandDoc.getString("$db").getValue()); + } + } + } + + @Test + @DisplayName("encode message with secondary read preference encodes successfully") + void encodeWithSecondaryReadPreference() { + ReadPreference secondary = ReadPreference.secondary(); + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), COMMAND, NoOpFieldNameValidator.INSTANCE, + secondary, + MessageSettings.builder().maxWireVersion(LATEST_WIRE_VERSION).build(), + true, EmptyMessageSequences.INSTANCE, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + message.encode(output, new OperationContext( + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null + )); + + try (ByteBufBsonDocument commandDoc = message.getCommandDocument(output)) { + assertTrue(output.size() > 0, "Output should contain encoded message"); + assertEquals(NAMESPACE.getDatabaseName(), commandDoc.getString("$db").getValue()); + } + } + } + + @Test + @DisplayName("encode message in single cluster mode encodes successfully") + void encodeInSingleClusterMode() { + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), COMMAND, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), + MessageSettings.builder() + .maxWireVersion(LATEST_WIRE_VERSION) + .serverType(ServerType.REPLICA_SET_PRIMARY) + .build(), + true, EmptyMessageSequences.INSTANCE, ClusterConnectionMode.SINGLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + message.encode(output, new OperationContext( + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null + )); + + try (ByteBufBsonDocument commandDoc = message.getCommandDocument(output)) { + assertTrue(output.size() > 0, "Output should contain encoded message"); + assertEquals(NAMESPACE.getDatabaseName(), commandDoc.getString("$db").getValue()); + } + } + } + + @Test + @DisplayName("encode includes database name in command document") + void encodeIncludesDatabaseName() { + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), COMMAND, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), + MessageSettings.builder().maxWireVersion(LATEST_WIRE_VERSION).build(), + true, EmptyMessageSequences.INSTANCE, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + message.encode(output, new OperationContext( + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null + )); + + try (ByteBufBsonDocument commandDoc = message.getCommandDocument(output)) { + assertEquals(NAMESPACE.getDatabaseName(), commandDoc.getString("$db").getValue()); + } + } + } + + @Test + @DisplayName("command document can be accessed multiple times") + void commandDocumentCanBeAccessedMultipleTimes() { + BsonDocument originalCommand = new BsonDocument("find", new BsonString("coll")) + .append("filter", new BsonDocument("_id", new BsonInt32(1))); + + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), originalCommand, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), + MessageSettings.builder().maxWireVersion(LATEST_WIRE_VERSION).build(), + true, EmptyMessageSequences.INSTANCE, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + message.encode(output, new OperationContext( + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null + )); + + try (ByteBufBsonDocument commandDoc = message.getCommandDocument(output)) { + // Access same fields multiple times + assertEquals("coll", commandDoc.getString("find").getValue()); + assertEquals("coll", commandDoc.getString("find").getValue()); + BsonDocument filter = commandDoc.getDocument("filter"); + BsonDocument filter2 = commandDoc.getDocument("filter"); + assertEquals(filter, filter2); + } + } + } + + @Test + @DisplayName("encode with multiple document sequences creates proper arrays") + void encodeWithMultipleDocumentsInSequence() { + BsonDocument insertCommand = new BsonDocument("insert", new BsonString("coll")); + List requests = asList( + new WriteRequestWithIndex( + new InsertRequest(new BsonDocument("_id", new BsonInt32(1)).append("name", new BsonString("doc1"))), + 0), + new WriteRequestWithIndex( + new InsertRequest(new BsonDocument("_id", new BsonInt32(2)).append("name", new BsonString("doc2"))), + 1), + new WriteRequestWithIndex( + new InsertRequest(new BsonDocument("_id", new BsonInt32(3)).append("name", new BsonString("doc3"))), + 2) + ); + + SplittablePayload payload = new SplittablePayload( + SplittablePayload.Type.INSERT, requests, true, NoOpFieldNameValidator.INSTANCE + ); + + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), insertCommand, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), + MessageSettings.builder().maxWireVersion(LATEST_WIRE_VERSION).build(), + true, payload, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + message.encode(output, new OperationContext( + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null + )); + + try (ByteBufBsonDocument commandDoc = message.getCommandDocument(output)) { + BsonArray documents = commandDoc.getArray("documents"); + assertEquals(3, documents.size()); + assertEquals(1, documents.get(0).asDocument().getInt32("_id").getValue()); + assertEquals(2, documents.get(1).asDocument().getInt32("_id").getValue()); + assertEquals(3, documents.get(2).asDocument().getInt32("_id").getValue()); + } + } + } + + @Test + @DisplayName("encode with response not expected sets continuation flag") + void encodeWithResponseNotExpected() { + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), COMMAND, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), + MessageSettings.builder().maxWireVersion(LATEST_WIRE_VERSION).build(), + false, EmptyMessageSequences.INSTANCE, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + message.encode(output, new OperationContext( + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null + )); + + // Verify encoded message has continuation flag (0x02) + assertTrue(output.size() > 0, "Output should contain encoded message"); + } + } + + @Test + @DisplayName("encode preserves original command structure") + void encodePreservesCommandStructure() { + BsonDocument complexCommand = new BsonDocument("aggregate", new BsonString("coll")) + .append("pipeline", new BsonArray(asList( + new BsonDocument("$match", new BsonDocument("status", new BsonString("active"))), + new BsonDocument("$group", new BsonDocument("_id", new BsonString("$category"))) + ))) + .append("cursor", new BsonDocument("batchSize", new BsonInt32(100))); + + CommandMessage message = new CommandMessage( + NAMESPACE.getDatabaseName(), complexCommand, NoOpFieldNameValidator.INSTANCE, + ReadPreference.primary(), + MessageSettings.builder().maxWireVersion(LATEST_WIRE_VERSION).build(), + true, EmptyMessageSequences.INSTANCE, ClusterConnectionMode.MULTIPLE, null + ); + + try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) { + message.encode(output, new OperationContext( + IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), null + )); + + try (ByteBufBsonDocument commandDoc = message.getCommandDocument(output)) { + assertEquals("coll", commandDoc.getString("aggregate").getValue()); + BsonArray pipeline = commandDoc.getArray("pipeline"); + assertEquals(2, pipeline.size()); + BsonDocument cursor = commandDoc.getDocument("cursor"); + assertEquals(100, cursor.getInt32("batchSize").getValue()); + } + } + } + } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/LoggingCommandEventSenderSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/LoggingCommandEventSenderSpecification.groovy index e6f6afb02e0..8e7a7b9d78d 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/LoggingCommandEventSenderSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/LoggingCommandEventSenderSpecification.groovy @@ -64,8 +64,10 @@ class LoggingCommandEventSenderSpecification extends Specification { isDebugEnabled() >> debugLoggingEnabled } def operationContext = OPERATION_CONTEXT + def commandMessageDocument = message.getCommandDocument(bsonOutput) + def sender = new LoggingCommandEventSender([] as Set, [] as Set, connectionDescription, commandListener, - operationContext, message, message.getCommandDocument(bsonOutput), + operationContext, message, commandMessageDocument, new StructuredLogger(logger), LoggerSettings.builder().build()) when: @@ -87,6 +89,9 @@ class LoggingCommandEventSenderSpecification extends Specification { database, commandDocument.getFirstKey(), 1, failureException) ]) + cleanup: + commandMessageDocument?.close() + where: debugLoggingEnabled << [true, false] } @@ -110,8 +115,10 @@ class LoggingCommandEventSenderSpecification extends Specification { isDebugEnabled() >> true } def operationContext = OPERATION_CONTEXT + def commandMessageDocument = message.getCommandDocument(bsonOutput) + def sender = new LoggingCommandEventSender([] as Set, [] as Set, connectionDescription, commandListener, - operationContext, message, message.getCommandDocument(bsonOutput), new StructuredLogger(logger), + operationContext, message, commandMessageDocument, new StructuredLogger(logger), LoggerSettings.builder().build()) when: sender.sendStartedEvent() @@ -146,6 +153,9 @@ class LoggingCommandEventSenderSpecification extends Specification { "request ID is ${message.getId()} and the operation ID is ${operationContext.getId()}.") }, failureException) + cleanup: + commandMessageDocument?.close() + where: commandListener << [null, Stub(CommandListener)] } @@ -167,6 +177,7 @@ class LoggingCommandEventSenderSpecification extends Specification { isDebugEnabled() >> true } def operationContext = OPERATION_CONTEXT + def commandMessageDocument = message.getCommandDocument(bsonOutput) def sender = new LoggingCommandEventSender([] as Set, [] as Set, connectionDescription, null, operationContext, message, message.getCommandDocument(bsonOutput), new StructuredLogger(logger), LoggerSettings.builder().build()) @@ -182,6 +193,9 @@ class LoggingCommandEventSenderSpecification extends Specification { "request ID is ${message.getId()} and the operation ID is ${operationContext.getId()}. " + "Command: {\"fake\": {\"\$binary\": {\"base64\": \"${'A' * 967} ..." } + + cleanup: + commandMessageDocument?.close() } def 'should log redacted command with ellipses'() { @@ -201,8 +215,9 @@ class LoggingCommandEventSenderSpecification extends Specification { isDebugEnabled() >> true } def operationContext = OPERATION_CONTEXT + def commandMessageDocument = message.getCommandDocument(bsonOutput) def sender = new LoggingCommandEventSender(['createUser'] as Set, [] as Set, connectionDescription, null, - operationContext, message, message.getCommandDocument(bsonOutput), new StructuredLogger(logger), + operationContext, message, commandMessageDocument, new StructuredLogger(logger), LoggerSettings.builder().build()) when: @@ -215,5 +230,8 @@ class LoggingCommandEventSenderSpecification extends Specification { "${connectionDescription.connectionId.serverValue} to 127.0.0.1:27017. The " + "request ID is ${message.getId()} and the operation ID is ${operationContext.getId()}. Command: {}" } + + cleanup: + commandMessageDocument?.close() } }