[Fix for Bug 1631]
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 64fa7496042466e58bd51cf0a488c265898866da..66a46ef3bde0ca8a8d1fa8fe1056ccb463a594d6 100644 (file)
@@ -29,9 +29,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
-import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -159,7 +157,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
-        LOG.debug("SnapshotOffer called..");
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("SnapshotOffer called..");
+        }
 
         initRecoveryTimer();
 
@@ -250,7 +250,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
             replicatedLog.snapshotTerm, replicatedLog.size());
 
-        currentBehavior = switchBehavior(RaftState.Follower);
+        currentBehavior = new Follower(context);
         onStateChanged();
     }
 
@@ -355,14 +355,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if (!(message instanceof AppendEntriesMessages.AppendEntries)
                 && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
                 if(LOG.isDebugEnabled()) {
-                    LOG.debug("onReceiveCommand: message:" + message.getClass());
+                    LOG.debug("onReceiveCommand: message: {}", message.getClass());
                 }
             }
 
-            RaftState state =
-                currentBehavior.handleMessage(getSender(), message);
             RaftActorBehavior oldBehavior = currentBehavior;
-            currentBehavior = switchBehavior(state);
+            currentBehavior = currentBehavior.handleMessage(getSender(), message);
+
             if(oldBehavior != currentBehavior){
                 onStateChanged();
             }
@@ -569,38 +568,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
-    private RaftActorBehavior switchBehavior(RaftState state) {
-        if (currentBehavior != null) {
-            if (currentBehavior.state() == state) {
-                return currentBehavior;
-            }
-            LOG.info("Switching from state " + currentBehavior.state() + " to "
-                + state);
-
-            try {
-                currentBehavior.close();
-            } catch (Exception e) {
-                LOG.error(e,
-                    "Failed to close behavior : " + currentBehavior.state());
-            }
-
-        } else {
-            LOG.info("Switching behavior to " + state);
-        }
-        RaftActorBehavior behavior = null;
-        if (state == RaftState.Candidate) {
-            behavior = new Candidate(context);
-        } else if (state == RaftState.Follower) {
-            behavior = new Follower(context);
-        } else {
-            behavior = new Leader(context);
-        }
-
-
-
-        return behavior;
-    }
-
     private void trimPersistentData(long sequenceNumber) {
         // Trim akka snapshots
         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
@@ -622,8 +589,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
         String peerAddress = context.getPeerAddress(leaderId);
         if(LOG.isDebugEnabled()) {
-            LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
-                + peerAddress);
+            LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
+                    leaderId, peerAddress);
         }
 
         return peerAddress;
@@ -697,8 +664,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
         public void appendAndPersist(final ActorRef clientActor,
             final String identifier,
             final ReplicatedLogEntry replicatedLogEntry) {
-            context.getLogger().debug(
-                "Append log entry and persist {} ", replicatedLogEntry);
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+            }
+
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
             journal.add(replicatedLogEntry);