Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -657,4 +657,320 @@ public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletion() throw
"COUNT(root.sg1.d1.s5),",
Collections.singleton("3591,"));
}

/**
* Test IoTDB pipe handling TsFile decomposition with Mods (modification operations) in tree model
* - Large scale single point deletion scenario (non-aligned timeseries)
*
* <p>Test scenario: 1. Create database root.sg1 with 1 device: d1 (non-aligned timeseries with 5
* sensors) 2. Insert 20000 data points for each sensor with different time ranges: - s1: time
* 1-20000 - s2: time 10001-30000 - s3: time 20001-40000 - s4: time 30001-50000 - s5: time
* 40001-60000 3. Execute FLUSH operation to persist data to TsFile 4. Execute 2000 single point
* DELETE operations, each deleting one time point from different sensors 5. Execute FLUSH
* operation again 6. Create pipe with mods enabled 7. Verify correctness of receiver data: - Each
* sensor should have 19800 remaining data points - Deleted points should not appear in receiver
*
* <p>Test purpose: Verify that IoTDB pipe can correctly handle large scale single point deletion
* operations in TsFile under tree model with non-aligned timeseries, ensuring the binary search
* optimization in ModsOperationUtil works correctly with many modification entries.
*/
@Test
public void testTsFileDecompositionWithModsLargeScaleSinglePointDeletionNonAligned()
throws Exception {
TestUtils.executeNonQueryWithRetry(senderEnv, "CREATE DATABASE root.sg1");
TestUtils.executeNonQueryWithRetry(receiverEnv, "CREATE DATABASE root.sg1");

// Insert 20000 data points for s1 (time 1-20000)
String s1 = "INSERT INTO root.sg1.d1(time, s1) VALUES ";
StringBuilder insertBuilder1 = new StringBuilder(s1);
for (int i = 1; i <= 20000; i++) {
insertBuilder1.append("(").append(i).append(",").append(1.0f).append(")");
if (i % 50 != 0) {
insertBuilder1.append(",");
} else {
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString());
insertBuilder1 = new StringBuilder(s1);
}
}
// Execute remaining data if any
if (insertBuilder1.length() > s1.length()) {
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder1.toString());
}

// Insert 20000 data points for s2 (time 10001-30000)
String s2 = "INSERT INTO root.sg1.d1(time, s2) VALUES ";
StringBuilder insertBuilder2 = new StringBuilder(s2);
for (int i = 10001; i <= 30000; i++) {
insertBuilder2.append("(").append(i).append(",").append(2.0f).append(")");
if (i % 50 != 0) {
insertBuilder2.append(",");
} else {
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString());
insertBuilder2 = new StringBuilder(s2);
}
}
// Execute remaining data if any
if (insertBuilder2.length() > s2.length()) {
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder2.toString());
}

// Insert 20000 data points for s3 (time 20001-40000)
String s3 = "INSERT INTO root.sg1.d1(time, s3) VALUES ";
StringBuilder insertBuilder3 = new StringBuilder(s3);
for (int i = 20001; i <= 40000; i++) {
insertBuilder3.append("(").append(i).append(",").append(3.0f).append(")");
if (i % 50 != 0) {
insertBuilder3.append(",");
} else {
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString());
insertBuilder3 = new StringBuilder(s3);
}
}
// Execute remaining data if any
if (insertBuilder3.length() > s3.length()) {
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder3.toString());
}

// Insert 20000 data points for s4 (time 30001-50000)
String s4 = "INSERT INTO root.sg1.d1(time, s4) VALUES ";
StringBuilder insertBuilder4 = new StringBuilder(s4);
for (int i = 30001; i <= 50000; i++) {
insertBuilder4.append("(").append(i).append(",").append(4.0f).append(")");
if (i % 50 != 0) {
insertBuilder4.append(",");
} else {
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString());
insertBuilder4 = new StringBuilder(s4);
}
}
// Execute remaining data if any
if (insertBuilder4.length() > s4.length()) {
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder4.toString());
}

// Insert 20000 data points for s5 (time 40001-60000)
String s5 = "INSERT INTO root.sg1.d1(time, s5) VALUES ";
StringBuilder insertBuilder5 = new StringBuilder(s5);
for (int i = 40001; i <= 60000; i++) {
insertBuilder5.append("(").append(i).append(",").append(5.0f).append(")");
if (i % 50 != 0) {
insertBuilder5.append(",");
} else {
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString());
insertBuilder5 = new StringBuilder(s5);
}
}
// Execute remaining data if any
if (insertBuilder5.length() > s5.length()) {
TestUtils.executeNonQueryWithRetry(senderEnv, insertBuilder5.toString());
}

TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");

// Execute 2000 single point DELETE operations
// Delete 400 points from each sensor (distributed across different time ranges)
for (int i = 0; i < 400; i++) {
// Delete from s1: every 10th point starting from 10
TestUtils.executeNonQueryWithRetry(
senderEnv, "DELETE FROM root.sg1.d1.s1 WHERE time = " + (10 + i * 10));

// Delete from s2: every 10th point starting from 10010
TestUtils.executeNonQueryWithRetry(
senderEnv, "DELETE FROM root.sg1.d1.s2 WHERE time = " + (10010 + i * 10));

// Delete from s3: every 10th point starting from 20010
TestUtils.executeNonQueryWithRetry(
senderEnv, "DELETE FROM root.sg1.d1.s3 WHERE time = " + (20010 + i * 10));

// Delete from s4: every 10th point starting from 30010
TestUtils.executeNonQueryWithRetry(
senderEnv, "DELETE FROM root.sg1.d1.s4 WHERE time = " + (30010 + i * 10));

// Delete from s5: every 10th point starting from 40010
TestUtils.executeNonQueryWithRetry(
senderEnv, "DELETE FROM root.sg1.d1.s5 WHERE time = " + (40010 + i * 10));
}

TestUtils.executeNonQueryWithRetry(senderEnv, "FLUSH");

// Verify sender data integrity before creating pipe to avoid leader election issues
// This ensures all data is properly persisted and consistent on sender side
// before starting the pipe synchronization process
TestUtils.assertDataEventuallyOnEnv(
senderEnv,
"SELECT COUNT(**) FROM root.sg1.d1",
"COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),",
Collections.singleton("19600,19600,19600,19600,19600,"));

executeNonQueryWithRetry(
senderEnv,
String.format(
"CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true') WITH CONNECTOR('ip'='%s', 'port'='%s', 'format'='tablet')",
receiverEnv.getDataNodeWrapperList().get(0).getIp(),
receiverEnv.getDataNodeWrapperList().get(0).getPort()));

// Verify total count of all sensors using COUNT(*)
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(**) FROM root.sg1.d1",
"COUNT(root.sg1.d1.s3),COUNT(root.sg1.d1.s4),COUNT(root.sg1.d1.s5),COUNT(root.sg1.d1.s1),COUNT(root.sg1.d1.s2),",
Collections.singleton("19600,19600,19600,19600,19600,"));

// Verify individual sensor counts
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s1) FROM root.sg1.d1",
"COUNT(root.sg1.d1.s1),",
Collections.singleton("19600,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s2) FROM root.sg1.d1",
"COUNT(root.sg1.d1.s2),",
Collections.singleton("19600,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s3) FROM root.sg1.d1",
"COUNT(root.sg1.d1.s3),",
Collections.singleton("19600,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s4) FROM root.sg1.d1",
"COUNT(root.sg1.d1.s4),",
Collections.singleton("19600,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s5) FROM root.sg1.d1",
"COUNT(root.sg1.d1.s5),",
Collections.singleton("19600,"));

// Verify count of deleted time ranges using COUNT with WHERE clause
// These should return 0 since all points in these ranges were deleted
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000 AND time % 10 = 0",
"COUNT(root.sg1.d1.s1),",
Collections.singleton("0,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <= 14000 AND time % 10 = 0",
"COUNT(root.sg1.d1.s2),",
Collections.singleton("0,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <= 24000 AND time % 10 = 0",
"COUNT(root.sg1.d1.s3),",
Collections.singleton("0,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <= 34000 AND time % 10 = 0",
"COUNT(root.sg1.d1.s4),",
Collections.singleton("0,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <= 44000 AND time % 10 = 0",
"COUNT(root.sg1.d1.s5),",
Collections.singleton("0,"));

// Verify count of non-deleted time ranges using multiple range queries
// Check ranges before deletion area
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 1 AND time < 10",
"COUNT(root.sg1.d1.s1),",
Collections.singleton("9,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10001 AND time < 10010",
"COUNT(root.sg1.d1.s2),",
Collections.singleton("9,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20001 AND time < 20010",
"COUNT(root.sg1.d1.s3),",
Collections.singleton("9,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30001 AND time < 30010",
"COUNT(root.sg1.d1.s4),",
Collections.singleton("9,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40001 AND time < 40010",
"COUNT(root.sg1.d1.s5),",
Collections.singleton("9,"));

// Check ranges after deletion area
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s1) FROM root.sg1.d1 WHERE time > 4000 AND time <= 20000",
"COUNT(root.sg1.d1.s1),",
Collections.singleton("16000,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s2) FROM root.sg1.d1 WHERE time > 14000 AND time <= 30000",
"COUNT(root.sg1.d1.s2),",
Collections.singleton("16000,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s3) FROM root.sg1.d1 WHERE time > 24000 AND time <= 40000",
"COUNT(root.sg1.d1.s3),",
Collections.singleton("16000,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s4) FROM root.sg1.d1 WHERE time > 34000 AND time <= 50000",
"COUNT(root.sg1.d1.s4),",
Collections.singleton("16000,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s5) FROM root.sg1.d1 WHERE time > 44000 AND time <= 60000",
"COUNT(root.sg1.d1.s5),",
Collections.singleton("16000,"));

// Check non-deleted points within deletion range (every 10th point except the ones we deleted)
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s1) FROM root.sg1.d1 WHERE time >= 10 AND time <= 4000 AND time % 10 != 0",
"COUNT(root.sg1.d1.s1),",
Collections.singleton("3591,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s2) FROM root.sg1.d1 WHERE time >= 10010 AND time <= 14000 AND time % 10 != 0",
"COUNT(root.sg1.d1.s2),",
Collections.singleton("3591,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s3) FROM root.sg1.d1 WHERE time >= 20010 AND time <= 24000 AND time % 10 != 0",
"COUNT(root.sg1.d1.s3),",
Collections.singleton("3591,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s4) FROM root.sg1.d1 WHERE time >= 30010 AND time <= 34000 AND time % 10 != 0",
"COUNT(root.sg1.d1.s4),",
Collections.singleton("3591,"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"SELECT COUNT(s5) FROM root.sg1.d1 WHERE time >= 40010 AND time <= 44000 AND time % 10 != 0",
"COUNT(root.sg1.d1.s5),",
Collections.singleton("3591,"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,11 @@ private boolean putValueToColumns(final BatchData data, final Tablet tablet, fin
}
}
} else {
if (!modsInfos.isEmpty()
&& ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(0))) {
return false;
}

isNeedFillTime = true;
switch (tablet.getSchemas().get(0).getType()) {
case BOOLEAN:
Expand Down
Loading