[Fix for Bug 1631] 65/11965/5
authormark.mozolewski <mark.mozolewski@hp.com>
Tue, 14 Oct 2014 23:05:57 +0000 (16:05 -0700)
committerMark Mozolewski <mark.mozolewski@hp.com>
Thu, 23 Oct 2014 17:51:47 +0000 (17:51 +0000)
- Refactored RaftActorBehavior#handleMessage (and related methods) to return RaftActorBehavior
  instead of RaftActorState.
- Moved behavior switching from RaftActor based on requested state to the implementations of the
  behaviors that should control that switch (Leader/Follower/Candidate) based on the type of
  message received.
- Pull up logger reference for concrete RAFT behaviors to parent for a common LOG reference.
- Updated all logger calls to:
  - Use substitution instead of String concatenation.
- Debug logger calls are gated by isDebugEnabled check.
- Correctly use Akka logging API for stack trace on exception.
- Avoid toString() calls during logging.
- Use assigned logger instance (LOG) instead of call to context.getLogger()

Change-Id: I376e21734a0540aae714fe9ef1562d74e73e558b
Signed-off-by: mark.mozolewski <mark.mozolewski@hp.com>
12 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 04df778..97b912e 100644 (file)
@@ -100,7 +100,7 @@ public class ExampleActor extends RaftActor {
         try {
             bs = fromObject(state);
         } catch (Exception e) {
-            LOG.error("Exception in creating snapshot", e);
+            LOG.error(e, "Exception in creating snapshot");
         }
         getSelf().tell(new CaptureSnapshotReply(bs), null);
     }
@@ -110,10 +110,10 @@ public class ExampleActor extends RaftActor {
         try {
             state.putAll((HashMap) toObject(snapshot));
         } catch (Exception e) {
-           LOG.error("Exception in applying snapshot", e);
+           LOG.error(e, "Exception in applying snapshot");
         }
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
+            LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
         }
     }
 
index 64fa749..66a46ef 100644 (file)
@@ -29,9 +29,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
-import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
-import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -159,7 +157,9 @@ public abstract class RaftActor extends UntypedPersistentActor {
     }
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
-        LOG.debug("SnapshotOffer called..");
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("SnapshotOffer called..");
+        }
 
         initRecoveryTimer();
 
@@ -250,7 +250,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
             replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
             replicatedLog.snapshotTerm, replicatedLog.size());
 
-        currentBehavior = switchBehavior(RaftState.Follower);
+        currentBehavior = new Follower(context);
         onStateChanged();
     }
 
@@ -355,14 +355,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
             if (!(message instanceof AppendEntriesMessages.AppendEntries)
                 && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
                 if(LOG.isDebugEnabled()) {
-                    LOG.debug("onReceiveCommand: message:" + message.getClass());
+                    LOG.debug("onReceiveCommand: message: {}", message.getClass());
                 }
             }
 
-            RaftState state =
-                currentBehavior.handleMessage(getSender(), message);
             RaftActorBehavior oldBehavior = currentBehavior;
-            currentBehavior = switchBehavior(state);
+            currentBehavior = currentBehavior.handleMessage(getSender(), message);
+
             if(oldBehavior != currentBehavior){
                 onStateChanged();
             }
@@ -569,38 +568,6 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
-    private RaftActorBehavior switchBehavior(RaftState state) {
-        if (currentBehavior != null) {
-            if (currentBehavior.state() == state) {
-                return currentBehavior;
-            }
-            LOG.info("Switching from state " + currentBehavior.state() + " to "
-                + state);
-
-            try {
-                currentBehavior.close();
-            } catch (Exception e) {
-                LOG.error(e,
-                    "Failed to close behavior : " + currentBehavior.state());
-            }
-
-        } else {
-            LOG.info("Switching behavior to " + state);
-        }
-        RaftActorBehavior behavior = null;
-        if (state == RaftState.Candidate) {
-            behavior = new Candidate(context);
-        } else if (state == RaftState.Follower) {
-            behavior = new Follower(context);
-        } else {
-            behavior = new Leader(context);
-        }
-
-
-
-        return behavior;
-    }
-
     private void trimPersistentData(long sequenceNumber) {
         // Trim akka snapshots
         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
@@ -622,8 +589,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
         String peerAddress = context.getPeerAddress(leaderId);
         if(LOG.isDebugEnabled()) {
-            LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
-                + peerAddress);
+            LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}",
+                    leaderId, peerAddress);
         }
 
         return peerAddress;
@@ -697,8 +664,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
         public void appendAndPersist(final ActorRef clientActor,
             final String identifier,
             final ReplicatedLogEntry replicatedLogEntry) {
-            context.getLogger().debug(
-                "Append log entry and persist {} ", replicatedLogEntry);
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Append log entry and persist {} ", replicatedLogEntry);
+            }
+
             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
             journal.add(replicatedLogEntry);
 
index b1560a5..eed74bb 100644 (file)
@@ -10,9 +10,9 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
+import akka.event.LoggingAdapter;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -44,6 +44,11 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      */
     protected final RaftActorContext context;
 
+    /**
+     *
+     */
+    protected final LoggingAdapter LOG;
+
     /**
      *
      */
@@ -57,6 +62,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
     protected AbstractRaftActorBehavior(RaftActorContext context) {
         this.context = context;
+        this.LOG = context.getLogger();
     }
 
     /**
@@ -71,7 +77,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param appendEntries  The AppendEntries message
      * @return
      */
-    protected abstract RaftState handleAppendEntries(ActorRef sender,
+    protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries);
 
 
@@ -83,19 +89,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param appendEntries
      * @return
      */
-    protected RaftState appendEntries(ActorRef sender,
+    protected RaftActorBehavior appendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
         // 1. Reply false if term < currentTerm (§5.1)
         if (appendEntries.getTerm() < currentTerm()) {
-            context.getLogger().debug(
-                "Cannot append entries because sender term " + appendEntries
-                    .getTerm() + " is less than " + currentTerm());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Cannot append entries because sender term {} is less than {}",
+                        appendEntries.getTerm(), currentTerm());
+            }
+
             sender.tell(
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
             );
-            return state();
+            return this;
         }
 
 
@@ -114,7 +122,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param appendEntriesReply The AppendEntriesReply message
      * @return
      */
-    protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
+    protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply);
 
     /**
@@ -125,11 +133,12 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param requestVote
      * @return
      */
-    protected RaftState requestVote(ActorRef sender,
+    protected RaftActorBehavior requestVote(ActorRef sender,
         RequestVote requestVote) {
 
-
-        context.getLogger().debug(requestVote.toString());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(requestVote.toString());
+        }
 
         boolean grantVote = false;
 
@@ -167,7 +176,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
         sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
 
-        return state();
+        return this;
     }
 
     /**
@@ -182,7 +191,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
      * @param requestVoteReply The RequestVoteReply message
      * @return
      */
-    protected abstract RaftState handleRequestVoteReply(ActorRef sender,
+    protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply);
 
     /**
@@ -341,12 +350,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             } else {
                 //if one index is not present in the log, no point in looping
                 // around as the rest wont be present either
-                context.getLogger().warning(
-                    "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index );
+                LOG.warning(
+                        "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index);
                 break;
             }
         }
-        context.getLogger().debug("Setting last applied to {}", newLastApplied);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Setting last applied to {}", newLastApplied);
+        }
         context.setLastApplied(newLastApplied);
 
         // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
@@ -361,7 +372,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     @Override
-    public RaftState handleMessage(ActorRef sender, Object message) {
+    public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
         if (message instanceof AppendEntries) {
             return appendEntries(sender, (AppendEntries) message);
         } else if (message instanceof AppendEntriesReply) {
@@ -371,10 +382,21 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         } else if (message instanceof RequestVoteReply) {
             return handleRequestVoteReply(sender, (RequestVoteReply) message);
         }
-        return state();
+        return this;
     }
 
     @Override public String getLeaderId() {
         return leaderId;
     }
+
+    protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
+        LOG.info("Switching from behavior {} to {}", this.state(), behavior.state());
+        try {
+            close();
+        } catch (Exception e) {
+            LOG.error(e, "Failed to close behavior : {}", this.state());
+        }
+
+        return behavior;
+    }
 }
index bb1927e..4a3e2c5 100644 (file)
@@ -52,7 +52,9 @@ public class Candidate extends AbstractRaftActorBehavior {
 
         peers = context.getPeerAddresses().keySet();
 
-        context.getLogger().debug("Election:Candidate has following peers:"+ peers);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Election:Candidate has following peers: {}", peers);
+        }
 
         if(peers.size() > 0) {
             // Votes are required from a majority of the peers including self.
@@ -78,21 +80,23 @@ public class Candidate extends AbstractRaftActorBehavior {
         scheduleElection(electionDuration());
     }
 
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
-        context.getLogger().debug(appendEntries.toString());
+        if(LOG.isDebugEnabled()) {
+            LOG.debug(appendEntries.toString());
+        }
 
-        return state();
+        return this;
     }
 
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply) {
 
-        return state();
+        return this;
     }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply) {
 
         if (requestVoteReply.isVoteGranted()) {
@@ -100,10 +104,10 @@ public class Candidate extends AbstractRaftActorBehavior {
         }
 
         if (voteCount >= votesRequired) {
-            return RaftState.Leader;
+            return switchBehavior(new Leader(context));
         }
 
-        return state();
+        return this;
     }
 
     @Override public RaftState state() {
@@ -111,7 +115,7 @@ public class Candidate extends AbstractRaftActorBehavior {
     }
 
     @Override
-    public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+    public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
 
         Object message = fromSerializableMessage(originalMessage);
 
@@ -119,14 +123,17 @@ public class Candidate extends AbstractRaftActorBehavior {
 
             RaftRPC rpc = (RaftRPC) message;
 
-            context.getLogger().debug("RaftRPC message received {} my term is {}", rpc.toString(), context.getTermInformation().getCurrentTerm());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("RaftRPC message received {} my term is {}", rpc, context.getTermInformation().getCurrentTerm());
+            }
 
             // If RPC request or response contains term T > currentTerm:
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-                return RaftState.Follower;
+
+                return switchBehavior(new Follower(context));
             }
         }
 
@@ -137,11 +144,12 @@ public class Candidate extends AbstractRaftActorBehavior {
                 // ourselves the leader. This gives enough time for a leader
                 // who we do not know about (as a peer)
                 // to send a message to the candidate
-                return RaftState.Leader;
+
+                return switchBehavior(new Leader(context));
             }
             startNewTerm();
             scheduleElection(electionDuration());
-            return state();
+            return this;
         }
 
         return super.handleMessage(sender, message);
@@ -159,7 +167,9 @@ public class Candidate extends AbstractRaftActorBehavior {
         context.getTermInformation().updateAndPersist(currentTerm + 1,
             context.getId());
 
-        context.getLogger().debug("Starting new term " + (currentTerm + 1));
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Starting new term {}", (currentTerm + 1));
+        }
 
         // Request for a vote
         // TODO: Retry request for vote if replies do not arrive in a reasonable
index 1cfdf9d..7ada8b3 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
-import akka.event.LoggingAdapter;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -39,17 +38,13 @@ import java.util.ArrayList;
 public class Follower extends AbstractRaftActorBehavior {
     private ByteString snapshotChunksCollected = ByteString.EMPTY;
 
-    private final LoggingAdapter LOG;
-
     public Follower(RaftActorContext context) {
         super(context);
 
-        LOG = context.getLogger();
-
         scheduleElection(electionDuration());
     }
 
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
         if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
@@ -133,15 +128,14 @@ public class Follower extends AbstractRaftActorBehavior {
                 new AppendEntriesReply(context.getId(), currentTerm(), false,
                     lastIndex(), lastTerm()), actor()
             );
-            return state();
+            return this;
         }
 
         if (appendEntries.getEntries() != null
             && appendEntries.getEntries().size() > 0) {
             if(LOG.isDebugEnabled()) {
                 LOG.debug(
-                    "Number of entries to be appended = " + appendEntries
-                        .getEntries().size()
+                    "Number of entries to be appended = {}", appendEntries.getEntries().size()
                 );
             }
 
@@ -168,8 +162,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
                     if(LOG.isDebugEnabled()) {
                         LOG.debug(
-                            "Removing entries from log starting at "
-                                + matchEntry.getIndex()
+                            "Removing entries from log starting at {}", matchEntry.getIndex()
                         );
                     }
 
@@ -181,9 +174,7 @@ public class Follower extends AbstractRaftActorBehavior {
             }
 
             if(LOG.isDebugEnabled()) {
-                context.getLogger().debug(
-                    "After cleanup entries to be added from = " + (addEntriesFrom
-                        + lastIndex())
+                LOG.debug("After cleanup entries to be added from = {}", (addEntriesFrom + lastIndex())
                 );
             }
 
@@ -191,17 +182,14 @@ public class Follower extends AbstractRaftActorBehavior {
             for (int i = addEntriesFrom;
                  i < appendEntries.getEntries().size(); i++) {
 
-                context.getLogger().info(
-                    "Append entry to log " + appendEntries.getEntries().get(
-                        i).getData()
-                        .toString()
-                );
-                context.getReplicatedLog()
-                    .appendAndPersist(appendEntries.getEntries().get(i));
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Append entry to log {}", appendEntries.getEntries().get(i).getData());
+                }
+                context.getReplicatedLog().appendAndPersist(appendEntries.getEntries().get(i));
             }
 
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Log size is now " + context.getReplicatedLog().size());
+                LOG.debug("Log size is now {}", context.getReplicatedLog().size());
             }
         }
 
@@ -216,7 +204,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if (prevCommitIndex != context.getCommitIndex()) {
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Commit index set to " + context.getCommitIndex());
+                LOG.debug("Commit index set to {}", context.getCommitIndex());
             }
         }
 
@@ -239,24 +227,24 @@ public class Follower extends AbstractRaftActorBehavior {
         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), true,
             lastIndex(), lastTerm()), actor());
 
-        return state();
+        return this;
     }
 
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply) {
-        return state();
+        return this;
     }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply) {
-        return state();
+        return this;
     }
 
     @Override public RaftState state() {
         return RaftState.Follower;
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+    @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
 
         Object message = fromSerializableMessage(originalMessage);
 
@@ -271,7 +259,7 @@ public class Follower extends AbstractRaftActorBehavior {
         }
 
         if (message instanceof ElectionTimeout) {
-            return RaftState.Candidate;
+            return switchBehavior(new Candidate(context));
 
         } else if (message instanceof InstallSnapshot) {
             InstallSnapshot installSnapshot = (InstallSnapshot) message;
@@ -297,8 +285,10 @@ public class Follower extends AbstractRaftActorBehavior {
                 // this is the last chunk, create a snapshot object and apply
 
                 snapshotChunksCollected = snapshotChunksCollected.concat(installSnapshot.getData());
-                context.getLogger().debug("Last chunk received: snapshotChunksCollected.size:{}",
-                    snapshotChunksCollected.size());
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Last chunk received: snapshotChunksCollected.size:{}",
+                            snapshotChunksCollected.size());
+                }
 
                 Snapshot snapshot = Snapshot.create(snapshotChunksCollected.toByteArray(),
                     new ArrayList<ReplicatedLogEntry>(),
@@ -324,7 +314,7 @@ public class Follower extends AbstractRaftActorBehavior {
                 true), actor());
 
         } catch (Exception e) {
-            context.getLogger().error("Exception in InstallSnapshot of follower", e);
+            LOG.error(e, "Exception in InstallSnapshot of follower:");
             //send reply with success as false. The chunk will be sent again on failure
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
                 installSnapshot.getChunkIndex(), false), actor());
index ff8a225..9edba85 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
-import akka.event.LoggingAdapter;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
@@ -81,13 +80,9 @@ public class Leader extends AbstractRaftActorBehavior {
 
     private final int minReplicationCount;
 
-    private final LoggingAdapter LOG;
-
     public Leader(RaftActorContext context) {
         super(context);
 
-        LOG = context.getLogger();
-
         followers = context.getPeerAddresses().keySet();
 
         for (String followerId : followers) {
@@ -100,7 +95,7 @@ public class Leader extends AbstractRaftActorBehavior {
         }
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Election:Leader has following peers:" + followers);
+            LOG.debug("Election:Leader has following peers: {}", followers);
         }
 
         if (followers.size() > 0) {
@@ -123,17 +118,17 @@ public class Leader extends AbstractRaftActorBehavior {
 
     }
 
-    @Override protected RaftState handleAppendEntries(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
         if(LOG.isDebugEnabled()) {
             LOG.debug(appendEntries.toString());
         }
 
-        return state();
+        return this;
     }
 
-    @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
         AppendEntriesReply appendEntriesReply) {
 
         if(! appendEntriesReply.isSuccess()) {
@@ -149,7 +144,7 @@ public class Leader extends AbstractRaftActorBehavior {
 
         if(followerLogInformation == null){
             LOG.error("Unknown follower {}", followerId);
-            return state();
+            return this;
         }
 
         if (appendEntriesReply.isSuccess()) {
@@ -199,7 +194,7 @@ public class Leader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
-        return state();
+        return this;
     }
 
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
@@ -222,16 +217,16 @@ public class Leader extends AbstractRaftActorBehavior {
         return null;
     }
 
-    @Override protected RaftState handleRequestVoteReply(ActorRef sender,
+    @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
         RequestVoteReply requestVoteReply) {
-        return state();
+        return this;
     }
 
     @Override public RaftState state() {
         return RaftState.Leader;
     }
 
-    @Override public RaftState handleMessage(ActorRef sender, Object originalMessage) {
+    @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
         Object message = fromSerializableMessage(originalMessage);
@@ -243,13 +238,15 @@ public class Leader extends AbstractRaftActorBehavior {
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-                return RaftState.Follower;
+
+                return switchBehavior(new Follower(context));
             }
         }
 
         try {
             if (message instanceof SendHeartBeat) {
-                return sendHeartBeat();
+                sendHeartBeat();
+                return this;
             } else if(message instanceof SendInstallSnapshot) {
                 installSnapshotIfNeeded();
             } else if (message instanceof Replicate) {
@@ -321,7 +318,7 @@ public class Leader extends AbstractRaftActorBehavior {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Replicate message " + logIndex);
+            LOG.debug("Replicate message {}", logIndex);
         }
 
         // Create a tracker entry we will use this later to notify the
@@ -445,7 +442,7 @@ public class Leader extends AbstractRaftActorBehavior {
                 followerActor.path(), mapFollowerToSnapshot.get(followerId).getChunkIndex(),
                 mapFollowerToSnapshot.get(followerId).getTotalChunks());
         } catch (IOException e) {
-            LOG.error("InstallSnapshot failed for Leader.", e);
+            LOG.error(e, "InstallSnapshot failed for Leader.");
         }
     }
 
@@ -467,11 +464,10 @@ public class Leader extends AbstractRaftActorBehavior {
         return nextChunk;
     }
 
-    private RaftState sendHeartBeat() {
+    private void sendHeartBeat() {
         if (followers.size() > 0) {
             sendAppendEntries();
         }
-        return state();
     }
 
     private void stopHeartBeat() {
index ca2d916..064cd8b 100644 (file)
@@ -25,17 +25,18 @@ import org.opendaylight.controller.cluster.raft.RaftState;
  * differently.
  */
 public interface RaftActorBehavior extends AutoCloseable{
+
     /**
      * Handle a message. If the processing of the message warrants a state
-     * change then a new state should be returned otherwise this method should
-     * return the state for the current behavior.
+     * change then a new behavior should be returned otherwise this method should
+     * return the current behavior.
      *
      * @param sender The sender of the message
      * @param message A message that needs to be processed
      *
-     * @return The new state or self (this)
+     * @return The new behavior or current behavior
      */
-    RaftState handleMessage(ActorRef sender, Object message);
+    RaftActorBehavior handleMessage(ActorRef sender, Object message);
 
     /**
      * The state associated with a given behavior
index 22f3743..c15c919 100644 (file)
@@ -181,7 +181,7 @@ public class RaftActorTest extends AbstractActorTest {
                         return true;
                     }
                 }.from(raftActor.path().toString())
-                    .message("Switching from state Candidate to Leader")
+                    .message("Switching from behavior Candidate to Leader")
                     .occurrences(1).exec();
 
 
index 8068dfb..3893018 100644 (file)
@@ -7,7 +7,6 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.AbstractActorTest;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
@@ -22,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
 
@@ -79,12 +79,12 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
             RaftActorBehavior behavior = createBehavior(context);
 
             // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 behavior.handleMessage(getRef(), appendEntries);
 
-            assertEquals(expected, raftState);
+            assertEquals(expected, raftBehavior);
 
             // Also expect an AppendEntriesReply to be sent where success is false
             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
@@ -145,12 +145,12 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
                 }
 
                 // Send an unknown message so that the state of the RaftActor remains unchanged
-                RaftState expected = behavior.handleMessage(getRef(), "unknown");
+                RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-                RaftState raftState =
+                RaftActorBehavior raftBehavior =
                     behavior.handleMessage(getRef(), appendEntries);
 
-                assertEquals(expected, raftState);
+                assertEquals(expected, raftBehavior);
 
                 assertEquals(1, log.size());
 
@@ -174,11 +174,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
                     RaftActorBehavior behavior = createBehavior(
                         createActorContext(behaviorActor));
 
-                    RaftState raftState = behavior.handleMessage(getTestActor(),
+                    RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(),
                         new RequestVote(1000, "test", 10000, 999));
 
-                    if(behavior.state() != RaftState.Follower){
-                        assertEquals(RaftState.Follower, raftState);
+                    if(!(behavior instanceof Follower)){
+                        assertTrue(raftBehavior instanceof Follower);
                     } else {
 
                         final Boolean out =
@@ -228,11 +228,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
 
                     RaftActorBehavior behavior = createBehavior(actorContext);
 
-                    RaftState raftState = behavior.handleMessage(getTestActor(),
+                    RaftActorBehavior raftBehavior = behavior.handleMessage(getTestActor(),
                         new RequestVote(1000, "test", 10000, 999));
 
-                    if(behavior.state() != RaftState.Follower){
-                        assertEquals(RaftState.Follower, raftState);
+                    if(!(behavior instanceof Follower)){
+                        assertTrue(raftBehavior instanceof Follower);
                     } else {
                         final Boolean out =
                             new ExpectMsg<Boolean>(duration("1 seconds"),
@@ -309,10 +309,10 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         setLastLogEntry(
             (MockRaftActorContext) actorContext, 0, 0, p);
 
-        RaftState raftState = createBehavior(actorContext)
+        RaftActorBehavior raftBehavior = createBehavior(actorContext)
             .handleMessage(actorRef, rpc);
 
-        assertEquals(RaftState.Follower, raftState);
+        assertTrue(raftBehavior instanceof Follower);
     }
 
     protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
index d478b17..a8d47e2 100644 (file)
@@ -9,7 +9,6 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -109,10 +108,10 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         Candidate candidate =
             new Candidate(raftActorContext);
 
-        RaftState raftState =
+        RaftActorBehavior raftBehavior =
             candidate.handleMessage(candidateActor, new ElectionTimeout());
 
-        Assert.assertEquals(RaftState.Leader, raftState);
+        Assert.assertTrue(raftBehavior instanceof Leader);
     }
 
     @Test
@@ -123,10 +122,10 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         Candidate candidate =
             new Candidate(raftActorContext);
 
-        RaftState raftState =
+        RaftActorBehavior raftBehavior =
             candidate.handleMessage(candidateActor, new ElectionTimeout());
 
-        Assert.assertEquals(RaftState.Candidate, raftState);
+        Assert.assertTrue(raftBehavior instanceof Candidate);
     }
 
     @Test
@@ -137,9 +136,9 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         Candidate candidate =
             new Candidate(raftActorContext);
 
-        RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+        RaftActorBehavior behaviorOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
 
-        Assert.assertEquals(RaftState.Leader, stateOnFirstVote);
+        Assert.assertTrue(behaviorOnFirstVote instanceof Leader);
 
     }
 
@@ -151,12 +150,12 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         Candidate candidate =
             new Candidate(raftActorContext);
 
-        RaftState stateOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
+        RaftActorBehavior behaviorOnFirstVote = candidate.handleMessage(peerActor1, new RequestVoteReply(0, true));
 
-        RaftState stateOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true));
+        RaftActorBehavior behaviorOnSecondVote = candidate.handleMessage(peerActor2, new RequestVoteReply(0, true));
 
-        Assert.assertEquals(RaftState.Candidate, stateOnFirstVote);
-        Assert.assertEquals(RaftState.Leader, stateOnSecondVote);
+        Assert.assertTrue(behaviorOnFirstVote instanceof Candidate);
+        Assert.assertTrue(behaviorOnSecondVote instanceof Leader);
 
     }
 
index a72a7c4..edeab11 100644 (file)
@@ -9,7 +9,6 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -85,10 +84,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         Follower follower =
             new Follower(raftActorContext);
 
-        RaftState raftState =
+        RaftActorBehavior raftBehavior =
             follower.handleMessage(followerActor, new ElectionTimeout());
 
-        Assert.assertEquals(RaftState.Candidate, raftState);
+        Assert.assertTrue(raftBehavior instanceof Candidate);
     }
 
     @Test
@@ -187,7 +186,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             AppendEntries appendEntries =
                 new AppendEntries(2, "leader-1", 100, 1, entries, 101);
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 createBehavior(context).handleMessage(getRef(), appendEntries);
 
             assertEquals(101L, context.getLastApplied());
@@ -226,12 +225,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             RaftActorBehavior behavior = createBehavior(context);
 
             // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 behavior.handleMessage(getRef(), appendEntries);
 
-            assertEquals(expected, raftState);
+            assertEquals(expected, raftBehavior);
 
             // Also expect an AppendEntriesReply to be sent where success is false
             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
@@ -302,12 +301,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             RaftActorBehavior behavior = createBehavior(context);
 
             // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 behavior.handleMessage(getRef(), appendEntries);
 
-            assertEquals(expected, raftState);
+            assertEquals(expected, raftBehavior);
             assertEquals(5, log.last().getIndex() + 1);
             assertNotNull(log.get(3));
             assertNotNull(log.get(4));
@@ -382,12 +381,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             RaftActorBehavior behavior = createBehavior(context);
 
             // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftState expected = behavior.handleMessage(getRef(), "unknown");
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
 
-            RaftState raftState =
+            RaftActorBehavior raftBehavior =
                 behavior.handleMessage(getRef(), appendEntries);
 
-            assertEquals(expected, raftState);
+            assertEquals(expected, raftBehavior);
 
             // The entry at index 2 will be found out-of-sync with the leader
             // and will be removed
index 19af647..48543d7 100644 (file)
@@ -12,7 +12,6 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
-import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -54,8 +53,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             // handle message should return the Leader state when it receives an
             // unknown message
-            RaftState state = leader.handleMessage(senderActor, "foo");
-            Assert.assertEquals(RaftState.Leader, state);
+            RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
+            Assert.assertTrue(behavior instanceof Leader);
         }};
     }
 
@@ -125,7 +124,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
-                    RaftState raftState = leader
+                    RaftActorBehavior raftBehavior = leader
                         .handleMessage(senderActor, new Replicate(null, null,
                             new MockRaftActorContext.MockReplicatedLogEntry(1,
                                 100,
@@ -133,7 +132,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         ));
 
                     // State should not change
-                    assertEquals(RaftState.Leader, raftState);
+                    assertTrue(raftBehavior instanceof Leader);
 
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
@@ -179,11 +178,11 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                             .build());
 
                     Leader leader = new Leader(actorContext);
-                    RaftState raftState = leader
+                    RaftActorBehavior raftBehavior = leader
                         .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
 
                     // State should not change
-                    assertEquals(RaftState.Leader, raftState);
+                    assertTrue(raftBehavior instanceof Leader);
 
                     assertEquals(1, actorContext.getCommitIndex());
 
@@ -258,10 +257,10 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                             new MockRaftActorContext.MockPayload("D"));
 
                     // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
-                    RaftState raftState = leader.handleMessage(
+                    RaftActorBehavior raftBehavior = leader.handleMessage(
                         senderActor, new Replicate(null, "state-id", entry));
 
-                    assertEquals(RaftState.Leader, raftState);
+                    assertTrue(raftBehavior instanceof Leader);
 
                     // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
                     Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
@@ -333,9 +332,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
                             new MockRaftActorContext.MockPayload("D"));
 
-                    RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
+                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot());
 
-                    assertEquals(RaftState.Leader, raftState);
+                    assertTrue(raftBehavior instanceof Leader);
 
                     // check if installsnapshot gets called with the correct values.
                     final String out =
@@ -419,11 +418,11 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     //clears leaders log
                     actorContext.getReplicatedLog().removeFrom(0);
 
-                    RaftState raftState = leader.handleMessage(senderActor,
+                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
                         new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
                             leader.getFollowerToSnapshot().getChunkIndex(), true));
 
-                    assertEquals(RaftState.Leader, raftState);
+                    assertTrue(raftBehavior instanceof Leader);
 
                     assertEquals(leader.mapFollowerToSnapshot.size(), 0);
                     assertEquals(leader.followerToLog.size(), 1);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.