BUG 1623 - Clustering : Parsing Error thrown on startup
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 36c86f542d398f43b58b6236d8095b8f1ddcf4a9..27e8f34d327dd5ebe9e1b1ca8a9d8e6ea1b6ceba 100644 (file)
@@ -22,6 +22,7 @@ import akka.persistence.UntypedPersistentActor;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
+import com.google.common.base.Optional;
 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -80,8 +81,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
     protected final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
-    private static final int SNAPSHOT_ENTRY_COUNT = 100000;
-
     /**
      * The current state determines the current behavior of a RaftActor
      * A Raft Actor always starts off in the Follower State
@@ -101,10 +100,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
+        this(id, peerAddresses, Optional.<ConfigParams>absent());
+    }
+
+    public RaftActor(String id, Map<String, String> peerAddresses,
+         Optional<ConfigParams> configParams) {
+
         context = new RaftActorContextImpl(this.getSelf(),
-            this.getContext(),
-            id, new ElectionTermImpl(),
-            -1, -1, replicatedLog, peerAddresses, LOG);
+            this.getContext(), id, new ElectionTermImpl(),
+            -1, -1, replicatedLog, peerAddresses,
+            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
+            LOG);
     }
 
     @Override public void onReceiveRecover(Object message) {
@@ -142,6 +148,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
                 replicatedLog.snapshotTerm, replicatedLog.size());
             currentBehavior = switchBehavior(RaftState.Follower);
+            onStateChanged();
         }
     }
 
@@ -161,8 +168,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
         } else if (message instanceof FindLeader) {
             getSender().tell(
-                new FindLeaderReply(
-                    context.getPeerAddress(currentBehavior.getLeaderId())),
+                new FindLeaderReply(getLeaderAddress()),
                 getSelf()
             );
 
@@ -176,12 +182,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
             // TODO: Handle failure in saving the snapshot
 
-        } else if (message instanceof FindLeader){
-
-            getSender().tell(new FindLeaderReply(
-                context.getPeerAddress(currentBehavior.getLeaderId())),
-                getSelf());
-
         } else if (message instanceof AddRaftPeer){
 
             // FIXME : Do not add raft peers like this.
@@ -200,7 +200,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
             RaftState state =
                 currentBehavior.handleMessage(getSender(), message);
+            RaftActorBehavior oldBehavior = currentBehavior;
             currentBehavior = switchBehavior(state);
+            if(oldBehavior != currentBehavior){
+                onStateChanged();
+            }
         }
     }
 
@@ -258,20 +262,43 @@ public abstract class RaftActor extends UntypedPersistentActor {
      * @return A reference to the leader if known, null otherwise
      */
     protected ActorSelection getLeader(){
-        String leaderId = currentBehavior.getLeaderId();
-        if (leaderId == null) {
+        String leaderAddress = getLeaderAddress();
+
+        if(leaderAddress == null){
             return null;
         }
-        String peerAddress = context.getPeerAddress(leaderId);
-        LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
-            + peerAddress);
-        return context.actorSelection(peerAddress);
+
+        return context.actorSelection(leaderAddress);
+    }
+
+    /**
+     *
+     * @return the current leader's id
+     */
+    protected String getLeaderId(){
+        return currentBehavior.getLeaderId();
     }
 
     protected RaftState getRaftState() {
         return currentBehavior.state();
     }
 
+    protected ReplicatedLogEntry getLastLogEntry() {
+        return replicatedLog.last();
+    }
+
+    protected Long getCurrentTerm(){
+        return context.getTermInformation().getCurrentTerm();
+    }
+
+    protected Long getCommitIndex(){
+        return context.getCommitIndex();
+    }
+
+    protected Long getLastApplied(){
+        return context.getLastApplied();
+    }
+
     /**
      * setPeerAddress sets the address of a known peer at a later time.
      * <p>
@@ -335,6 +362,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
      */
     protected abstract void applySnapshot(Object snapshot);
 
+    /**
+     * This method will be called by the RaftActor when the state of the
+     * RaftActor changes. The derived actor can then use methods like
+     * isLeader or getLeader to do something useful
+     */
+    protected abstract void onStateChanged();
+
     private RaftActorBehavior switchBehavior(RaftState state) {
         if (currentBehavior != null) {
             if (currentBehavior.state() == state) {
@@ -361,6 +395,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
         } else {
             behavior = new Leader(context);
         }
+
+
+
         return behavior;
     }
 
@@ -369,12 +406,27 @@ public abstract class RaftActor extends UntypedPersistentActor {
         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
         // For now guessing that it is ANDed.
         deleteSnapshots(new SnapshotSelectionCriteria(
-            sequenceNumber - SNAPSHOT_ENTRY_COUNT, 43200000));
+            sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
         // Trim akka journal
         deleteMessages(sequenceNumber);
     }
 
+    private String getLeaderAddress(){
+        if(isLeader()){
+            return getSelf().path().toString();
+        }
+        String leaderId = currentBehavior.getLeaderId();
+        if (leaderId == null) {
+            return null;
+        }
+        String peerAddress = context.getPeerAddress(leaderId);
+        LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
+            + peerAddress);
+
+        return peerAddress;
+    }
+
 
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
@@ -429,7 +481,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
                 new Procedure<ReplicatedLogEntry>() {
                     public void apply(ReplicatedLogEntry evt) throws Exception {
                         // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
-                        if (journal.size() > SNAPSHOT_ENTRY_COUNT) {
+                        if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) {
                             LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
                             long lastAppliedTerm = -1;
@@ -569,7 +621,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
 
         @Override public void update(long currentTerm, String votedFor) {
-            LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+            LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
 
             this.currentTerm = currentTerm;
             this.votedFor = votedFor;