Remove DelegatingRaftActorBehavior
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index ec1642ec2aa2329c45e84b32a891bdb3533cb570..33ed3357d85eab27441c7eafdd6d494974d667eb 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@ -37,26 +38,33 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
  * </ul>
  */
 public class Follower extends AbstractRaftActorBehavior {
-
-    private SnapshotTracker snapshotTracker = null;
+    private static final int SYNC_THRESHOLD = 10;
 
     private final SyncStatusTracker initialSyncStatusTracker;
 
-    private static final int SYNC_THRESHOLD = 10;
+    private final Procedure<ReplicatedLogEntry> appendAndPersistCallback = new Procedure<ReplicatedLogEntry>() {
+        @Override
+        public void apply(ReplicatedLogEntry logEntry) {
+            context.getReplicatedLog().captureSnapshotIfReady(logEntry);
+        }
+    };
+
+    private SnapshotTracker snapshotTracker = null;
 
     public Follower(RaftActorContext context) {
-        this(context, null);
+        this(context, null, (short)-1);
     }
 
-    public Follower(RaftActorContext context, String initialLeaderId) {
+    public Follower(RaftActorContext context, String initialLeaderId, short initialLeaderPayloadVersion) {
         super(context, RaftState.Follower);
         leaderId = initialLeaderId;
+        setLeaderPayloadVersion(initialLeaderPayloadVersion);
 
         initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
 
         if(canStartElection()) {
             if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
-                actor().tell(ELECTION_TIMEOUT, actor());
+                actor().tell(ElectionTimeout.INSTANCE, actor());
             } else {
                 scheduleElection(electionDuration());
             }
@@ -194,7 +202,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
                 LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
 
-                context.getReplicatedLog().appendAndPersist(entry);
+                context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback);
 
                 if(entry.getData() instanceof ServerConfigurationPayload) {
                     context.updatePeerIds((ServerConfigurationPayload)entry.getData());
@@ -353,9 +361,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
 
-        LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
-                    logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
-                    installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+        LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
 
         leaderId = installSnapshot.getLeaderId();