Merge "Fix config-manager activator"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 36c86f542d398f43b58b6236d8095b8f1ddcf4a9..0a979d24eef2a26d60cb6168c18d84815037cf18 100644 (file)
@@ -22,6 +22,7 @@ import akka.persistence.UntypedPersistentActor;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
+import com.google.common.base.Optional;
 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -80,8 +81,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
-    private static final int SNAPSHOT_ENTRY_COUNT = 100000;
-
     /**
      * The current state determines the current behavior of a RaftActor
      * A Raft Actor always starts off in the Follower State
@@ -101,10 +100,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
+        this(id, peerAddresses, Optional.<ConfigParams>absent());
+    }
+
+    public RaftActor(String id, Map<String, String> peerAddresses,
+         Optional<ConfigParams> configParams) {
+
         context = new RaftActorContextImpl(this.getSelf(),
-            this.getContext(),
-            id, new ElectionTermImpl(),
-            -1, -1, replicatedLog, peerAddresses, LOG);
+            this.getContext(), id, new ElectionTermImpl(),
+            -1, -1, replicatedLog, peerAddresses,
+            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
+            LOG);
     }
 
     @Override public void onReceiveRecover(Object message) {
@@ -142,6 +148,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
                 replicatedLog.snapshotTerm, replicatedLog.size());
             currentBehavior = switchBehavior(RaftState.Follower);
+            onStateChanged();
         }
     }
 
@@ -200,7 +207,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
+            RaftActorBehavior oldBehavior = currentBehavior;
             currentBehavior = switchBehavior(state);
+            if(oldBehavior != currentBehavior){
+                onStateChanged();
+            }
         }
     }
 
@@ -265,9 +276,21 @@ public abstract class RaftActor extends UntypedPersistentActor {
         String peerAddress = context.getPeerAddress(leaderId);
         LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
             + peerAddress);
+
+        if(peerAddress == null){
+            return null;
+        }
         return context.actorSelection(peerAddress);
     }
 
+    /**
+     *
+     * @return the current leader's id
+     */
+    protected String getLeaderId(){
+        return currentBehavior.getLeaderId();
+    }
+
     protected RaftState getRaftState() {
         return currentBehavior.state();
     }
@@ -335,6 +358,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     protected abstract void applySnapshot(Object snapshot);
 
+    /**
+     * This method will be called by the RaftActor when the state of the
+     * RaftActor changes. The derived actor can then use methods like
+     * isLeader or getLeader to do something useful
+     */
+    protected abstract void onStateChanged();
+
     private RaftActorBehavior switchBehavior(RaftState state) {
         if (currentBehavior != null) {
             if (currentBehavior.state() == state) {
@@ -361,6 +391,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
         } else {
             behavior = new Leader(context);
         }
+
+
+
         return behavior;
     }
 
@@ -369,7 +402,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
         // For now guessing that it is ANDed.
         deleteSnapshots(new SnapshotSelectionCriteria(
-            sequenceNumber - SNAPSHOT_ENTRY_COUNT, 43200000));
+            sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
         // Trim akka journal
         deleteMessages(sequenceNumber);
@@ -429,7 +462,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 new Procedure<ReplicatedLogEntry>() {
                     public void apply(ReplicatedLogEntry evt) throws Exception {
                         // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
-                        if (journal.size() > SNAPSHOT_ENTRY_COUNT) {
+                        if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) {
                             LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;