Skip to content

Commit 8bd75a9

Browse files
committed
[ISSUE #176] Realize the ability to read from the replica node,Optimize read performance
1 parent a34df8d commit 8bd75a9

10 files changed

+254
-53
lines changed

src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java

+49-27
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
3333
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
3434
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
35+
import io.openmessaging.storage.dledger.protocol.PullReadIndexRequest;
36+
import io.openmessaging.storage.dledger.protocol.PullReadIndexResponse;
3537
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
3638
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
3739
import io.openmessaging.storage.dledger.protocol.RequestOrResponse;
@@ -77,11 +79,13 @@ public DLedgerRpcNettyService(DLedgerServer dLedgerServer) {
7779
this(dLedgerServer, null, null, null);
7880
}
7981

80-
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) {
82+
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig,
83+
NettyClientConfig nettyClientConfig) {
8184
this(dLedgerServer, nettyServerConfig, nettyClientConfig, null);
8285
}
8386

84-
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
87+
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig,
88+
NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
8589
this.dLedgerServer = dLedgerServer;
8690
this.memberState = dLedgerServer.getMemberState();
8791
NettyRequestProcessor protocolProcessor = new NettyRequestProcessor() {
@@ -109,6 +113,7 @@ public boolean rejectRequest() {
109113
this.remotingServer.registerProcessor(DLedgerRequestCode.VOTE.getCode(), protocolProcessor, null);
110114
this.remotingServer.registerProcessor(DLedgerRequestCode.HEART_BEAT.getCode(), protocolProcessor, null);
111115
this.remotingServer.registerProcessor(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), protocolProcessor, null);
116+
this.remotingServer.registerProcessor(DLedgerRequestCode.PULL_READ_INDEX.getCode(), protocolProcessor, null);
112117

113118
//start the remoting client
114119
if (nettyClientConfig == null) {
@@ -252,9 +257,29 @@ public CompletableFuture<PushEntryResponse> push(PushEntryRequest request) throw
252257
return future;
253258
}
254259

260+
@Override
261+
public CompletableFuture<PullReadIndexResponse> pullReadIndex(PullReadIndexRequest request) throws Exception {
262+
CompletableFuture<PullReadIndexResponse> future = new CompletableFuture<>();
263+
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.PULL_READ_INDEX.getCode(), null);
264+
wrapperRequest.setBody(JSON.toJSONBytes(request));
265+
remotingClient.invokeAsync(getPeerAddr(request), wrapperRequest, 3000, responseFuture -> {
266+
RemotingCommand responseCommand = responseFuture.getResponseCommand();
267+
PullReadIndexResponse response;
268+
if (null != responseCommand) {
269+
response = JSON.parseObject(responseCommand.getBody(), PullReadIndexResponse.class);
270+
} else {
271+
response = new PullReadIndexResponse();
272+
response.copyBaseInfo(request);
273+
response.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode());
274+
}
275+
future.complete(response);
276+
});
277+
return future;
278+
}
279+
255280
@Override
256281
public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
257-
LeadershipTransferRequest request) throws Exception {
282+
LeadershipTransferRequest request) throws Exception {
258283
CompletableFuture<LeadershipTransferResponse> future = new CompletableFuture<>();
259284
try {
260285
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), null);
@@ -283,7 +308,7 @@ public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
283308
}
284309

285310
private void writeResponse(RequestOrResponse storeResp, Throwable t, RemotingCommand request,
286-
ChannelHandlerContext ctx) {
311+
ChannelHandlerContext ctx) {
287312
RemotingCommand response = null;
288313
try {
289314
if (t != null) {
@@ -319,57 +344,43 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
319344
case METADATA: {
320345
MetadataRequest metadataRequest = JSON.parseObject(request.getBody(), MetadataRequest.class);
321346
CompletableFuture<MetadataResponse> future = handleMetadata(metadataRequest);
322-
future.whenCompleteAsync((x, y) -> {
323-
writeResponse(x, y, request, ctx);
324-
}, futureExecutor);
347+
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
325348
break;
326349
}
327350
case APPEND: {
328351
AppendEntryRequest appendEntryRequest = JSON.parseObject(request.getBody(), AppendEntryRequest.class);
329352
CompletableFuture<AppendEntryResponse> future = handleAppend(appendEntryRequest);
330-
future.whenCompleteAsync((x, y) -> {
331-
writeResponse(x, y, request, ctx);
332-
}, futureExecutor);
353+
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
333354
break;
334355
}
335356
case GET: {
336357
GetEntriesRequest getEntriesRequest = JSON.parseObject(request.getBody(), GetEntriesRequest.class);
337358
CompletableFuture<GetEntriesResponse> future = handleGet(getEntriesRequest);
338-
future.whenCompleteAsync((x, y) -> {
339-
writeResponse(x, y, request, ctx);
340-
}, futureExecutor);
359+
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
341360
break;
342361
}
343362
case PULL: {
344363
PullEntriesRequest pullEntriesRequest = JSON.parseObject(request.getBody(), PullEntriesRequest.class);
345364
CompletableFuture<PullEntriesResponse> future = handlePull(pullEntriesRequest);
346-
future.whenCompleteAsync((x, y) -> {
347-
writeResponse(x, y, request, ctx);
348-
}, futureExecutor);
365+
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
349366
break;
350367
}
351368
case PUSH: {
352369
PushEntryRequest pushEntryRequest = JSON.parseObject(request.getBody(), PushEntryRequest.class);
353370
CompletableFuture<PushEntryResponse> future = handlePush(pushEntryRequest);
354-
future.whenCompleteAsync((x, y) -> {
355-
writeResponse(x, y, request, ctx);
356-
}, futureExecutor);
371+
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
357372
break;
358373
}
359374
case VOTE: {
360375
VoteRequest voteRequest = JSON.parseObject(request.getBody(), VoteRequest.class);
361376
CompletableFuture<VoteResponse> future = handleVote(voteRequest);
362-
future.whenCompleteAsync((x, y) -> {
363-
writeResponse(x, y, request, ctx);
364-
}, futureExecutor);
377+
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
365378
break;
366379
}
367380
case HEART_BEAT: {
368381
HeartBeatRequest heartBeatRequest = JSON.parseObject(request.getBody(), HeartBeatRequest.class);
369382
CompletableFuture<HeartBeatResponse> future = handleHeartBeat(heartBeatRequest);
370-
future.whenCompleteAsync((x, y) -> {
371-
writeResponse(x, y, request, ctx);
372-
}, futureExecutor);
383+
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
373384
break;
374385
}
375386
case LEADERSHIP_TRANSFER: {
@@ -379,10 +390,16 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
379390
future.whenCompleteAsync((x, y) -> {
380391
writeResponse(x, y, request, ctx);
381392
logger.info("LEADERSHIP_TRANSFER FINISHED. Request={}, response={}, cost={}ms",
382-
request, x, DLedgerUtils.elapsed(start));
393+
request, x, DLedgerUtils.elapsed(start));
383394
}, futureExecutor);
384395
break;
385396
}
397+
case PULL_READ_INDEX: {
398+
PullReadIndexRequest pullReadIndexRequest = JSON.parseObject(request.getBody(), PullReadIndexRequest.class);
399+
CompletableFuture<PullReadIndexResponse> future = handlePullReadIndex(pullReadIndexRequest);
400+
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
401+
break;
402+
}
386403
default:
387404
logger.error("Unknown request code {} from {}", request.getCode(), request);
388405
break;
@@ -392,7 +409,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
392409

393410
@Override
394411
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
395-
LeadershipTransferRequest leadershipTransferRequest) throws Exception {
412+
LeadershipTransferRequest leadershipTransferRequest) throws Exception {
396413
return dLedgerServer.handleLeadershipTransfer(leadershipTransferRequest);
397414
}
398415

@@ -432,6 +449,11 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
432449
return dLedgerServer.handlePush(request);
433450
}
434451

452+
@Override
453+
public CompletableFuture<PullReadIndexResponse> handlePullReadIndex(PullReadIndexRequest request) throws Exception {
454+
return dLedgerServer.handlePullReadIndex(request);
455+
}
456+
435457
public RemotingCommand handleResponse(RequestOrResponse response, RemotingCommand request) {
436458
RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(DLedgerResponseCode.SUCCESS.getCode(), null);
437459
remotingCommand.setBody(JSON.toJSONBytes(response));

src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java

+100-11
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
3434
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
3535
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
36+
import io.openmessaging.storage.dledger.protocol.PullReadIndexRequest;
37+
import io.openmessaging.storage.dledger.protocol.PullReadIndexResponse;
3638
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
3739
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
3840
import io.openmessaging.storage.dledger.protocol.VoteRequest;
@@ -44,19 +46,17 @@
4446
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
4547
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
4648
import io.openmessaging.storage.dledger.utils.PreConditions;
47-
4849
import java.io.IOException;
4950
import java.util.ArrayList;
5051
import java.util.Arrays;
5152
import java.util.Collections;
5253
import java.util.Iterator;
5354
import java.util.List;
5455
import java.util.Optional;
56+
import java.util.concurrent.CompletableFuture;
5557
import java.util.concurrent.Executors;
5658
import java.util.concurrent.ScheduledExecutorService;
5759
import java.util.concurrent.TimeUnit;
58-
import java.util.concurrent.CompletableFuture;
59-
6060
import org.apache.rocketmq.remoting.ChannelEventListener;
6161
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
6262
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
@@ -211,7 +211,7 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
211211
// record positions to return;
212212
long[] positions = new long[batchRequest.getBatchMsgs().size()];
213213
DLedgerEntry resEntry = null;
214-
// split bodys to append
214+
// split bodies to append
215215
int index = 0;
216216
Iterator<byte[]> iterator = batchRequest.getBatchMsgs().iterator();
217217
while (iterator.hasNext()) {
@@ -226,8 +226,8 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
226226
batchAppendFuture.setPositions(positions);
227227
return batchAppendFuture;
228228
}
229-
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
230-
" with empty bodys");
229+
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODIES, "BatchAppendEntryRequest" +
230+
" with empty bodies");
231231
} else {
232232
DLedgerEntry dLedgerEntry = new DLedgerEntry();
233233
dLedgerEntry.setBody(request.getBody());
@@ -246,16 +246,58 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
246246
}
247247

248248
@Override
249-
public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request) throws IOException {
249+
public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request) throws Exception {
250250
try {
251251
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
252252
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
253-
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
254-
DLedgerEntry entry = dLedgerStore.get(request.getBeginIndex());
253+
PreConditions.check(!memberState.isCandidate(), DLedgerResponseCode.IS_CANDIDATE);
255254
GetEntriesResponse response = new GetEntriesResponse();
256255
response.setGroup(memberState.getGroup());
257-
if (entry != null) {
258-
response.setEntries(Collections.singletonList(entry));
256+
Long requestIndex = request.getBeginIndex();
257+
if (memberState.isFollower()) {
258+
//Get from follower
259+
if (requestIndex <= memberState.getLedgerEndIndex()) {
260+
getEntry(response, requestIndex);
261+
return CompletableFuture.completedFuture(response);
262+
}
263+
264+
// when requestIndex greater than ledgerEndIndex then send pull readIndex(ledgerEndIndex) request to leader
265+
PullReadIndexRequest indexRequest = new PullReadIndexRequest();
266+
indexRequest.setGroup(request.getGroup());
267+
indexRequest.setRemoteId(memberState.getLeaderId());
268+
CompletableFuture<PullReadIndexResponse> future = dLedgerRpcService.pullReadIndex(indexRequest);
269+
PullReadIndexResponse pullReadIndexResponse = future.get();
270+
if (pullReadIndexResponse.getCode() != DLedgerResponseCode.SUCCESS.getCode()) {
271+
response.copyBaseInfo(request);
272+
response.setLeaderId(memberState.getLeaderId());
273+
response.setCode(pullReadIndexResponse.getCode());
274+
return CompletableFuture.completedFuture(response);
275+
}
276+
277+
long readIndex = pullReadIndexResponse.getReadIndex();
278+
if (requestIndex > readIndex) {
279+
response.copyBaseInfo(request);
280+
response.setLeaderId(memberState.getLeaderId());
281+
response.setCode(DLedgerResponseCode.INDEX_OUT_OF_RANGE.getCode());
282+
return CompletableFuture.completedFuture(response);
283+
}
284+
285+
if (readIndex <= memberState.getLedgerEndIndex()) {
286+
getEntry(response, requestIndex);
287+
return CompletableFuture.completedFuture(response);
288+
}
289+
290+
//wait for follower ledgerEndIndex to update
291+
if (!waitFollowerEndIndex2Update(2, TimeUnit.SECONDS, requestIndex)) {
292+
logger.warn("update follower[{}] ledgerEndIndex time out", memberState.getSelfId());
293+
response.setCode(DLedgerResponseCode.FOLLOWER_UPDATE_END_INDEX_TIMEOUT.getCode());
294+
return CompletableFuture.completedFuture(response);
295+
}
296+
getEntry(response, requestIndex);
297+
return CompletableFuture.completedFuture(response);
298+
} else {
299+
//get from leader
300+
getEntry(response, requestIndex);
259301
}
260302
return CompletableFuture.completedFuture(response);
261303
} catch (DLedgerException e) {
@@ -268,6 +310,13 @@ public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request
268310
}
269311
}
270312

313+
private void getEntry(GetEntriesResponse response, Long requestIndex) {
314+
DLedgerEntry entry = dLedgerStore.get(requestIndex);
315+
if (entry != null) {
316+
response.setEntries(Collections.singletonList(entry));
317+
}
318+
}
319+
271320
@Override
272321
public CompletableFuture<MetadataResponse> handleMetadata(MetadataRequest request) throws Exception {
273322
try {
@@ -311,6 +360,30 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
311360

312361
}
313362

363+
@Override
364+
public CompletableFuture<PullReadIndexResponse> handlePullReadIndex(PullReadIndexRequest request) throws Exception {
365+
try {
366+
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
367+
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
368+
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
369+
370+
PullReadIndexResponse response = new PullReadIndexResponse();
371+
response.setGroup(memberState.getGroup());
372+
response.setLeaderId(memberState.getLeaderId());
373+
response.setEndIndex(memberState.getLedgerEndIndex());
374+
response.setReadIndex(memberState.getLedgerEndIndex());
375+
376+
return CompletableFuture.completedFuture(response);
377+
} catch (DLedgerException e) {
378+
logger.error("[{}][HandlePullReadIndex] failed", memberState.getSelfId(), e);
379+
PullReadIndexResponse response = new PullReadIndexResponse();
380+
response.copyBaseInfo(request);
381+
response.setCode(e.getCode().getCode());
382+
response.setLeaderId(memberState.getLeaderId());
383+
return CompletableFuture.completedFuture(response);
384+
}
385+
}
386+
314387
@Override
315388
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
316389
LeadershipTransferRequest request) throws Exception {
@@ -486,4 +559,20 @@ public NettyRemotingClient getRemotingClient() {
486559
return null;
487560
}
488561

562+
private boolean waitFollowerEndIndex2Update(long maxWaitTime, TimeUnit unit, long requestIndex) {
563+
long maxWaitMs = unit.toMillis(maxWaitTime);
564+
long start = System.currentTimeMillis();
565+
while (DLedgerUtils.elapsed(start) < maxWaitMs) {
566+
try {
567+
if (requestIndex <= memberState.getLedgerEndIndex()) {
568+
return true;
569+
}
570+
DLedgerUtils.sleep(1);
571+
} catch (Exception e) {
572+
logger.warn("Wait [{}]Follower update endIndex error",memberState.getSelfId(),e);
573+
break;
574+
}
575+
}
576+
return false;
577+
}
489578
}

0 commit comments

Comments
 (0)