Bug-2397:Provide a mechanism for stakeholders to get notifications on Raft state...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 2459c2ff8b1764d3cd3b56be90fc7ea5191d65b6..d647475e4d53319e1638c0e4c03fe4dfcd58a891 100644 (file)
@@ -18,11 +18,15 @@ import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -30,19 +34,16 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
-import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
-import java.io.Serializable;
-import java.util.Map;
-
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
  * in a cluster. It implements the RAFT algorithm as described in the paper
@@ -168,8 +169,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
 
                 onRecoveryComplete();
+
+                RaftActorBehavior oldBehavior = currentBehavior;
                 currentBehavior = new Follower(context);
-                onStateChanged();
+                handleBehaviorChange(oldBehavior, currentBehavior);
             }
         }
     }
@@ -268,8 +271,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
             replicatedLog.snapshotTerm, replicatedLog.size());
 
+        RaftActorBehavior oldBehavior = currentBehavior;
         currentBehavior = new Follower(context);
-        onStateChanged();
+        handleBehaviorChange(oldBehavior, currentBehavior);
     }
 
     @Override public void handleCommand(Object message) {
@@ -340,20 +344,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 context.getReplicatedLog().getSnapshotTerm(),
                 context.getReplicatedLog().size());
 
-        } else if (message instanceof AddRaftPeer){
-
-            // FIXME : Do not add raft peers like this.
-            // When adding a new Peer we have to ensure that the a majority of
-            // the peers know about the new Peer. Doing it this way may cause
-            // a situation where multiple Leaders may emerge
-            AddRaftPeer arp = (AddRaftPeer)message;
-           context.addToPeers(arp.getName(), arp.getAddress());
-
-        } else if (message instanceof RemoveRaftPeer){
-
-            RemoveRaftPeer rrp = (RemoveRaftPeer)message;
-            context.removePeer(rrp.getName());
-
         } else if (message instanceof CaptureSnapshot) {
             LOG.info("CaptureSnapshot received by actor");
             CaptureSnapshot cs = (CaptureSnapshot)message;
@@ -379,25 +369,26 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             RaftActorBehavior oldBehavior = currentBehavior;
             currentBehavior = currentBehavior.handleMessage(getSender(), message);
 
-            if(oldBehavior != currentBehavior){
-                onStateChanged();
-            }
-
-            onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
+            handleBehaviorChange(oldBehavior, currentBehavior);
         }
     }
 
-    public java.util.Set<String> getPeers() {
-        return context.getPeerAddresses().keySet();
-    }
+    private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+        if (oldBehavior != currentBehavior){
+            onStateChanged();
+        }
+        if (oldBehavior != null) {
+            // it can happen that the state has not changed but the leader has changed.
+            onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
 
-    protected String getReplicatedLogState() {
-        return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
-            + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
-            + ", im-mem journal size=" + context.getReplicatedLog().size();
+            if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) {
+                // we do not want to notify when the behavior/role is set for the first time (i.e follower)
+                getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(),
+                    currentBehavior.state().name()), getSelf());
+            }
+        }
     }
 
-
     /**
      * When a derived RaftActor needs to persist something it must call
      * persistData.
@@ -590,6 +581,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     protected abstract DataPersistenceProvider persistence();
 
+    /**
+     * Notifier Actor for this RaftActor to notify when a role change happens
+     * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
+     */
+    protected abstract Optional<ActorRef> getRoleChangeNotifier();
+
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
     private void trimPersistentData(long sequenceNumber) {
@@ -636,7 +633,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         //be greedy and remove entries from in-mem journal which are in the snapshot
         // and update snapshotIndex and snapshotTerm without waiting for the success,
 
-        context.getReplicatedLog().snapshotPreCommit(stateInBytes,
+        context.getReplicatedLog().snapshotPreCommit(
             captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
@@ -644,16 +641,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             "and term:{}", captureSnapshot.getLastAppliedIndex(),
             captureSnapshot.getLastAppliedTerm());
 
+        if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
+            // this would be call straight to the leader and won't initiate in serialization
+            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(stateInBytes));
+        }
+
         captureSnapshot = null;
         hasSnapshotCaptureInitiated = false;
     }
 
-
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
         public ReplicatedLogImpl(Snapshot snapshot) {
-            super(ByteString.copyFrom(snapshot.getState()),
-                snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+            super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
                 snapshot.getUnAppliedEntries());
         }
 
@@ -748,9 +748,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     static class DeleteEntries implements Serializable {
+        private static final long serialVersionUID = 1L;
         private final int fromIndex;
 
-
         public DeleteEntries(int fromIndex) {
             this.fromIndex = fromIndex;
         }
@@ -801,6 +801,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     static class UpdateElectionTerm implements Serializable {
+        private static final long serialVersionUID = 1L;
         private final long currentTerm;
         private final String votedFor;
 
@@ -843,4 +844,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
+    @VisibleForTesting
+    void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
+        currentBehavior = behavior;
+    }
+
+    protected RaftActorBehavior getCurrentBehavior() {
+        return currentBehavior;
+    }
+
 }