X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=94c38f6108eabf62ef9e41a3f484a6e2f915129d;hp=8f33d94700bc4a87231c44ab66758bd7c28bf049;hb=6bfcbfc5158130c4255b861cb02dfaa68c52aa63;hpb=05e04699813e6989cdf975cba027d6ff7ec44a35 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 8f33d94700..94c38f6108 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -26,7 +26,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -35,7 +34,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -93,8 +91,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private Optional snapshot; - private long replicatedToAllIndex = -1; - public AbstractLeader(RaftActorContext context) { super(context); @@ -129,7 +125,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Upon election: send initial empty AppendEntries RPCs // (heartbeat) to each server; repeat during idle periods to // prevent election timeouts (§5.2) - scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); + sendAppendEntries(0); } /** @@ -238,15 +234,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } private void purgeInMemoryLog() { - //find the lowest index across followers which has been replicated to all. -1 if there are no followers. + //find the lowest index across followers which has been replicated to all. + // lastApplied if there are no followers, so that we keep clearing the log for single-node // we would delete the in-mem log from that index on, in-order to minimize mem usage // we would also share this info thru AE with the followers so that they can delete their log entries as well. - long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE; + long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE; for (FollowerLogInformation info : followerToLog.values()) { minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex()); } - replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex); + super.performSnapshotWithoutCapture(minReplicatedToAllIndex); } @Override @@ -310,9 +307,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendHeartBeat(); return this; - } else if(message instanceof InitiateInstallSnapshot) { - installSnapshotIfNeeded(); - } else if(message instanceof SendInstallSnapshot) { // received from RaftActor setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); @@ -425,18 +419,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); } else { - sendAppendEntries(); + sendAppendEntries(0); } } - private void sendAppendEntries() { + private void sendAppendEntries(long timeSinceLastActivityInterval) { // Send an AppendEntries to all followers - long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis(); for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); final FollowerLogInformation followerLogInformation = e.getValue(); // This checks helps not to send a repeat message to the follower - if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) { + if(!followerLogInformation.isFollowerActive() || + followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { sendUpdatesToFollower(followerId, followerLogInformation, true); } } @@ -477,7 +471,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex >= followerNextIndex) { + leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) { // if the followers next index is not present in the leaders log, and // if the follower is just not starting and if leader's index is more than followers index // then snapshot should be sent @@ -489,12 +483,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerNextIndex, leaderSnapShotIndex, leaderLastIndex ); } - actor().tell(new InitiateInstallSnapshot(), actor()); // Send heartbeat to follower whenever install snapshot is initiated. sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), Collections.emptyList(), followerId); + initiateCaptureSnapshot(followerId, followerNextIndex); + } else if(sendHeartbeat) { //we send an AppendEntries, even if the follower is inactive // in-order to update the followers timestamp, in case it becomes active again @@ -511,7 +506,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), prevLogIndex(followerNextIndex), prevLogTerm(followerNextIndex), entries, - context.getCommitIndex(), replicatedToAllIndex); + context.getCommitIndex(), super.getReplicatedToAllIndex()); if(!entries.isEmpty()) { LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId, @@ -522,74 +517,55 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } /** - * An installSnapshot is scheduled at a interval that is a multiple of - * a HEARTBEAT_INTERVAL. This is to avoid the need to check for installing - * snapshots at every heartbeat. - * * Install Snapshot works as follows - * 1. Leader sends a InitiateInstallSnapshot message to self - * 2. Leader then initiates the capture snapshot by sending a CaptureSnapshot message to actor - * 3. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log + * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor + * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log * and makes a call to Leader's handleMessage , with SendInstallSnapshot message. - * 4. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower - * 5. On complete, Follower sends back a InstallSnapshotReply. - * 6. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower + * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower + * 4. On complete, Follower sends back a InstallSnapshotReply. + * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower * and replenishes the memory by deleting the snapshot in Replicated log. - * + * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply) + * then send the existing snapshot in chunks to the follower. + * @param followerId + * @param followerNextIndex */ - private void installSnapshotIfNeeded() { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet()); - } - - for (Entry e : followerToLog.entrySet()) { - final ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); - - if (followerActor != null) { - long nextIndex = e.getValue().getNextIndex(); - - if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { - LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey()); - if (snapshot.isPresent()) { - // if a snapshot is present in the memory, most likely another install is in progress - // no need to capture snapshot - sendSnapshotChunk(followerActor, e.getKey()); - - } else if (!context.isSnapshotCaptureInitiated()) { - initiateCaptureSnapshot(); - //we just need 1 follower who would need snapshot to be installed. - // when we have the snapshot captured, we would again check (in SendInstallSnapshot) - // who needs an install and send to all who need - break; - } + private void initiateCaptureSnapshot(String followerId, long followerNextIndex) { + if (!context.getReplicatedLog().isPresent(followerNextIndex) && + context.getReplicatedLog().isInSnapshot(followerNextIndex)) { + if (snapshot.isPresent()) { + // if a snapshot is present in the memory, most likely another install is in progress + // no need to capture snapshot. + // This could happen if another follower needs an install when one is going on. + final ActorSelection followerActor = context.getPeerActorSelection(followerId); + sendSnapshotChunk(followerActor, followerId); + + } else if (!context.isSnapshotCaptureInitiated()) { + + LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId()); + ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); + long lastAppliedIndex = -1; + long lastAppliedTerm = -1; + + if (lastAppliedEntry != null) { + lastAppliedIndex = lastAppliedEntry.getIndex(); + lastAppliedTerm = lastAppliedEntry.getTerm(); + } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { + lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); + lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); } - } - } - } - - // on every install snapshot, we try to capture the snapshot. - // Once a capture is going on, another one issued will get ignored by RaftActor. - private void initiateCaptureSnapshot() { - LOG.info("{}: Initiating Snapshot Capture to Install Snapshot, Leader:{}", context.getId(), getLeaderId()); - ReplicatedLogEntry lastAppliedEntry = context.getReplicatedLog().get(context.getLastApplied()); - long lastAppliedIndex = -1; - long lastAppliedTerm = -1; - if (lastAppliedEntry != null) { - lastAppliedIndex = lastAppliedEntry.getIndex(); - lastAppliedTerm = lastAppliedEntry.getTerm(); - } else if (context.getReplicatedLog().getSnapshotIndex() > -1) { - lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex(); - lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm(); + boolean isInstallSnapshotInitiated = true; + long replicatedToAllIndex = super.getReplicatedToAllIndex(); + ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex); + actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, + (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1), + (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1), + isInstallSnapshotInitiated), actor()); + context.setSnapshotCaptureInitiated(true); + } } - - boolean isInstallSnapshotInitiated = true; - actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), - lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated), - actor()); - context.setSnapshotCaptureInitiated(true); } @@ -626,8 +602,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), nextSnapshotChunk, - followerToSnapshot.incrementChunkIndex(), - followerToSnapshot.getTotalChunks(), + followerToSnapshot.incrementChunkIndex(), + followerToSnapshot.getTotalChunks(), Optional.of(followerToSnapshot.getLastChunkHashCode()) ).toSerializable(), actor() @@ -638,7 +614,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.getTotalChunks()); } } catch (IOException e) { - LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId()); + LOG.error("{}: InstallSnapshot failed for Leader.", context.getId(), e); } } @@ -661,7 +637,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendHeartBeat() { if (!followerToLog.isEmpty()) { - sendAppendEntries(); + sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis()); } }