BUG-5626: do not allow overriding of RaftActor.handleCommand() 58/36958/18
authorRobert Varga <rovarga@cisco.com>
Thu, 31 Mar 2016 14:03:46 +0000 (16:03 +0200)
committerRobert Varga <rovarga@cisco.com>
Tue, 12 Apr 2016 11:34:14 +0000 (13:34 +0200)
Our class hierarchy does not reflect what a typical actor would do
and allows subclasses to override base message processing. This has
the potential to break the RAFT implementation.

Expose a new method, handleNonRaftCommand(), which subclasses can use
to plug in their message processing. This method is invoked after
RAFT message processing has completed.

Change-Id: I8ce2e44db2169e7b1ab55d5f9b6611a187ae1f84
Signed-off-by: Robert Varga <rovarga@cisco.com>
13 files changed:
opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeaderElectionScenarioTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 0a099885e4df083d5f2dff95faf5882de0b7a23c..87dede1086901d3c930be777d3d0fbd8642baa07 100644 (file)
@@ -58,7 +58,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    protected void handleCommand(Object message) {
+    protected void handleNonRaftCommand(Object message) {
         if(message instanceof KeyValue){
             if(isLeader()) {
                 String persistId = Long.toString(persistIdentifier++);
@@ -90,7 +90,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
             }
 
         } else {
-            super.handleCommand(message);
+            super.handleNonRaftCommand(message);
         }
     }
 
index 1c1f9114ae89e655a50bd5beb9a5885bc80609df..47c8db6006544b13c11b42f7abdb370590fd3b8b 100644 (file)
@@ -205,7 +205,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         handleBehaviorChange(state, newBehavior);
     }
 
+    /**
+     * Method exposed for subclasses to plug-in their logic. This method is invoked by {@link #handleCommand(Object)}
+     * for messages which are not handled by this class. Subclasses overriding this class should fall back to this
+     * implementation for messages which they do not handle
+     *
+     * @param message Incoming command message
+     */
+    protected void handleNonRaftCommand(final Object message) {
+        unhandled(message);
+    }
+
+    /**
+     * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override
+     * {@link #handleNonRaftCommand(Object)} instead.
+     */
+    @Deprecated
     @Override
+    // FIXME: make this method final once our unit tests do not need to override it
     protected void handleCommand(final Object message) {
         if (serverConfigurationSupport.handleMessage(message, getSender())) {
             return;
@@ -273,8 +290,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             // Processing the message may affect the state, hence we need to capture it
             final RaftActorBehavior currentBehavior = getCurrentBehavior();
             final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
+
+            // A behavior indicates that it processed the change by returning a reference to the next behavior
+            // to be used. A null return indicates it has not processed the message and we should be passing it to
+            // the subclass for handling.
             final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
-            switchBehavior(state, nextBehavior);
+            if (nextBehavior != null) {
+                switchBehavior(state, nextBehavior);
+            } else {
+                handleNonRaftCommand(message);
+            }
         }
     }
 
index 6065103b13dbd4d54d1e1ffb9352e9c2b072952a..d914154f8bb5565c64836f7f97282a7390aa37e4 100644 (file)
@@ -380,23 +380,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             beforeSendHeartbeat();
             sendHeartBeat();
             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
-            return this;
-
         } else if(message instanceof SendInstallSnapshot) {
             // received from RaftActor
             setSnapshot(((SendInstallSnapshot) message).getSnapshot());
             sendInstallSnapshot();
-
         } else if (message instanceof Replicate) {
             replicate((Replicate) message);
-
-        } else if (message instanceof InstallSnapshotReply){
+        } else if (message instanceof InstallSnapshotReply) {
             handleInstallSnapshotReply((InstallSnapshotReply) message);
-
+        } else {
+            return super.handleMessage(sender, message);
         }
 
-
-        return super.handleMessage(sender, message);
+        return this;
     }
 
     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
index 0210dc4b1d0ed703a7e76fb8651f97b908563629..ab09ba3967955bad54139dd92c7c6bc977714d2c 100644 (file)
@@ -417,8 +417,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
             return requestVote(sender, (RequestVote) message);
         } else if (message instanceof RequestVoteReply) {
             return handleRequestVoteReply(sender, (RequestVoteReply) message);
+        } else {
+            return null;
         }
-        return this;
     }
 
     @Override
index 2480a0db8f734bc6d1d0801b1b57e92e73769189..7e6654d5d6b9849b106a36bf56c76928798d912d 100644 (file)
@@ -357,29 +357,34 @@ public class Follower extends AbstractRaftActorBehavior {
         }
 
         final Object message = fromSerializableMessage(originalMessage);
-        if (message instanceof RaftRPC) {
-            RaftRPC rpc = (RaftRPC) message;
-            // If RPC request or response contains term T > currentTerm:
-            // set currentTerm = T, convert to follower (§5.1)
-            // This applies to all RPC messages and responses
-            if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
-                        logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
-
-                context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-            }
+        if (!(message instanceof RaftRPC)) {
+            // The rest of the processing requires the message to be a RaftRPC
+            return null;
+        }
+
+        final RaftRPC rpc = (RaftRPC) message;
+        // If RPC request or response contains term T > currentTerm:
+        // set currentTerm = T, convert to follower (§5.1)
+        // This applies to all RPC messages and responses
+        if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+            LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
+                logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
+            context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
         }
 
-        if (message instanceof InstallSnapshot) {
-            InstallSnapshot installSnapshot = (InstallSnapshot) message;
+        if (rpc instanceof InstallSnapshot) {
+            InstallSnapshot installSnapshot = (InstallSnapshot) rpc;
             handleInstallSnapshot(sender, installSnapshot);
+            scheduleElection(electionDuration());
+            return this;
         }
 
-        if (message instanceof RaftRPC && (!(message instanceof RequestVote) || (canGrantVote((RequestVote) message)))){
+        if (!(rpc instanceof RequestVote) || canGrantVote((RequestVote) rpc)) {
             scheduleElection(electionDuration());
         }
 
-        return super.handleMessage(sender, message);
+        return super.handleMessage(sender, rpc);
     }
 
     private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
index c724e3bc1c8657be7c25ee7bed93689b7925e25e..36e9b646e66f069504e8d220ceafb80bdc0a1b6a 100644 (file)
@@ -63,14 +63,17 @@ public class Leader extends AbstractLeader {
     public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
         Preconditions.checkNotNull(sender, "sender should not be null");
 
-        if (ISOLATED_LEADER_CHECK.equals(originalMessage) && isLeaderIsolated()) {
-            LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
-                context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
-
-            return internalSwitchBehavior(RaftState.IsolatedLeader);
+        if (ISOLATED_LEADER_CHECK.equals(originalMessage)) {
+            if (isLeaderIsolated()) {
+                LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+                    context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
+                return internalSwitchBehavior(RaftState.IsolatedLeader);
+            } else {
+                return this;
+            }
+        } else {
+            return super.handleMessage(sender, originalMessage);
         }
-
-        return super.handleMessage(sender, originalMessage);
     }
 
     @Override
index e7f43c898c1e4506ab1e382ae153c05d2eb6950e..bf5116935d108c91ecec947cf7544ecc7f7cd3dc 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftState;
 
 /**
@@ -34,9 +35,9 @@ public interface RaftActorBehavior extends AutoCloseable{
      * @param sender The sender of the message
      * @param message A message that needs to be processed
      *
-     * @return The new behavior or current behavior
+     * @return The new behavior or current behavior, or null if the message was not handled.
      */
-    RaftActorBehavior handleMessage(ActorRef sender, Object message);
+    @Nullable RaftActorBehavior handleMessage(ActorRef sender, Object message);
 
     /**
      *
index 31f2d629641fd92820c549c9dcaf2a7eb3a08beb..36f5fd502e3c9cc3d0559dc2c22ab1fd04b0feea 100644 (file)
@@ -62,10 +62,13 @@ public class AbstractLeaderElectionScenarioTest {
 
             try {
                 if(behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
-                    RaftActorBehavior oldBehavior = behavior;
-                    behavior = behavior.handleMessage(getSender(), message);
-                    if(behavior != oldBehavior && behaviorStateChangeLatch != null) {
-                        behaviorStateChangeLatch.countDown();
+                    final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
+                    if (nextBehavior != null) {
+                        RaftActorBehavior oldBehavior = behavior;
+                        behavior = nextBehavior;
+                        if(behavior != oldBehavior && behaviorStateChangeLatch != null) {
+                            behaviorStateChangeLatch.countDown();
+                        }
                     }
                 }
             } finally {
index 30ca63b0634a74c7ecbe0887ba0e8452f72b6176..b5be834fec064d21d0c40a10000c413c9fd64f60 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
@@ -102,12 +103,11 @@ public abstract class AbstractRaftActorBehaviorTest<T extends RaftActorBehavior>
 
         behavior = createBehavior(context);
 
-        // Send an unknown message so that the state of the RaftActor remains unchanged
-        RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
+        RaftState expected = behavior.state();
 
         RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
 
-        assertEquals("Raft state", expected.state(), raftBehavior.state());
+        assertEquals("Raft state", expected, raftBehavior.state());
 
         // Also expect an AppendEntriesReply to be sent where success is false
 
@@ -138,12 +138,14 @@ public abstract class AbstractRaftActorBehaviorTest<T extends RaftActorBehavior>
 
         assertFalse("This test should be overridden when testing Candidate", behavior instanceof Candidate);
 
-        // Send an unknown message so that the state of the RaftActor remains unchanged
-        RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
+        RaftState expected = behavior.state();
+
+        // Check that the behavior does not handle unknwon message
+        assertNull(behavior.handleMessage(behaviorActor, "unknown"));
 
         RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
 
-        assertEquals("Raft state", expected.state(), raftBehavior.state());
+        assertEquals("Raft state", expected, raftBehavior.state());
 
         assertEquals("ReplicatedLog size", 1, context.getReplicatedLog().size());
 
index 9d8144c4396fc195541706306debb21bf8060296..1f42af7b4511159b89a4e2a2b441042ac8a2d20e 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -30,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -94,10 +95,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader = new Leader(createActorContext());
 
-        // handle message should return the Leader state when it receives an
-        // unknown message
-        RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
-        Assert.assertTrue(behavior instanceof Leader);
+        // handle message should null when it receives an unknown message
+        assertNull(leader.handleMessage(followerActor, "foo"));
     }
 
     @Test
@@ -713,7 +712,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-        Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+        assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
 
     @Test
@@ -775,7 +774,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
         leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-        Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+        assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
 
 
@@ -1014,7 +1013,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
 
-        Assert.assertNull(installSnapshot);
+        assertNull(installSnapshot);
     }
 
 
@@ -1832,7 +1831,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader = new Leader(leaderActorContext);
         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
-        Assert.assertTrue(behavior instanceof Leader);
+        assertTrue(behavior instanceof Leader);
     }
 
     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
@@ -1853,8 +1852,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.markFollowerActive("follower-1");
         leader.markFollowerActive("follower-2");
         RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
-        Assert.assertTrue("Behavior not instance of Leader when all followers are active",
-                behavior instanceof Leader);
+        assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
 
         // kill 1 follower and verify if that got killed
         final JavaTestKit probe = new JavaTestKit(getSystem());
@@ -1866,8 +1864,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         leader.markFollowerInActive("follower-1");
         leader.markFollowerActive("follower-2");
         behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
-        Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
-                behavior instanceof Leader);
+        assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
 
         // kill 2nd follower and leader should change to Isolated leader
         followerActor2.tell(PoisonPill.getInstance(), null);
@@ -1886,7 +1883,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
 
-        Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+        assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
             behavior instanceof IsolatedLeader);
     }
 
@@ -1896,7 +1893,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
 
-        Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
+        assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
                 behavior instanceof Leader);
     }
 
index a647554806034f5b21d52e04dd8530dd6bb4d51d..9cb015cfaf35796bdf98692d8d98b9b67dd86a13 100644 (file)
@@ -218,7 +218,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected void handleCommand(final Object message) {
+    protected void handleNonRaftCommand(final Object message) {
         try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
             final Optional<Error> maybeError = context.error();
             if (maybeError.isPresent()) {
@@ -270,7 +270,7 @@ public class Shard extends RaftActor {
             } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
                 messageRetrySupport.onTimerMessage(message);
             } else {
-                super.handleCommand(message);
+                super.handleNonRaftCommand(message);
             }
         }
     }
index 678546e1670b3d5e6d3d4367c3c6a751f3cca910..2909d0ae0c0a3ab80a8af0d4fdd500dc1e4a6a0d 100644 (file)
@@ -113,7 +113,7 @@ class EntityOwnershipShard extends Shard {
     }
 
     @Override
-    public void handleCommand(final Object message) {
+    public void handleNonRaftCommand(final Object message) {
         if(message instanceof RegisterCandidateLocal) {
             onRegisterCandidateLocal((RegisterCandidateLocal) message);
         } else if(message instanceof UnregisterCandidateLocal) {
@@ -133,7 +133,7 @@ class EntityOwnershipShard extends Shard {
         } else if(message instanceof SelectOwner) {
             onSelectOwner((SelectOwner) message);
         } else if(!commitCoordinator.handleMessage(message, this)) {
-            super.handleCommand(message);
+            super.handleNonRaftCommand(message);
         }
     }
 
index d743e7903c9b25c40a2555bdca7574f2b0c6383a..3e2313dc2780eab81c8a6c355751f8def2589905 100644 (file)
@@ -2290,11 +2290,11 @@ public class ShardTest extends AbstractShardTest {
                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 "testFollowerInitialSyncStatus");
 
-        shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
+        shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
 
         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
 
-        shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
+        shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
 
         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
     }