X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=85f2f153ab3046d6541225c95d3863be3435b6b0;hp=8fb66998c71abc8cb45657928822d580f09ab753;hb=c68d251880d95d6d2f8df70c67d2cdd3a3a47685;hpb=a564647b197ef00124b9ae6aa00578dd73e27aa1 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 8fb66998c7..85f2f153ab 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -33,6 +34,7 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -88,11 +90,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { protected final int minIsolatedLeaderPeerCount; - private Optional snapshot; + private Optional snapshot; public AbstractLeader(RaftActorContext context) { super(context, RaftState.Leader); + setLeaderPayloadVersion(context.getPayloadVersion()); + final Builder ftlBuilder = ImmutableMap.builder(); for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = @@ -138,8 +142,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } @VisibleForTesting - void setSnapshot(Optional snapshot) { - this.snapshot = snapshot; + void setSnapshot(@Nullable Snapshot snapshot) { + if(snapshot != null) { + this.snapshot = Optional.of(new SnapshotHolder(snapshot)); + } else { + this.snapshot = Optional.absent(); + } } @Override @@ -325,7 +333,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { context.getTermInformation().updateAndPersist(rpc.getTerm(), null); - return switchBehavior(new Follower(context)); + return internalSwitchBehavior(RaftState.Follower); } } @@ -337,7 +345,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else if(message instanceof SendInstallSnapshot) { // received from RaftActor - setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot())); + setSnapshot(((SendInstallSnapshot) message).getSnapshot()); sendInstallSnapshot(); } else if (message instanceof Replicate) { @@ -380,10 +388,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { ); } - followerLogInformation.setMatchIndex( - context.getReplicatedLog().getSnapshotIndex()); - followerLogInformation.setNextIndex( - context.getReplicatedLog().getSnapshotIndex() + 1); + long followerMatchIndex = snapshot.get().getLastIncludedIndex(); + followerLogInformation.setMatchIndex(followerMatchIndex); + followerLogInformation.setNextIndex(followerMatchIndex + 1); mapFollowerToSnapshot.remove(followerId); LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", @@ -393,7 +400,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if (mapFollowerToSnapshot.isEmpty()) { // once there are no pending followers receiving snapshots // we can remove snapshot from the memory - setSnapshot(Optional.absent()); + setSnapshot(null); } wasLastChunk = true; @@ -444,10 +451,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { logIndex) ); - if (followerToLog.isEmpty()) { + boolean applyModificationToState = followerToLog.isEmpty() + || context.getRaftPolicy().applyModificationToStateBeforeConsensus(); + + if(applyModificationToState){ context.setCommitIndex(logIndex); applyLogToStateMachine(logIndex); - } else { + } + + if (!followerToLog.isEmpty()) { sendAppendEntries(0, false); } } @@ -504,9 +516,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("{}: sendAppendEntries: {} is present for follower {}", logName(), followerNextIndex, followerId); - // FIXME : Sending one entry at a time if(followerLogInformation.okToReplicate()) { - entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + // Try to send all the entries in the journal but not exceeding the max data size + // for a single AppendEntries message. + int maxEntries = (int) context.getReplicatedLog().size(); + entries = context.getReplicatedLog().getFrom(followerNextIndex, maxEntries, + context.getConfigParams().getSnapshotChunkSize()); sendAppendEntries = true; } } else if (isFollowerActive && followerNextIndex >= 0 && @@ -613,7 +628,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { - ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get()); + ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes()); // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. @@ -621,8 +636,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), - context.getReplicatedLog().getSnapshotIndex(), - context.getReplicatedLog().getSnapshotTerm(), + snapshot.get().getLastIncludedIndex(), + snapshot.get().getLastIncludedTerm(), nextSnapshotChunk, followerToSnapshot.incrementChunkIndex(), followerToSnapshot.getTotalChunks(), @@ -866,4 +881,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public int followerLogSize() { return followerToLog.size(); } + + private static class SnapshotHolder { + private final long lastIncludedTerm; + private final long lastIncludedIndex; + private final ByteString snapshotBytes; + + SnapshotHolder(Snapshot snapshot) { + this.lastIncludedTerm = snapshot.getLastAppliedTerm(); + this.lastIncludedIndex = snapshot.getLastAppliedIndex(); + this.snapshotBytes = ByteString.copyFrom(snapshot.getState()); + } + + long getLastIncludedTerm() { + return lastIncludedTerm; + } + + long getLastIncludedIndex() { + return lastIncludedIndex; + } + + ByteString getSnapshotBytes() { + return snapshotBytes; + } + } }