X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=be51ba069cc5056636646566d1db00b30154073a;hb=5ebdd63990602003ecf568a0675b9f5e521eade2;hp=94c38f6108eabf62ef9e41a3f484a6e2f915129d;hpb=6bfcbfc5158130c4255b861cb02dfaa68c52aa63;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 94c38f6108..be51ba069c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -92,7 +92,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private Optional snapshot; public AbstractLeader(RaftActorContext context) { - super(context); + super(context, RaftState.Leader); final Builder ftlBuilder = ImmutableMap.builder(); for (String followerId : context.getPeerAddresses().keySet()) { @@ -107,7 +107,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { leaderId = context.getId(); - LOG.debug("{}: Election: Leader has following peers: {}", context.getId(), getFollowerIds()); + LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); minReplicationCount = getMajorityVoteCount(getFollowerIds().size()); @@ -125,7 +125,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Upon election: send initial empty AppendEntries RPCs // (heartbeat) to each server; repeat during idle periods to // prevent election timeouts (§5.2) - sendAppendEntries(0); + sendAppendEntries(0, false); + + // It is important to schedule this heartbeat here + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } /** @@ -137,10 +140,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.keySet(); } - private Optional getSnapshot() { - return snapshot; - } - @VisibleForTesting void setSnapshot(Optional snapshot) { this.snapshot = snapshot; @@ -150,9 +149,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntries: {}", context.getId(), appendEntries); - } + LOG.debug("{}: handleAppendEntries: {}", logName(), appendEntries); return this; } @@ -161,10 +158,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) { - if(! appendEntriesReply.isSuccess()) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: handleAppendEntriesReply: {}", context.getId(), appendEntriesReply); - } + if(LOG.isTraceEnabled()) { + LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); + } else if(LOG.isDebugEnabled() && !appendEntriesReply.isSuccess()) { + LOG.debug("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply); } // Update the FollowerLogInformation @@ -173,10 +170,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToLog.get(followerId); if(followerLogInformation == null){ - LOG.error("{}: handleAppendEntriesReply - unknown follower {}", context.getId(), followerId); + LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId); return this; } + if(followerLogInformation.timeSinceLastActivity() > + context.getConfigParams().getElectionTimeOutInterval().toMillis()) { + LOG.error("{} : handleAppendEntriesReply delayed beyond election timeout, " + + "appendEntriesReply : {}, timeSinceLastActivity : {}, lastApplied : {}, commitIndex : {}", + logName(), appendEntriesReply, followerLogInformation.timeSinceLastActivity(), + context.getLastApplied(), context.getCommitIndex()); + } + followerLogInformation.markFollowerActive(); if (appendEntriesReply.isSuccess()) { @@ -221,6 +226,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Apply the change to the state machine if (context.getCommitIndex() > context.getLastApplied()) { + LOG.debug("{}: handleAppendEntriesReply: applying to log - commitIndex: {}, lastAppliedIndex: {}", + logName(), context.getCommitIndex(), context.getLastApplied()); + applyLogToStateMachine(context.getCommitIndex()); } @@ -229,7 +237,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event - sendUpdatesToFollower(followerId, followerLogInformation, false); + sendUpdatesToFollower(followerId, followerLogInformation, false, false); return this; } @@ -276,10 +284,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return this; } - @Override - public RaftState state() { - return RaftState.Leader; - } + protected void beforeSendHeartbeat(){} @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) { @@ -293,8 +298,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(), - rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); + LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", + logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); context.getTermInformation().updateAndPersist(rpc.getTerm(), null); @@ -302,37 +307,38 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - try { - if (message instanceof SendHeartBeat) { - sendHeartBeat(); - return this; + if (message instanceof SendHeartBeat) { + beforeSendHeartbeat(); + sendHeartBeat(); + scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); + return this; - } else if(message instanceof SendInstallSnapshot) { - // received from RaftActor - setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); - sendInstallSnapshot(); + } else if(message instanceof SendInstallSnapshot) { + // received from RaftActor + setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + sendInstallSnapshot(); - } else if (message instanceof Replicate) { - replicate((Replicate) message); + } else if (message instanceof Replicate) { + replicate((Replicate) message); - } else if (message instanceof InstallSnapshotReply){ - handleInstallSnapshotReply((InstallSnapshotReply) message); + } else if (message instanceof InstallSnapshotReply){ + handleInstallSnapshotReply((InstallSnapshotReply) message); - } - } finally { - scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } + return super.handleMessage(sender, message); } private void handleInstallSnapshotReply(InstallSnapshotReply reply) { + LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); + String followerId = reply.getFollowerId(); FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader", - context.getId(), followerId); + logName(), followerId); return; } @@ -346,8 +352,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { //this was the last chunk reply if(LOG.isDebugEnabled()) { LOG.debug("{}: InstallSnapshotReply received, " + - "last chunk received, Chunk:{}. Follower:{} Setting nextIndex:{}", - context.getId(), reply.getChunkIndex(), followerId, + "last chunk received, Chunk: {}. Follower: {} Setting nextIndex: {}", + logName(), reply.getChunkIndex(), followerId, context.getReplicatedLog().getSnapshotIndex() + 1 ); } @@ -358,10 +364,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getReplicatedLog().getSnapshotIndex() + 1); mapFollowerToSnapshot.remove(followerId); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: followerToLog.get(followerId).getNextIndex()=" + - context.getId(), followerToLog.get(followerId).getNextIndex()); - } + LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", + logName(), followerId, followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); if (mapFollowerToSnapshot.isEmpty()) { // once there are no pending followers receiving snapshots @@ -375,7 +380,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } else { LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", - context.getId(), reply.getChunkIndex()); + logName(), reply.getChunkIndex()); followerToSnapshot.markSendStatus(false); } @@ -389,7 +394,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else { LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", - context.getId(), reply.getChunkIndex(), followerId, + logName(), reply.getChunkIndex(), followerId, followerToSnapshot.getChunkIndex()); if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ @@ -403,9 +408,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Replicate message {}", context.getId(), logIndex); - } + LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(), + replicate.getIdentifier(), logIndex); // Create a tracker entry we will use this later to notify the // client actor @@ -419,11 +423,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); } else { - sendAppendEntries(0); + sendAppendEntries(0, false); } } - private void sendAppendEntries(long timeSinceLastActivityInterval) { + private void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) { // Send an AppendEntries to all followers for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); @@ -431,7 +435,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // This checks helps not to send a repeat message to the follower if(!followerLogInformation.isFollowerActive() || followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { - sendUpdatesToFollower(followerId, followerLogInformation, true); + sendUpdatesToFollower(followerId, followerLogInformation, true, isHeartbeat); } } } @@ -444,7 +448,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { */ private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation, - boolean sendHeartbeat) { + boolean sendHeartbeat, boolean isHeartbeat) { ActorSelection followerActor = context.getPeerActorSelection(followerId); if (followerActor != null) { @@ -463,8 +467,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else { long leaderLastIndex = context.getReplicatedLog().lastIndex(); long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - if (isFollowerActive && - context.getReplicatedLog().isPresent(followerNextIndex)) { + + if(!isHeartbeat || LOG.isTraceEnabled()) { + LOG.debug("{}: Checking sendAppendEntries for follower {}, leaderLastIndex: {}, leaderSnapShotIndex: {}", + logName(), followerId, leaderLastIndex, leaderSnapShotIndex); + } + + if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) { + + LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), + followerNextIndex, followerId); + // FIXME : Sending one entry at a time final List entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); @@ -477,11 +490,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // then snapshot should be sent if (LOG.isDebugEnabled()) { - LOG.debug("InitiateInstallSnapshot to follower:{}," + - "follower-nextIndex:{}, leader-snapshot-index:{}, " + - "leader-last-index:{}", followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex - ); + LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + + "follower-nextIndex: %d, leader-snapshot-index: %d, " + + "leader-last-index: %d", logName(), followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); } // Send heartbeat to follower whenever install snapshot is initiated. @@ -508,8 +520,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { prevLogTerm(followerNextIndex), entries, context.getCommitIndex(), super.getReplicatedToAllIndex()); - if(!entries.isEmpty()) { - LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId, + if(!entries.isEmpty() || LOG.isTraceEnabled()) { + LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId, appendEntries); } @@ -543,7 +555,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else if (!context.isSnapshotCaptureInitiated()) { - LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId()); + LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", logName(), getLeaderId()); ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); long lastAppliedIndex = -1; long lastAppliedTerm = -1; @@ -570,6 +582,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendInstallSnapshot() { + LOG.debug("{}: sendInstallSnapshot", logName()); for (Entry e : followerToLog.entrySet()) { ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); @@ -609,12 +622,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { actor() ); LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", - context.getId(), followerActor.path(), + logName(), followerActor.path(), followerToSnapshot.getChunkIndex(), followerToSnapshot.getTotalChunks()); } } catch (IOException e) { - LOG.error("{}: InstallSnapshot failed for Leader.", context.getId(), e); + LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e); } } @@ -629,15 +642,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { mapFollowerToSnapshot.put(followerId, followerToSnapshot); } ByteString nextChunk = followerToSnapshot.getNextChunk(); - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Leader's snapshot nextChunk size:{}", context.getId(), nextChunk.size()); - } + + LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size()); + return nextChunk; } private void sendHeartBeat() { if (!followerToLog.isEmpty()) { - sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis()); + LOG.trace("{}: Sending heartbeat", logName()); + sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis(), true); } } @@ -712,7 +726,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0); if(LOG.isDebugEnabled()) { LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}", - context.getId(), size, totalChunks); + logName(), size, totalChunks); } replyReceivedForOffset = -1; chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; @@ -780,10 +794,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Next chunk: length={}, offset={},size={}", context.getId(), + + LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(), snapshotLength, start, size); - } + ByteString substring = getSnapshotBytes().substring(start, start + size); nextChunkHashCode = substring.hashCode(); return substring;