Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
Expand Down Expand Up @@ -88,7 +89,9 @@ public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {

sourceAttributes.put("source.realtime.mode", "log");
sourceAttributes.put("capture.table", "true");
sourceAttributes.put("__system.sql-dialect", "table");
sourceAttributes.put("capture.tree", "true");
sourceAttributes.put("mode.double-living", "true");
sourceAttributes.put("user", "root");

sinkAttributes.put("sink", "iotdb-thrift-sink");
Expand Down Expand Up @@ -173,7 +176,9 @@ private void testSinkFormat(final String format, final boolean isAsyncLoad) thro
final Map<String, String> sinkAttributes = new HashMap<>();

sourceAttributes.put("capture.table", "true");
sourceAttributes.put("__system.sql-dialect", "table");
sourceAttributes.put("capture.tree", "true");
sourceAttributes.put("mode.double-living", "true");
sourceAttributes.put("user", "root");

sinkAttributes.put("sink", "iotdb-thrift-sink");
Expand Down Expand Up @@ -213,7 +218,11 @@ private void testSinkFormat(final String format, final boolean isAsyncLoad) thro
handleFailure);

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode());
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipeExtended(new TDropPipeReq("testPipe").setIsTableModel(false)).getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipeExtended(new TDropPipeReq("testPipe").setIsTableModel(true)).getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
Expand Down Expand Up @@ -260,7 +269,9 @@ public void testWriteBackSink() throws Exception {
final Map<String, String> sinkAttributes = new HashMap<>();

sourceAttributes.put("capture.table", "true");
sourceAttributes.put("__system.sql-dialect", "table");
sourceAttributes.put("capture.tree", "true");
sourceAttributes.put("mode.double-living", "true");
sourceAttributes.put("forwarding-pipe-requests", "false");
sourceAttributes.put("source.database-name", "test.*");
sourceAttributes.put("source.table-name", "test.*");
Expand Down Expand Up @@ -377,7 +388,9 @@ private void doTest(BiConsumer<Map<String, List<Tablet>>, Map<String, List<Table
final Map<String, String> sinkAttributes = new HashMap<>();

sourceAttributes.put("capture.table", "true");
sourceAttributes.put("__system.sql-dialect", "table");
sourceAttributes.put("capture.tree", "true");
sourceAttributes.put("mode.double-living", "true");
sourceAttributes.put("source.database-name", "test.*");
sourceAttributes.put("source.table-name", "test.*");
sourceAttributes.put("user", "root");
Expand Down Expand Up @@ -734,7 +747,9 @@ public void testLoadTsFileWithoutVerify() throws Exception {

sourceAttributes.put("source.realtime.mode", "batch");
sourceAttributes.put("capture.table", "true");
sourceAttributes.put("__system.sql-dialect", "table");
sourceAttributes.put("capture.tree", "true");
sourceAttributes.put("mode.double-living", "true");
sourceAttributes.put("user", "root");

sinkAttributes.put("sink", "iotdb-thrift-sink");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@
package org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic;

import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TStopPipeReq;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualBasic;
Expand All @@ -42,6 +47,9 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.fail;

Expand Down Expand Up @@ -245,7 +253,7 @@ public void testReadPipeIsolation() {
}

@Test
public void testCaptureTreeAndTableIsolation() throws Exception {
public void testCaptureTreeAndTableIgnoredByDialectIsolation() throws Exception {
final String treePipeName = "tree_a2b";
final String tablePipeName = "table_a2b";

Expand All @@ -272,7 +280,7 @@ public void testCaptureTreeAndTableIsolation() throws Exception {
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

// 2. Create table pipe by table session
try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
Expand All @@ -292,30 +300,105 @@ public void testCaptureTreeAndTableIsolation() throws Exception {
}

// Show tree pipe by tree session
Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

// 3. Drop pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipeExtended(new TDropPipeReq(treePipeName).setIsTableModel(true)).getCode());
client.dropPipeExtended(new TDropPipeReq(treePipeName).setIsTableModel(false)).getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.dropPipeExtended(new TDropPipeReq(tablePipeName).setIsTableModel(false))
.getCode());
client.dropPipeExtended(new TDropPipeReq(tablePipeName).setIsTableModel(true)).getCode());
}
}

@Test
public void testSameNameTreeOnlyAndTableOnlyPipeIsolation() throws Exception {
final String pipeName = "same_name_pipe";
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

try (final Connection connection = senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s with sink ('node-urls'='%s')",
pipeName, receiverDataNode.getIpAndPortString()));
}

try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s with sink ('node-urls'='%s')",
pipeName, receiverDataNode.getIpAndPortString()));
}

Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute("stop pipe " + pipeName);
}

Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

try (final Connection connection = senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute("drop pipe " + pipeName);
}

Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute("start pipe " + pipeName);
statement.execute("drop pipe " + pipeName);
}

Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
}

@Test
public void testCaptureCornerCases() {
public void testSameNamePipeWithCaptureAttributesStillIsolated() throws Exception {
final String pipeName = "same_name_conflict_pipe";
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

// 1. Create tree pipe but capture table data
try (final Connection connection = senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s with sink ('node-urls'='%s')",
pipeName, receiverDataNode.getIpAndPortString()));
}

try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s"
+ " with source ('capture.tree'='true','capture.table'='true')"
+ " with sink ('node-urls'='%s')",
pipeName, receiverDataNode.getIpAndPortString()));
}

Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
}

@Test
public void testCaptureAttributesAreIgnoredByDialect() {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

// 1. Create tree pipe with capture attributes pointing to table data
try (final Connection connection = senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
Expand All @@ -333,12 +416,12 @@ public void testCaptureCornerCases() {
}

// Show tree pipe by tree session
Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

// 2. Create table pipe but capture tree data
// 2. Create table pipe with capture attributes pointing to tree data
try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
Expand Down Expand Up @@ -373,14 +456,114 @@ public void testCaptureCornerCases() {
+ " with sink ("
+ "'node-urls'='%s')",
"p3", receiverDataNode.getIpAndPortString()));
fail();
} catch (final SQLException ignored) {
} catch (final SQLException e) {
e.printStackTrace();
fail(e.getMessage());
}

// Show tree pipe by tree session
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
}

@Test
public void testDirectRpcCreationDialectCompatibility() throws Exception {
final String pipeName = "rpc_same_name_pipe";
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final Map<String, String> sinkAttributes = new HashMap<>();
sinkAttributes.put("sink", "iotdb-thrift-sink");
sinkAttributes.put("sink.ip", receiverDataNode.getIp());
sinkAttributes.put("sink.port", String.valueOf(receiverDataNode.getPort()));

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.createPipe(new TCreatePipeReq(pipeName, sinkAttributes)).getCode());

Assert.assertEquals(1, showPipes(client, false).size());
Assert.assertEquals(0, showPipes(client, true).size());
Assert.assertTrue(
showPipes(client, false)
.get(0)
.pipeExtractor
.contains(
SystemConstant.SQL_DIALECT_KEY + "=" + SystemConstant.SQL_DIALECT_TREE_VALUE));

final Map<String, String> tableSourceAttributes = new HashMap<>();
tableSourceAttributes.put(
SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq(pipeName, sinkAttributes)
.setExtractorAttributes(tableSourceAttributes))
.getCode());

Assert.assertEquals(1, showPipes(client, false).size());
Assert.assertEquals(1, showPipes(client, true).size());
Assert.assertTrue(
showPipes(client, true)
.get(0)
.pipeExtractor
.contains(
SystemConstant.SQL_DIALECT_KEY + "=" + SystemConstant.SQL_DIALECT_TABLE_VALUE));

Assert.assertNotEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.createPipe(new TCreatePipeReq(pipeName, sinkAttributes)).getCode());
}
}

@Test
public void testLegacyLifecycleRpcPrefersTreePipeThenTablePipe() throws Exception {
final String pipeName = "legacy_same_name_pipe";
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

try (final Connection connection = senderEnv.getConnection(BaseEnv.TREE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s with sink ('node-urls'='%s')",
pipeName, receiverDataNode.getIpAndPortString()));
}
try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe %s with sink ('node-urls'='%s')",
pipeName, receiverDataNode.getIpAndPortString()));
}

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
Assert.assertEquals(1, showPipes(client, false).size());
Assert.assertEquals(1, showPipes(client, true).size());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe(pipeName).getCode());
Assert.assertEquals(0, showPipes(client, false).size());
Assert.assertEquals(1, showPipes(client, true).size());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe(pipeName).getCode());
Assert.assertEquals(0, showPipes(client, false).size());
Assert.assertEquals(0, showPipes(client, true).size());
}
}

private List<TShowPipeInfo> showPipes(
final SyncConfigNodeIServiceClient client, final boolean isTableModel) throws Exception {
final List<TShowPipeInfo> showPipeResult =
client.showPipe(
new TShowPipeReq()
.setIsTableModel(isTableModel)
.setUserName(SessionConfig.DEFAULT_USER))
.pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
return showPipeResult;
}
}
Loading