Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -48,6 +52,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import static org.junit.Assert.fail;

Expand Down Expand Up @@ -757,6 +762,11 @@ public void testExtractorTimeRangeMatch() throws Exception {

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();
final Consumer<String> handleFailure =
o -> {
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
};

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
Expand Down Expand Up @@ -793,9 +803,10 @@ public void testExtractorTimeRangeMatch() throws Exception {

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select count(*) from root.**",
"select count(at1) from root.db.d1",
"count(root.db.d1.at1),",
Collections.singleton("3,"));
Collections.singleton("3,"),
handleFailure);

// Insert realtime data that overlapped with time range
TestUtils.executeNonQueries(
Expand All @@ -808,9 +819,33 @@ public void testExtractorTimeRangeMatch() throws Exception {

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select count(*) from root.**",
"select count(at1) from root.db.d1, root.db.d3",
"count(root.db.d1.at1),count(root.db.d3.at1),",
Collections.singleton("3,3,"));
Collections.singleton("3,3,"),
handleFailure);

// Session Tablet can have unused timestamp slots when rowSize is smaller than maxRowNumber.
// The pipe source time range filter should ignore the unused zero tail.
final List<MeasurementSchema> schemas =
Collections.singletonList(new MeasurementSchema("at1", TSDataType.INT32));
final Tablet tabletWithUnusedTail = new Tablet("root.db.d5", schemas, 5);
for (int time = 2000; time <= 4000; time += 1000) {
final int rowIndex = tabletWithUnusedTail.rowSize++;
tabletWithUnusedTail.addTimestamp(rowIndex, time);
tabletWithUnusedTail.addValue("at1", rowIndex, time / 1000);
}
Assert.assertEquals(3, tabletWithUnusedTail.rowSize);
Assert.assertEquals(5, tabletWithUnusedTail.timestamps.length);
try (final ISession session = senderEnv.getSessionConnection()) {
session.insertTablet(tabletWithUnusedTail);
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select count(at1) from root.db.d1, root.db.d3, root.db.d5",
"count(root.db.d1.at1),count(root.db.d3.at1),count(root.db.d5.at1),",
Collections.singleton("3,3,3,"),
handleFailure);

// Insert realtime data that does not overlap with time range
TestUtils.executeNonQueries(
Expand All @@ -823,9 +858,20 @@ public void testExtractorTimeRangeMatch() throws Exception {

TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
"select count(*) from root.**",
"count(root.db.d1.at1),count(root.db.d3.at1),",
Collections.singleton("3,3,"));
"select count(at1) from root.db.d1, root.db.d3, root.db.d5",
"count(root.db.d1.at1),count(root.db.d3.at1),count(root.db.d5.at1),",
Collections.singleton("3,3,3,"),
600);
TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
"show timeseries root.db.d2.**",
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
Collections.emptySet());
TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
"show timeseries root.db.d4.**",
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
Collections.emptySet());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -70,6 +69,10 @@ public class TagManager {
private static final String PREVIOUS_CONDITION =
"before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b";

// The tag index memory model adds one int-sized estimated overhead for each indexed key, value,
// and measurement reference. This is an accounting estimate rather than a specific
// ConcurrentHashMap or Set field.
private static final long INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES = Integer.BYTES;
private static final Logger logger = LoggerFactory.getLogger(TagManager.class);
private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();

Expand Down Expand Up @@ -164,34 +167,31 @@ public void addIndex(String tagKey, String tagValue, IMeasurementMNode<?> measur
return;
}

int tagIndexOldSize = tagIndex.size();
Map<String, Set<IMeasurementMNode<?>>> tagValueMap =
tagIndex.computeIfAbsent(tagKey, k -> new ConcurrentHashMap<>());
int tagIndexNewSize = tagIndex.size();

int tagValueMapOldSize = tagValueMap.size();
Set<IMeasurementMNode<?>> measurementsSet =
tagValueMap.computeIfAbsent(tagValue, v -> Collections.synchronizedSet(new HashSet<>()));
int tagValueMapNewSize = tagValueMap.size();
tagIndex.compute(
tagKey,
(key, tagValueMap) -> {
long memorySize = 0;
if (tagValueMap == null) {
tagValueMap = new ConcurrentHashMap<>();
memorySize += RamUsageEstimator.sizeOf(tagKey) + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES;
}

int measurementsSetOldSize = measurementsSet.size();
measurementsSet.add(measurementMNode);
int measurementsSetNewSize = measurementsSet.size();
Set<IMeasurementMNode<?>> measurementsSet = tagValueMap.get(tagValue);
if (measurementsSet == null) {
measurementsSet = ConcurrentHashMap.newKeySet();
tagValueMap.put(tagValue, measurementsSet);
memorySize += RamUsageEstimator.sizeOf(tagValue) + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES;
}

long memorySize = 0;
if (tagIndexNewSize - tagIndexOldSize == 1) {
// the last 4 is the memory occupied by the size of tagvaluemap
memorySize += RamUsageEstimator.sizeOf(tagKey) + 4;
}
if (tagValueMapNewSize - tagValueMapOldSize == 1) {
// the last 4 is the memory occupied by the size of measurementsSet
memorySize += RamUsageEstimator.sizeOf(tagValue) + 4;
}
if (measurementsSetNewSize - measurementsSetOldSize == 1) {
// 8 is the memory occupied by the length of the IMeasurementMNode
memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4;
}
requestMemory(memorySize);
if (measurementsSet.add(measurementMNode)) {
memorySize +=
RamUsageEstimator.NUM_BYTES_OBJECT_REF + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES;
}
if (memorySize > 0) {
requestMemory(memorySize);
}
return tagValueMap;
});
}

public void addIndex(Map<String, String> tagsMap, IMeasurementMNode<?> measurementMNode) {
Expand All @@ -206,32 +206,47 @@ public void removeIndex(String tagKey, String tagValue, IMeasurementMNode<?> mea
if (tagKey == null || tagValue == null || measurementMNode == null) {
return;
}
// init memory size
long memorySize = 0;
if (tagIndex.get(tagKey).get(tagValue).remove(measurementMNode)) {
memorySize += RamUsageEstimator.NUM_BYTES_OBJECT_REF + 4;
}
if (tagIndex.get(tagKey).get(tagValue).isEmpty()) {
if (tagIndex.get(tagKey).remove(tagValue) != null) {
// the last 4 is the memory occupied by the size of IMeasurementMNodeSet
memorySize += RamUsageEstimator.sizeOf(tagValue) + 4;
}
}
if (tagIndex.get(tagKey).isEmpty()) {
if (tagIndex.remove(tagKey) != null) {
// the last 4 is the memory occupied by the size of tagValueMap
memorySize += RamUsageEstimator.sizeOf(tagKey) + 4;
}
}
releaseMemory(memorySize);
tagIndex.computeIfPresent(
tagKey,
(key, tagValueMap) -> {
long memorySize = 0;
Set<IMeasurementMNode<?>> measurementsSet = tagValueMap.get(tagValue);
if (measurementsSet == null) {
return tagValueMap;
}

if (measurementsSet.remove(measurementMNode)) {
memorySize +=
RamUsageEstimator.NUM_BYTES_OBJECT_REF + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES;
}
if (measurementsSet.isEmpty()) {
if (tagValueMap.remove(tagValue, measurementsSet)) {
memorySize +=
RamUsageEstimator.sizeOf(tagValue) + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES;
}
}
if (tagValueMap.isEmpty()) {
memorySize += RamUsageEstimator.sizeOf(tagKey) + INDEX_ENTRY_OVERHEAD_ESTIMATE_BYTES;
if (memorySize > 0) {
releaseMemory(memorySize);
}
return null;
}
if (memorySize > 0) {
releaseMemory(memorySize);
}
return tagValueMap;
});
}

private boolean containsIndex(String tagKey, String tagValue) {
Map<String, Set<IMeasurementMNode<?>>> tagValueMap = tagIndex.get(tagKey);
return tagValueMap != null && tagValueMap.containsKey(tagValue);
}

private List<IMeasurementMNode<?>> getMatchedTimeseriesInIndex(TagFilter tagFilter) {
if (!tagIndex.containsKey(tagFilter.getKey())) {
return Collections.emptyList();
}
Map<String, Set<IMeasurementMNode<?>>> value2Node = tagIndex.get(tagFilter.getKey());
if (value2Node.isEmpty()) {
if (value2Node == null || value2Node.isEmpty()) {
return Collections.emptyList();
}

Expand Down Expand Up @@ -362,8 +377,7 @@ public void removeFromTagInvertedIndex(IMeasurementMNode<?> node) throws IOExcep
Map<String, String> tagMap = tagLogFile.readTag(node.getOffset());
if (tagMap != null) {
for (Map.Entry<String, String> entry : tagMap.entrySet()) {
if (tagIndex.containsKey(entry.getKey())
&& tagIndex.get(entry.getKey()).containsKey(entry.getValue())) {
if (containsIndex(entry.getKey(), entry.getValue())) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
Expand Down Expand Up @@ -415,7 +429,7 @@ public void updateTagsAndAttributes(
// we should remove before key-value from inverted index map
if (beforeValue != null && !beforeValue.equals(value)) {

if (tagIndex.containsKey(key) && tagIndex.get(key).containsKey(beforeValue)) {
if (containsIndex(key, beforeValue)) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
Expand Down Expand Up @@ -548,8 +562,7 @@ public void dropTagsOrAttributes(

if (!deleteTag.isEmpty()) {
for (Map.Entry<String, String> entry : deleteTag.entrySet()) {
if (tagIndex.containsKey((entry.getKey()))
&& tagIndex.get(entry.getKey()).containsKey(entry.getValue())) {
if (containsIndex(entry.getKey(), entry.getValue())) {
if (logger.isDebugEnabled()) {
logger.debug(
String.format(
Expand Down Expand Up @@ -618,7 +631,7 @@ public void setTagsOrAttributesValue(
String beforeValue = entry.getValue();
String currentValue = newTagValue.get(key);
// change the tag inverted index map
if (tagIndex.containsKey(key) && tagIndex.get(key).containsKey(beforeValue)) {
if (containsIndex(key, beforeValue)) {

if (logger.isDebugEnabled()) {
logger.debug(
Expand Down Expand Up @@ -676,7 +689,7 @@ public void renameTagOrAttributeKey(
// persist the change to disk
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
// change the tag inverted index map
if (tagIndex.containsKey(oldKey) && tagIndex.get(oldKey).containsKey(value)) {
if (containsIndex(oldKey, value)) {

if (logger.isDebugEnabled()) {
logger.debug(
Expand Down
Loading
Loading