X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=f02b52beb9aa2c6aa14bbd5cc71dae725690f817;hb=2b0c99463883b10d5eacdec901d7543d5815a54f;hp=2459c2ff8b1764d3cd3b56be90fc7ea5191d65b6;hpb=1a8ace932ef318601c17e0fa52b0a9dbd02e3005;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 2459c2ff8b..f02b52beb9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -18,6 +18,7 @@ import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; @@ -30,6 +31,8 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; +import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; @@ -388,6 +391,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } public java.util.Set getPeers() { + return context.getPeerAddresses().keySet(); } @@ -636,7 +640,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { //be greedy and remove entries from in-mem journal which are in the snapshot // and update snapshotIndex and snapshotTerm without waiting for the success, - context.getReplicatedLog().snapshotPreCommit(stateInBytes, + context.getReplicatedLog().snapshotPreCommit( captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); @@ -644,16 +648,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { "and term:{}", captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); + if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) { + // this would be call straight to the leader and won't initiate in serialization + currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes)); + } + captureSnapshot = null; hasSnapshotCaptureInitiated = false; } - private class ReplicatedLogImpl extends AbstractReplicatedLogImpl { public ReplicatedLogImpl(Snapshot snapshot) { - super(ByteString.copyFrom(snapshot.getState()), - snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), + super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(), snapshot.getUnAppliedEntries()); } @@ -843,4 +850,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } + @VisibleForTesting + void setCurrentBehavior(AbstractRaftActorBehavior behavior) { + currentBehavior = behavior; + } + + protected RaftActorBehavior getCurrentBehavior() { + return currentBehavior; + } }