X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=9b6c08857a50d50e81065c52c39c6c5469687ad9;hb=a8cdfe15e97b0ca8f683a2d0aed1b37ab15618e0;hp=68cf5027dff1244d145e93dac28f2680ed9ee20a;hpb=d1daa68ccdb73666524ae3673ee24361530b285b;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 68cf5027df..9b6c08857a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -26,7 +26,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -129,7 +128,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Upon election: send initial empty AppendEntries RPCs // (heartbeat) to each server; repeat during idle periods to // prevent election timeouts (§5.2) - scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS)); + sendAppendEntries(0); } /** @@ -346,6 +345,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.markFollowerActive(); if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + boolean wasLastChunk = false; if (reply.isSuccess()) { if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply @@ -373,6 +373,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // we can remove snapshot from the memory setSnapshot(Optional.absent()); } + wasLastChunk = true; } else { followerToSnapshot.markSendStatus(true); @@ -383,6 +384,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(false); } + + if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { + ActorSelection followerActor = context.getPeerActorSelection(followerId); + if(followerActor != null) { + sendSnapshotChunk(followerActor, followerId); + } + } + } else { LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", context.getId(), reply.getChunkIndex(), followerId, @@ -415,18 +424,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); } else { - sendAppendEntries(); + sendAppendEntries(0); } } - private void sendAppendEntries() { + private void sendAppendEntries(long timeSinceLastActivityInterval) { // Send an AppendEntries to all followers - long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis(); for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); final FollowerLogInformation followerLogInformation = e.getValue(); // This checks helps not to send a repeat message to the follower - if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) { + if(!followerLogInformation.isFollowerActive() || + followerLogInformation.timeSinceLastActivity() >= timeSinceLastActivityInterval) { sendUpdatesToFollower(followerId, followerLogInformation, true); } } @@ -546,7 +555,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // no need to capture snapshot sendSnapshotChunk(followerActor, e.getKey()); - } else { + } else if (!context.isSnapshotCaptureInitiated()) { initiateCaptureSnapshot(); //we just need 1 follower who would need snapshot to be installed. // when we have the snapshot captured, we would again check (in SendInstallSnapshot) @@ -579,6 +588,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated), actor()); + context.setSnapshotCaptureInitiated(true); } @@ -615,8 +625,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), nextSnapshotChunk, - followerToSnapshot.incrementChunkIndex(), - followerToSnapshot.getTotalChunks(), + followerToSnapshot.incrementChunkIndex(), + followerToSnapshot.getTotalChunks(), Optional.of(followerToSnapshot.getLastChunkHashCode()) ).toSerializable(), actor() @@ -627,7 +637,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.getTotalChunks()); } } catch (IOException e) { - LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId()); + LOG.error("{}: InstallSnapshot failed for Leader.", context.getId(), e); } } @@ -650,7 +660,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendHeartBeat() { if (!followerToLog.isEmpty()) { - sendAppendEntries(); + sendAppendEntries(context.getConfigParams().getHeartBeatInterval().toMillis()); } }