X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=deb10da94187b340d6c4a82a2e7d57e0071e3ca3;hp=104654cb1206687b89bc917dc0ee7e3b95bbbf00;hb=9d5ec5cdd146a56bc03e35b6718e9492a5c8410a;hpb=4816cd33c7b70012b11dd4efc6dbb01218f8cef0 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 104654cb12..deb10da941 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -47,6 +47,7 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import scala.concurrent.duration.FiniteDuration; /** @@ -98,12 +99,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private Optional snapshot; private int minReplicationCount; - protected AbstractLeader(RaftActorContext context, RaftState state) { + protected AbstractLeader(RaftActorContext context, RaftState state, + @Nullable AbstractLeader initializeFromLeader) { super(context, state); - for(PeerInfo peerInfo: context.getPeers()) { - FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); - followerToLog.put(peerInfo.getId(), followerLogInformation); + if(initializeFromLeader != null) { + followerToLog.putAll(initializeFromLeader.followerToLog); + } else { + for(PeerInfo peerInfo: context.getPeers()) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); + followerToLog.put(peerInfo.getId(), followerLogInformation); + } } LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); @@ -122,6 +128,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } + protected AbstractLeader(RaftActorContext context, RaftState state) { + this(context, state, null); + } + /** * Return an immutable collection of follower identifiers. * @@ -217,13 +227,36 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion()); boolean updated = false; - if (appendEntriesReply.isSuccess()) { + if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) { + // The follower's log is actually ahead of the leader's log. Normally this doesn't happen + // in raft as a node cannot become leader if it's log is behind another's. However, the + // non-voting semantics deviate a bit from raft. Only voting members participate in + // elections and can become leader so it's possible for a non-voting follower to be ahead + // of the leader. This can happen if persistence is disabled and all voting members are + // restarted. In this case, the voting leader will start out with an empty log however + // the non-voting followers still retain the previous data in memory. On the first + // AppendEntries, the non-voting follower returns a successful reply b/c the prevLogIndex + // sent by the leader is -1 and thus the integrity checks pass. However the follower's returned + // lastLogIndex may be higher in which case we want to reset the follower by installing a + // snapshot. It's also possible that the follower's last log index is behind the leader's. + // However in this case the log terms won't match and the logs will conflict - this is handled + // elsewhere. + LOG.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - forcing install snaphot", + logName(), followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(), + context.getReplicatedLog().lastIndex()); + + followerLogInformation.setMatchIndex(-1); + followerLogInformation.setNextIndex(-1); + + initiateCaptureSnapshot(followerId); + updated = true; + } else if (appendEntriesReply.isSuccess()) { updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } else { LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); long followerLastLogIndex = appendEntriesReply.getLogLastIndex(); - ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex); + long followersLastLogTerm = getLogEntryTerm(followerLastLogIndex); if(appendEntriesReply.isForceInstallSnapshot()) { // Reset the followers match and next index. This is to signal that this follower has nothing // in common with this Leader and so would require a snapshot to be installed @@ -232,8 +265,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Force initiate a snapshot capture initiateCaptureSnapshot(followerId); - } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null && - followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) { + } else if(followerLastLogIndex < 0 || (followersLastLogTerm >= 0 && + followersLastLogTerm == appendEntriesReply.getLogLastTerm())) { // The follower's log is empty or the last entry is present in the leader's journal // and the terms match so the follower is just behind the leader's journal from // the last snapshot, if any. We'll catch up the follower quickly by starting at the @@ -241,11 +274,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } else { - // TODO: When we find that the follower is out of sync with the - // Leader we simply decrement that followers next index by 1. - // Would it be possible to do better than this? The RAFT spec - // does not explicitly deal with it but may be something for us to - // think about. + // The follower's log conflicts with leader's log so decrement follower's next index by 1 + // in an attempt to find where the logs match. + + LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index", + logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTerm); followerLogInformation.decrNextIndex(); } @@ -268,8 +301,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { final PeerInfo peerInfo = context.getPeerInfo(info.getId()); if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) { replicatedCount++; - } else if(LOG.isDebugEnabled()) { - LOG.debug("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), + } else if(LOG.isTraceEnabled()) { + LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(), info.getMatchIndex(), peerInfo); } } @@ -299,7 +332,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { logName(), N, replicatedLogEntry.getIndex(), replicatedLogEntry.getTerm(), currentTerm()); } } else { - LOG.trace("{}: minReplicationCount not reached - breaking", logName()); + LOG.trace("{}: minReplicationCount not reached, actual {} - breaking", logName(), replicatedCount); break; } } @@ -320,6 +353,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event sendUpdatesToFollower(followerId, followerLogInformation, false, !updated); + return this; } @@ -504,18 +538,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); - LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}", logName(), - replicate.getIdentifier(), logIndex); + LOG.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(), + replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass()); // Create a tracker entry we will use this later to notify the // client actor - trackers.add( - new ClientRequestTrackerImpl(replicate.getClientActor(), - replicate.getIdentifier(), - logIndex) - ); + if(replicate.getClientActor() != null) { + trackers.add(new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(), + logIndex)); + } - boolean applyModificationToState = followerToLog.isEmpty() + boolean applyModificationToState = !context.anyVotingPeers() || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); if(applyModificationToState){ @@ -595,7 +628,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // then snapshot should be sent if (LOG.isDebugEnabled()) { - LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + + LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " + "follower-nextIndex: %d, leader-snapshot-index: %d, " + "leader-last-index: %d", logName(), followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); @@ -625,8 +658,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, List entries, String followerId) { AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, + getLogEntryIndex(followerNextIndex - 1), + getLogEntryTerm(followerNextIndex - 1), entries, context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion()); if(!entries.isEmpty() || LOG.isTraceEnabled()) { @@ -705,14 +738,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // followerId to the followerToSnapshot map. FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + int nextChunkIndex = followerToSnapshot.incrementChunkIndex(); + Optional serverConfig = Optional.absent(); + if(followerToSnapshot.isLastChunk(nextChunkIndex)) { + serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); + } + followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), snapshot.get().getLastIncludedIndex(), snapshot.get().getLastIncludedTerm(), nextSnapshotChunk, - followerToSnapshot.incrementChunkIndex(), + nextChunkIndex, followerToSnapshot.getTotalChunks(), - Optional.of(followerToSnapshot.getLastChunkHashCode()) + Optional.of(followerToSnapshot.getLastChunkHashCode()), + serverConfig ).toSerializable(followerToLog.get(followerId).getRaftVersion()), actor() );