X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=4382cfec3237075087837b5f1402cadab249188f;hb=fbeed3e25ef80495c67bfe1698daf44b88fb10ff;hp=4fae48e1aa42143d0867338d1ca3a3536ea9b26f;hpb=b8da9f6fa8bf4284805349f4638ebdadf169ff5f;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 4fae48e1aa..4382cfec32 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -14,7 +14,9 @@ import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -33,7 +35,6 @@ import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; @@ -47,6 +48,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import scala.concurrent.duration.FiniteDuration; /** @@ -83,7 +85,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Queue trackers = new LinkedList<>(); private Cancellable heartbeatSchedule = null; - private Optional snapshot = Optional.absent(); + private Optional snapshotHolder = Optional.absent(); private int minReplicationCount; protected AbstractLeader(RaftActorContext context, RaftState state, @@ -92,7 +94,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (initializeFromLeader != null) { followerToLog.putAll(initializeFromLeader.followerToLog); - snapshot = initializeFromLeader.snapshot; + snapshotHolder = initializeFromLeader.snapshotHolder; trackers.addAll(initializeFromLeader.trackers); } else { for (PeerInfo peerInfo: context.getPeers()) { @@ -165,17 +167,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } @VisibleForTesting - void setSnapshot(@Nullable Snapshot snapshot) { - if (snapshot != null) { - this.snapshot = Optional.of(new SnapshotHolder(snapshot)); - } else { - this.snapshot = Optional.absent(); - } + void setSnapshot(@Nullable SnapshotHolder snapshotHolder) { + this.snapshotHolder = Optional.fromNullable(snapshotHolder); } @VisibleForTesting boolean hasSnapshot() { - return snapshot.isPresent(); + return snapshotHolder.isPresent(); } @Override @@ -229,9 +227,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // snapshot. It's also possible that the follower's last log index is behind the leader's. // However in this case the log terms won't match and the logs will conflict - this is handled // elsewhere. - log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - " - + "forcing install snaphot", logName(), followerLogInformation.getId(), - appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex()); + log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} " + + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(), + appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(), + context.getReplicatedLog().getSnapshotIndex()); followerLogInformation.setMatchIndex(-1); followerLogInformation.setNextIndex(-1); @@ -253,7 +252,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.setNextIndex(followerLastLogIndex - 1); updated = true; - log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " + log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " + "leader's {} - set the follower's next index to {}", logName(), followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(), followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex()); @@ -261,7 +260,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); } } else { - log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply); + log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}", + logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex()); if (appendEntriesReply.isForceInstallSnapshot()) { // Reset the followers match and next index. This is to signal that this follower has nothing @@ -279,16 +279,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // follower's last log index. updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply); + + log.info("{}: follower {} appears to be behind the leader from the last snapshot - " + + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId, + appendEntriesReply.getLogLastTerm(), followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); } else { // The follower's log conflicts with leader's log so decrement follower's next index by 1 // in an attempt to find where the logs match. - followerLogInformation.decrNextIndex(); - updated = true; + if (followerLogInformation.decrNextIndex()) { + updated = true; - log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}", - logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog, - followerLogInformation.getNextIndex()); + log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}", + logName(), followerId, appendEntriesReply.getLogLastTerm(), + followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex()); + } } } @@ -323,14 +329,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1; if (replicatedCount == 0) { - // We don't commit and apply a log entry until we've gotten the ack from our local persistence. Ideally - // we should be able to update the commit index if we get a consensus amongst the followers - // w/o the local persistence ack however this can cause timing issues with snapshot capture - // which may lead to an entry that is neither in the serialized snapshot state nor in the snapshot's - // unapplied entries. This can happen if the lastAppliedIndex is updated but the corresponding - // ApplyState message is still pending in the message queue and thus the corresponding log entry hasn't - // actually been applied to the state yet. This would be alleviated by eliminating the ApplyState - // message in lieu of synchronously updating lastAppliedIndex and applying to state. + // We don't commit and apply a log entry until we've gotten the ack from our local persistence, + // even though there *shouldn't* be any issue with updating the commit index if we get a consensus + // amongst the followers w/o the local persistence ack. break; } @@ -442,7 +443,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { - log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", + log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower", logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); context.getTermInformation().updateAndPersist(rpc.getTerm(), null); @@ -456,8 +457,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendHeartBeat(); scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); } else if (message instanceof SendInstallSnapshot) { - // received from RaftActor - setSnapshot(((SendInstallSnapshot) message).getSnapshot()); + SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message; + setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes())); sendInstallSnapshot(); } else if (message instanceof Replicate) { replicate((Replicate) message); @@ -498,18 +499,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (reply.isSuccess()) { if (installSnapshotState.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply - log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -" - + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId, - context.getReplicatedLog().getSnapshotIndex() + 1); - long followerMatchIndex = snapshot.get().getLastIncludedIndex(); + long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex(); followerLogInformation.setMatchIndex(followerMatchIndex); followerLogInformation.setNextIndex(followerMatchIndex + 1); followerLogInformation.clearLeaderInstallSnapshotState(); - log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", - logName(), followerId, followerLogInformation.getMatchIndex(), - followerLogInformation.getNextIndex()); + log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - " + + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(), + followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); if (!anyFollowersInstallingSnapshot()) { // once there are no pending followers receiving snapshots @@ -528,8 +526,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { installSnapshotState.markSendStatus(true); } } else { - log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", - logName(), reply.getChunkIndex()); + log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply); installSnapshotState.markSendStatus(false); } @@ -611,8 +608,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { /** * This method checks if any update needs to be sent to the given follower. This includes append log entries, * sending next snapshot chunk, and initiating a snapshot. - * - * @return true if any update is sent, false otherwise */ private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation, boolean sendHeartbeat, boolean isHeartbeat) { @@ -662,17 +657,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent - if (log.isDebugEnabled()) { - log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " - + "follower-nextIndex: %d, leader-snapshot-index: %d, " - + "leader-last-index: %d", logName(), followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); - } - // Send heartbeat to follower whenever install snapshot is initiated. sendAppendEntries = true; if (canInstallSnapshot(followerNextIndex)) { + log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader " + + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex, + context.getReplicatedLog().size()); + initiateCaptureSnapshot(followerId); + } else { + // It doesn't seem like we should ever reach here - most likely indicates sonething is + // wrong. + log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, " + + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), + followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex, + context.getReplicatedLog().size()); } } else if (sendHeartbeat) { @@ -737,7 +737,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { */ public boolean initiateCaptureSnapshot(String followerId) { FollowerLogInformation followerLogInfo = followerToLog.get(followerId); - if (snapshot.isPresent()) { + if (snapshotHolder.isPresent()) { // If a snapshot is present in the memory, most likely another install is in progress no need to capture // snapshot. This could happen if another follower needs an install when one is going on. final ActorSelection followerActor = context.getPeerActorSelection(followerId); @@ -745,16 +745,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState. sendSnapshotChunk(followerActor, followerLogInfo); return true; - } else { - boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), - this.getReplicatedToAllIndex(), followerId); - if (captureInitiated) { - followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState( - context.getConfigParams().getSnapshotChunkSize(), logName())); - } + } - return captureInitiated; + boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(), + this.getReplicatedToAllIndex(), followerId); + if (captureInitiated) { + followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState( + context.getConfigParams().getSnapshotChunkSize(), logName())); } + + return captureInitiated; } private boolean canInstallSnapshot(long nextIndex) { @@ -790,7 +790,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * InstallSnapshot should qualify as a heartbeat too. */ private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) { - if (snapshot.isPresent()) { + if (snapshotHolder.isPresent()) { LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState(); if (installSnapshotState == null) { installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(), @@ -798,35 +798,39 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState); } - // Ensure the snapshot bytes are set - this is a no-op. - installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes()); + try { + // Ensure the snapshot bytes are set - this is a no-op. + installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes()); - byte[] nextSnapshotChunk = installSnapshotState.getNextChunk(); + byte[] nextSnapshotChunk = installSnapshotState.getNextChunk(); - log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(), - nextSnapshotChunk.length); + log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(), + nextSnapshotChunk.length); - int nextChunkIndex = installSnapshotState.incrementChunkIndex(); - Optional serverConfig = Optional.absent(); - if (installSnapshotState.isLastChunk(nextChunkIndex)) { - serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); - } - - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - snapshot.get().getLastIncludedIndex(), - snapshot.get().getLastIncludedTerm(), - nextSnapshotChunk, - nextChunkIndex, - installSnapshotState.getTotalChunks(), - Optional.of(installSnapshotState.getLastChunkHashCode()), - serverConfig - ).toSerializable(followerLogInfo.getRaftVersion()), - actor() - ); + int nextChunkIndex = installSnapshotState.incrementChunkIndex(); + Optional serverConfig = Optional.absent(); + if (installSnapshotState.isLastChunk(nextChunkIndex)) { + serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); + } - log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(), - installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks()); + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + snapshotHolder.get().getLastIncludedIndex(), + snapshotHolder.get().getLastIncludedTerm(), + nextSnapshotChunk, + nextChunkIndex, + installSnapshotState.getTotalChunks(), + Optional.of(installSnapshotState.getLastChunkHashCode()), + serverConfig + ).toSerializable(followerLogInfo.getRaftVersion()), + actor() + ); + + log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(), + installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks()); + } catch (IOException e) { + throw Throwables.propagate(e); + } } } @@ -918,15 +922,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.size(); } - private static class SnapshotHolder { + static class SnapshotHolder { private final long lastIncludedTerm; private final long lastIncludedIndex; - private final ByteString snapshotBytes; + private final ByteSource snapshotBytes; - SnapshotHolder(Snapshot snapshot) { + SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) { this.lastIncludedTerm = snapshot.getLastAppliedTerm(); this.lastIncludedIndex = snapshot.getLastAppliedIndex(); - this.snapshotBytes = ByteString.copyFrom(snapshot.getState()); + this.snapshotBytes = snapshotBytes; } long getLastIncludedTerm() { @@ -937,7 +941,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return lastIncludedIndex; } - ByteString getSnapshotBytes() { + ByteSource getSnapshotBytes() { return snapshotBytes; } }