Skip to content

Commit 24a93f5

Browse files
authored
Fixed the concurrency bug of create timeseries / set(unset) template & Optimized the table log & Removed the useless timeseries/table lock (#15062)
1 parent 5b91756 commit 24a93f5

File tree

7 files changed

+102
-203
lines changed

7 files changed

+102
-203
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/UnsetTemplateProcedure.java

+57-89
Original file line numberDiff line numberDiff line change
@@ -69,22 +69,25 @@ public class UnsetTemplateProcedure
6969
private transient ByteBuffer addTemplateSetInfo;
7070
private transient ByteBuffer invalidateTemplateSetInfo;
7171

72-
public UnsetTemplateProcedure(boolean isGeneratedByPipe) {
72+
public UnsetTemplateProcedure(final boolean isGeneratedByPipe) {
7373
super(isGeneratedByPipe);
7474
}
7575

7676
public UnsetTemplateProcedure(
77-
String queryId, Template template, PartialPath path, boolean isGeneratedByPipe) {
77+
final String queryId,
78+
final Template template,
79+
final PartialPath path,
80+
final boolean isGeneratedByPipe) {
7881
super(isGeneratedByPipe);
7982
this.queryId = queryId;
8083
this.template = template;
8184
this.path = path;
8285
}
8386

8487
@Override
85-
protected Flow executeFromState(ConfigNodeProcedureEnv env, UnsetTemplateState state)
88+
protected Flow executeFromState(final ConfigNodeProcedureEnv env, final UnsetTemplateState state)
8689
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
87-
long startTime = System.currentTimeMillis();
90+
final long startTime = System.currentTimeMillis();
8891
try {
8992
switch (state) {
9093
case CONSTRUCT_BLACK_LIST:
@@ -127,8 +130,8 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, UnsetTemplateState s
127130
}
128131
}
129132

130-
private void constructBlackList(ConfigNodeProcedureEnv env) {
131-
TSStatus status =
133+
private void constructBlackList(final ConfigNodeProcedureEnv env) {
134+
final TSStatus status =
132135
env.getConfigManager()
133136
.getClusterSchemaManager()
134137
.preUnsetSchemaTemplate(template.getId(), path);
@@ -139,30 +142,33 @@ private void constructBlackList(ConfigNodeProcedureEnv env) {
139142
}
140143
}
141144

142-
private void invalidateCache(ConfigNodeProcedureEnv env) {
145+
private void invalidateCache(final ConfigNodeProcedureEnv env) {
143146
try {
147+
// Cannot roll back after cache invalidation
148+
// Because we do not know whether there are time series successfully created
149+
alreadyRollback = true;
144150
executeInvalidateCache(env);
145151
setNextState(UnsetTemplateState.CHECK_DATANODE_TEMPLATE_ACTIVATION);
146-
} catch (ProcedureException e) {
152+
} catch (final ProcedureException e) {
147153
setFailure(e);
148154
}
149155
}
150156

151-
private void executeInvalidateCache(ConfigNodeProcedureEnv env) throws ProcedureException {
152-
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
157+
private void executeInvalidateCache(final ConfigNodeProcedureEnv env) throws ProcedureException {
158+
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
153159
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
154-
TUpdateTemplateReq invalidateTemplateSetInfoReq = new TUpdateTemplateReq();
160+
final TUpdateTemplateReq invalidateTemplateSetInfoReq = new TUpdateTemplateReq();
155161
invalidateTemplateSetInfoReq.setType(
156162
TemplateInternalRPCUpdateType.INVALIDATE_TEMPLATE_SET_INFO.toByte());
157163
invalidateTemplateSetInfoReq.setTemplateInfo(getInvalidateTemplateSetInfo());
158-
DataNodeAsyncRequestContext<TUpdateTemplateReq, TSStatus> clientHandler =
164+
final DataNodeAsyncRequestContext<TUpdateTemplateReq, TSStatus> clientHandler =
159165
new DataNodeAsyncRequestContext<>(
160166
CnToDnAsyncRequestType.UPDATE_TEMPLATE,
161167
invalidateTemplateSetInfoReq,
162168
dataNodeLocationMap);
163169
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
164-
Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
165-
for (TSStatus status : statusMap.values()) {
170+
final Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
171+
for (final TSStatus status : statusMap.values()) {
166172
// all dataNodes must clear the related template cache
167173
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
168174
LOGGER.error(
@@ -174,14 +180,14 @@ private void executeInvalidateCache(ConfigNodeProcedureEnv env) throws Procedure
174180
}
175181
}
176182

177-
private boolean checkDataNodeTemplateActivation(ConfigNodeProcedureEnv env) {
178-
PathPatternTree patternTree = new PathPatternTree();
183+
private boolean checkDataNodeTemplateActivation(final ConfigNodeProcedureEnv env) {
184+
final PathPatternTree patternTree = new PathPatternTree();
179185
patternTree.appendPathPattern(path);
180186
patternTree.appendPathPattern(path.concatAsMeasurementPath(MULTI_LEVEL_PATH_WILDCARD));
181187
try {
182188
return SchemaUtils.checkDataNodeTemplateActivation(
183189
env.getConfigManager(), patternTree, template);
184-
} catch (MetadataException e) {
190+
} catch (final MetadataException e) {
185191
setFailure(
186192
new ProcedureException(
187193
new MetadataException(
@@ -192,95 +198,64 @@ private boolean checkDataNodeTemplateActivation(ConfigNodeProcedureEnv env) {
192198
}
193199
}
194200

195-
private void unsetTemplate(ConfigNodeProcedureEnv env) {
196-
TSStatus status =
201+
private void unsetTemplate(final ConfigNodeProcedureEnv env) {
202+
final TSStatus status =
197203
env.getConfigManager()
198204
.getClusterSchemaManager()
199205
.unsetSchemaTemplateInBlackList(template.getId(), path, isGeneratedByPipe);
200-
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
201-
setNextState(UnsetTemplateState.CLEAN_DATANODE_TEMPLATE_CACHE);
202-
} else {
206+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
203207
setFailure(new ProcedureException(new IoTDBException(status.getMessage(), status.getCode())));
204208
}
205209
}
206210

207211
@Override
208-
protected void rollbackState(ConfigNodeProcedureEnv env, UnsetTemplateState unsetTemplateState)
212+
protected void rollbackState(
213+
final ConfigNodeProcedureEnv env, final UnsetTemplateState unsetTemplateState)
209214
throws IOException, InterruptedException, ProcedureException {
210215
if (alreadyRollback) {
211216
return;
212217
}
213218
alreadyRollback = true;
214219
ProcedureException rollbackException;
215-
try {
216-
executeRollbackInvalidateCache(env);
217-
TSStatus status =
218-
env.getConfigManager()
219-
.getClusterSchemaManager()
220-
.rollbackPreUnsetSchemaTemplate(template.getId(), path);
221-
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
222-
return;
223-
} else {
224-
LOGGER.error(
225-
"Failed to rollback pre unset template operation of template {} set on {}",
226-
template.getName(),
227-
path);
228-
rollbackException =
229-
new ProcedureException(
230-
new MetadataException(
231-
"Rollback template pre unset failed because of" + status.getMessage()));
232-
}
233-
} catch (ProcedureException e) {
234-
rollbackException = e;
220+
final TSStatus status =
221+
env.getConfigManager()
222+
.getClusterSchemaManager()
223+
.rollbackPreUnsetSchemaTemplate(template.getId(), path);
224+
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
225+
return;
226+
} else {
227+
LOGGER.error(
228+
"Failed to rollback pre unset template operation of template {} set on {}",
229+
template.getName(),
230+
path);
231+
rollbackException =
232+
new ProcedureException(
233+
new MetadataException(
234+
"Rollback template pre unset failed because of" + status.getMessage()));
235235
}
236236
try {
237237
executeInvalidateCache(env);
238238
setFailure(rollbackException);
239-
} catch (ProcedureException exception) {
239+
} catch (final ProcedureException exception) {
240240
setFailure(
241241
new ProcedureException(
242242
new MetadataException(
243243
"Rollback unset template failed and the cluster template info management is strictly broken. Please try unset again.")));
244244
}
245245
}
246246

247-
private void executeRollbackInvalidateCache(ConfigNodeProcedureEnv env)
248-
throws ProcedureException {
249-
Map<Integer, TDataNodeLocation> dataNodeLocationMap =
250-
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
251-
TUpdateTemplateReq rollbackTemplateSetInfoReq = new TUpdateTemplateReq();
252-
rollbackTemplateSetInfoReq.setType(
253-
TemplateInternalRPCUpdateType.ADD_TEMPLATE_SET_INFO.toByte());
254-
rollbackTemplateSetInfoReq.setTemplateInfo(getAddTemplateSetInfo());
255-
DataNodeAsyncRequestContext<TUpdateTemplateReq, TSStatus> clientHandler =
256-
new DataNodeAsyncRequestContext<>(
257-
CnToDnAsyncRequestType.UPDATE_TEMPLATE,
258-
rollbackTemplateSetInfoReq,
259-
dataNodeLocationMap);
260-
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
261-
Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
262-
for (TSStatus status : statusMap.values()) {
263-
// all dataNodes must clear the related template cache
264-
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
265-
LOGGER.error(
266-
"Failed to rollback template cache of template {} set on {}", template.getName(), path);
267-
throw new ProcedureException(new MetadataException("Rollback template cache failed"));
268-
}
269-
}
270-
}
271-
272247
@Override
273-
protected boolean isRollbackSupported(UnsetTemplateState unsetTemplateState) {
248+
protected boolean isRollbackSupported(final UnsetTemplateState unsetTemplateState) {
274249
return true;
275250
}
276251

277252
@Override
278-
protected UnsetTemplateState getState(int stateId) {
253+
protected UnsetTemplateState getState(final int stateId) {
279254
return UnsetTemplateState.values()[stateId];
280255
}
281256

282257
@Override
283-
protected int getStateId(UnsetTemplateState unsetTemplateState) {
258+
protected int getStateId(final UnsetTemplateState unsetTemplateState) {
284259
return unsetTemplateState.ordinal();
285260
}
286261

@@ -309,17 +284,6 @@ public PartialPath getPath() {
309284
return path;
310285
}
311286

312-
private ByteBuffer getAddTemplateSetInfo() {
313-
if (this.addTemplateSetInfo == null) {
314-
this.addTemplateSetInfo =
315-
ByteBuffer.wrap(
316-
TemplateInternalRPCUtil.generateAddTemplateSetInfoBytes(
317-
template, path.getFullPath()));
318-
}
319-
320-
return addTemplateSetInfo;
321-
}
322-
323287
private ByteBuffer getInvalidateTemplateSetInfo() {
324288
if (this.invalidateTemplateSetInfo == null) {
325289
this.invalidateTemplateSetInfo =
@@ -331,7 +295,7 @@ private ByteBuffer getInvalidateTemplateSetInfo() {
331295
}
332296

333297
@Override
334-
public void serialize(DataOutputStream stream) throws IOException {
298+
public void serialize(final DataOutputStream stream) throws IOException {
335299
stream.writeShort(
336300
isGeneratedByPipe
337301
? ProcedureType.PIPE_ENRICHED_UNSET_TEMPLATE_PROCEDURE.getTypeCode()
@@ -344,7 +308,7 @@ public void serialize(DataOutputStream stream) throws IOException {
344308
}
345309

346310
@Override
347-
public void deserialize(ByteBuffer byteBuffer) {
311+
public void deserialize(final ByteBuffer byteBuffer) {
348312
super.deserialize(byteBuffer);
349313
queryId = ReadWriteIOUtils.readString(byteBuffer);
350314
template = new Template();
@@ -354,10 +318,14 @@ public void deserialize(ByteBuffer byteBuffer) {
354318
}
355319

356320
@Override
357-
public boolean equals(Object o) {
358-
if (this == o) return true;
359-
if (o == null || getClass() != o.getClass()) return false;
360-
UnsetTemplateProcedure that = (UnsetTemplateProcedure) o;
321+
public boolean equals(final Object o) {
322+
if (this == o) {
323+
return true;
324+
}
325+
if (o == null || getClass() != o.getClass()) {
326+
return false;
327+
}
328+
final UnsetTemplateProcedure that = (UnsetTemplateProcedure) o;
361329
return Objects.equals(getProcId(), that.getProcId())
362330
&& Objects.equals(getCurrentState(), that.getCurrentState())
363331
&& Objects.equals(getCycles(), that.getCycles())

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/NoTemplateOnMNodeException.java

-34
This file was deleted.

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java

+13-20
Original file line numberDiff line numberDiff line change
@@ -1583,19 +1583,11 @@ public TFetchFragmentInstanceStatisticsResp fetchFragmentInstanceStatistics(
15831583

15841584
@Override
15851585
public TSStatus updateTable(final TUpdateTableReq req) {
1586-
final String database;
1587-
final int size;
15881586
switch (TsTableInternalRPCType.getType(req.type)) {
15891587
case PRE_UPDATE_TABLE:
1590-
DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.TIMESERIES_VS_TABLE);
1591-
try {
1592-
Pair<String, TsTable> pair =
1593-
TsTableInternalRPCUtil.deserializeSingleTsTableWithDatabase(req.getTableInfo());
1594-
DataNodeTableCache.getInstance().preUpdateTable(pair.left, pair.right);
1595-
} finally {
1596-
DataNodeSchemaLockManager.getInstance()
1597-
.releaseWriteLock(SchemaLockType.TIMESERIES_VS_TABLE);
1598-
}
1588+
Pair<String, TsTable> pair =
1589+
TsTableInternalRPCUtil.deserializeSingleTsTableWithDatabase(req.getTableInfo());
1590+
DataNodeTableCache.getInstance().preUpdateTable(pair.left, pair.right);
15991591
break;
16001592
case ROLLBACK_UPDATE_TABLE:
16011593
DataNodeTableCache.getInstance()
@@ -2271,24 +2263,25 @@ public TSStatus setTTL(TSetTTLReq req) throws TException {
22712263
}
22722264

22732265
@Override
2274-
public TSStatus updateTemplate(TUpdateTemplateReq req) {
2266+
public TSStatus updateTemplate(final TUpdateTemplateReq req) {
22752267
switch (TemplateInternalRPCUpdateType.getType(req.type)) {
2276-
case ADD_TEMPLATE_SET_INFO:
2268+
// Reserved for rolling upgrade
2269+
case ROLLBACK_INVALIDATE_TEMPLATE_SET_INFO:
2270+
ClusterTemplateManager.getInstance().addTemplateSetInfo(req.getTemplateInfo());
2271+
break;
2272+
case INVALIDATE_TEMPLATE_SET_INFO:
2273+
ClusterTemplateManager.getInstance().invalidateTemplateSetInfo(req.getTemplateInfo());
2274+
break;
2275+
case ADD_TEMPLATE_PRE_SET_INFO:
22772276
DataNodeSchemaLockManager.getInstance()
22782277
.takeWriteLock(SchemaLockType.TIMESERIES_VS_TEMPLATE);
22792278
try {
2280-
ClusterTemplateManager.getInstance().addTemplateSetInfo(req.getTemplateInfo());
2279+
ClusterTemplateManager.getInstance().addTemplatePreSetInfo(req.getTemplateInfo());
22812280
} finally {
22822281
DataNodeSchemaLockManager.getInstance()
22832282
.releaseWriteLock(SchemaLockType.TIMESERIES_VS_TEMPLATE);
22842283
}
22852284
break;
2286-
case INVALIDATE_TEMPLATE_SET_INFO:
2287-
ClusterTemplateManager.getInstance().invalidateTemplateSetInfo(req.getTemplateInfo());
2288-
break;
2289-
case ADD_TEMPLATE_PRE_SET_INFO:
2290-
ClusterTemplateManager.getInstance().addTemplatePreSetInfo(req.getTemplateInfo());
2291-
break;
22922285
case COMMIT_TEMPLATE_SET_INFO:
22932286
ClusterTemplateManager.getInstance().commitTemplatePreSetInfo(req.getTemplateInfo());
22942287
break;

0 commit comments

Comments
 (0)