diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java index 89f1c3fb4401d..87571e1eb71e6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeTsFileDecompositionWithModsIT.java @@ -657,4 +657,320 @@ public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletion() throw "COUNT(root.sg1.d1.s5),", Collections.singleton("3591,")); } + + /** + * Test IoTDB pipe handling TsFile decomposition with Mods (modification operations) in tree model + * - Large scale single point deletion scenario (non-aligned timeseries) + * + *
Test scenario: 1. Create database root.sg1 with 1 device: d1 (non-aligned timeseries with 5 + * sensors) 2. Insert 20000 data points for each sensor with different time ranges: - s1: time + * 1-20000 - s2: time 10001-30000 - s3: time 20001-40000 - s4: time 30001-50000 - s5: time + * 40001-60000 3. Execute FLUSH operation to persist data to TsFile 4. Execute 2000 single point + * DELETE operations, each deleting one time point from a different sensor 5. Execute FLUSH + * operation again 6. Create pipe with mods enabled 7. Verify correctness of receiver data: - Each + * sensor should have 19600 remaining data points (20000 inserted, 400 deleted) - Deleted points + * should not appear in receiver + * + *
Test purpose: Verify that IoTDB pipe can correctly handle large scale single point deletion + * operations in TsFile under tree model with non-aligned timeseries, ensuring the binary search + * optimization in ModsOperationUtil works correctly with many modification entries. + */ + @Test + public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletionNonAligned() + throws Exception { + TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1"); + TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE root.sg1"); + + // Insert 20000 data points for s1 (time 1-20000) + String s1 = "INSERT INTO root.sg1.d1(time, s1) VALUES "; + StringBuilder insertBuilder1 = new StringBuilder(s1); + for (int i = 1; i <= 20000; i++) { + insertBuilder1.append("(").append(i).append(",").append(1.0f).append(")"); + if (i % 50 != 0) { + insertBuilder1.append(","); + } else { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString()); + insertBuilder1 = new StringBuilder(s1); + } + } + // Execute remaining data if any + if (insertBuilder1.length() > s1.length()) { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString()); + } + + // Insert 20000 data points for s2 (time 10001-30000) + String s2 = "INSERT INTO root.sg1.d1(time, s2) VALUES "; + StringBuilder insertBuilder2 = new StringBuilder(s2); + for (int i = 10001; i <= 30000; i++) { + insertBuilder2.append("(").append(i).append(",").append(2.0f).append(")"); + if (i % 50 != 0) { + insertBuilder2.append(","); + } else { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString()); + insertBuilder2 = new StringBuilder(s2); + } + } + // Execute remaining data if any + if (insertBuilder2.length() > s2.length()) { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString()); + } + + // Insert 20000 data points for s3 (time 20001-40000) + String s3 = "INSERT INTO root.sg1.d1(time, s3) VALUES "; + StringBuilder insertBuilder3 = new StringBuilder(s3); + for (int i = 20001; i <= 40000; i++) { + insertBuilder3.append("(").append(i).append(",").append(3.0f).append(")"); + if (i % 50 != 0) { + insertBuilder3.append(","); + } else { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString()); + insertBuilder3 = new StringBuilder(s3); + } + } + // Execute remaining data if any + if (insertBuilder3.length() > s3.length()) { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString()); + } + + // Insert 20000 data points for s4 (time 30001-50000) + String s4 = "INSERT INTO root.sg1.d1(time, s4) VALUES "; + StringBuilder insertBuilder4 = new StringBuilder(s4); + for (int i = 30001; i <= 50000; i++) { + insertBuilder4.append("(").append(i).append(",").append(4.0f).append(")"); + if (i % 50 != 0) { + insertBuilder4.append(","); + } else { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString()); + insertBuilder4 = new StringBuilder(s4); + } + } + // Execute remaining data if any + if (insertBuilder4.length() > s4.length()) { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString()); + } + + // Insert 20000 data points for s5 (time 40001-60000) + String s5 = "INSERT INTO root.sg1.d1(time, s5) VALUES "; + StringBuilder insertBuilder5 = new StringBuilder(s5); + for (int i = 40001; i <= 60000; i++) { + insertBuilder5.append("(").append(i).append(",").append(5.0f).append(")"); + if (i % 50 != 0) { + insertBuilder5.append(","); + } else { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString()); + insertBuilder5 
= new StringBuilder(s5); + } + } + // Execute remaining data if any + if (insertBuilder5.length() > s5.length()) { + TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString()); + } + + TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH"); + + // Execute 2000 single point DELETE operations + // Delete 400 points from each sensor (distributed across different time ranges) + for (int i = 0; i < 400; i++) { + // Delete from s1: every 10th point starting from 10 + TestUtils.executeNonQueryWithRetry( + senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time = " + (10 + i * 10)); + + // Delete from s2: every 10th point starting from 10010 + TestUtils.executeNonQueryWithRetry( + senderEnv, "DELETE FROM root.sg1.d1.s2 WHERE time = " + (10010 + i * 10)); + + // Delete from s3: every 10th point starting from 20010 + TestUtils.executeNonQueryWithRetry( + senderEnv, "DELETE FROM root.sg1.d1.s3 WHERE time = " + (20010 + i * 10)); + + // Delete from s4: every 10th point starting from 30010 + TestUtils.executeNonQueryWithRetry( + senderEnv, "DELETE FROM root.sg1.d1.s4 WHERE time = " + (30010 + i * 10)); + + // Delete from s5: every 10th point starting from 40010 + TestUtils.executeNonQueryWithRetry( + senderEnv, "DELETE FROM root.sg1.d1.s5 WHERE time = " + (40010 + i * 10)); + } + + TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH"); + + // Verify sender data integrity before creating pipe to avoid leader election issues + // This ensures all data is properly persisted and consistent on sender side + // before starting the pipe synchronization process + TestUtils.assertDataEventuallyOnEnv( + senderEnv, + "SELECT COUNT(**) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),", + Collections.singleton("19600,19600,19600,19600,19600,")); + + executeNonQueryWithRetry( + senderEnv, + String.format( + "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true') WITH CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')", + receiverEnv.getDataNodeWrapperList().get(0).getIp(), + receiverEnv.getDataNodeWrapperList().get(0).getPort())); + + // Verify total count of all sensors using COUNT(*) + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(**) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),", + Collections.singleton("19600,19600,19600,19600,19600,")); + + // Verify individual sensor counts + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s1) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s1),", + Collections.singleton("19600,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s2) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s2),", + Collections.singleton("19600,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s3) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s3),", + Collections.singleton("19600,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s4) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s4),", + Collections.singleton("19600,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s5) FROM root.sg1.d1", + "COUNT(root.sg1.d1.s5),", + Collections.singleton("19600,")); + + // Verify count of deleted time ranges using COUNT with WHERE clause + // These should return 0 since all points in these ranges were deleted + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND 
time <= 4000 AND time % 10 = 0", + "COUNT(root.sg1.d1.s1),", + Collections.singleton("0,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <= 14000 AND time % 10 = 0", + "COUNT(root.sg1.d1.s2),", + Collections.singleton("0,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <= 24000 AND time % 10 = 0", + "COUNT(root.sg1.d1.s3),", + Collections.singleton("0,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <= 34000 AND time % 10 = 0", + "COUNT(root.sg1.d1.s4),", + Collections.singleton("0,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <= 44000 AND time % 10 = 0", + "COUNT(root.sg1.d1.s5),", + Collections.singleton("0,")); + + // Verify count of non-deleted time ranges using multiple range queries + // Check ranges before deletion area + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 1 AND time < 10", + "COUNT(root.sg1.d1.s1),", + Collections.singleton("9,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10001 AND time < 10010", + "COUNT(root.sg1.d1.s2),", + Collections.singleton("9,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20001 AND time < 20010", + "COUNT(root.sg1.d1.s3),", + Collections.singleton("9,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30001 AND time < 30010", + "COUNT(root.sg1.d1.s4),", + Collections.singleton("9,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40001 AND time < 40010", + "COUNT(root.sg1.d1.s5),", + Collections.singleton("9,")); + + // Check ranges after deletion area + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time > 4000 AND time <= 20000", + "COUNT(root.sg1.d1.s1),", + Collections.singleton("16000,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time > 14000 AND time <= 30000", + "COUNT(root.sg1.d1.s2),", + Collections.singleton("16000,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time > 24000 AND time <= 40000", + "COUNT(root.sg1.d1.s3),", + Collections.singleton("16000,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time > 34000 AND time <= 50000", + "COUNT(root.sg1.d1.s4),", + Collections.singleton("16000,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time > 44000 AND time <= 60000", + "COUNT(root.sg1.d1.s5),", + Collections.singleton("16000,")); + + // Check non-deleted points within deletion range (every 10th point except the ones we deleted) + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000 AND time % 10 != 0", + "COUNT(root.sg1.d1.s1),", + Collections.singleton("3591,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <= 14000 AND time % 10 != 0", + "COUNT(root.sg1.d1.s2),", + Collections.singleton("3591,")); + + 
TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <= 24000 AND time % 10 != 0", + "COUNT(root.sg1.d1.s3),", + Collections.singleton("3591,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <= 34000 AND time % 10 != 0", + "COUNT(root.sg1.d1.s4),", + Collections.singleton("3591,")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <= 44000 AND time % 10 != 0", + "COUNT(root.sg1.d1.s5),", + Collections.singleton("3591,")); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 5c9985fe748b7..99d35f7eb98bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -426,6 +426,11 @@ private boolean putValueToColumns(final BatchData data, final Tablet tablet, fin } } } else { + if (!modsInfos.isEmpty() + && ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(0))) { + return false; + } + isNeedFillTime = true; switch (tablet.getSchemas().get(0).getType()) { case BOOLEAN:
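Note on the parser change above: the new guard in putValueToColumns() skips a single (time, value) pair in the non-aligned branch when ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(0)) reports that the timestamp is covered by a deletion (mods) entry; the large-scale test exists to exercise this path with many such entries. The sketch below is only an illustration, under the assumption (stated in the test Javadoc) that the lookup is a binary search over sorted, non-overlapping deletion time ranges; the class and method names (DeletionRangeLookup, TimeRange, isDeleted) are invented for the example and are not the actual ModsOperationUtil API.

import java.util.Arrays;
import java.util.List;

/** Illustrative only: binary-search lookup of a timestamp in sorted, disjoint deletion ranges. */
public final class DeletionRangeLookup {

  /** A closed deletion interval [startTime, endTime], e.g. produced by DELETE ... WHERE time = t. */
  public static final class TimeRange {
    final long startTime;
    final long endTime;

    TimeRange(final long startTime, final long endTime) {
      this.startTime = startTime;
      this.endTime = endTime;
    }
  }

  /**
   * Returns true if {@code time} falls inside any of the given deletion ranges. Assumes the ranges
   * are sorted by start time and do not overlap, so a binary search on the start time suffices.
   */
  public static boolean isDeleted(final long time, final List<TimeRange> sortedRanges) {
    int low = 0;
    int high = sortedRanges.size() - 1;
    while (low <= high) {
      final int mid = (low + high) >>> 1;
      final TimeRange range = sortedRanges.get(mid);
      if (time < range.startTime) {
        high = mid - 1;
      } else if (time > range.endTime) {
        low = mid + 1;
      } else {
        return true; // startTime <= time <= endTime: the point was deleted
      }
    }
    return false;
  }

  public static void main(final String[] args) {
    // Single-point deletions at times 10, 20, 30, mirroring the test's DELETE pattern.
    final List<TimeRange> ranges =
        Arrays.asList(new TimeRange(10, 10), new TimeRange(20, 20), new TimeRange(30, 30));
    System.out.println(isDeleted(20, ranges)); // true  -> point is skipped
    System.out.println(isDeleted(21, ranges)); // false -> point is forwarded
  }
}

In the test above every DELETE removes exactly one timestamp, so each range collapses to a single point and each sensor accumulates 400 entries; a linear scan over the entries would also pass, but a sorted-range binary search keeps each per-point check logarithmic in the number of modification entries, which is the behaviour the large-scale scenario is meant to stress.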