}
@Override
- protected void handleCommand(Object message) {
+ protected void handleNonRaftCommand(Object message) {
if(message instanceof KeyValue){
if(isLeader()) {
String persistId = Long.toString(persistIdentifier++);
}
} else {
- super.handleCommand(message);
+ super.handleNonRaftCommand(message);
}
}
handleBehaviorChange(state, newBehavior);
}
+ /**
+ * Method exposed for subclasses to plug-in their logic. This method is invoked by {@link #handleCommand(Object)}
+ * for messages which are not handled by this class. Subclasses overriding this class should fall back to this
+ * implementation for messages which they do not handle
+ *
+ * @param message Incoming command message
+ */
+ protected void handleNonRaftCommand(final Object message) {
+ unhandled(message);
+ }
+
+ /**
+ * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override
+ * {@link #handleNonRaftCommand(Object)} instead.
+ */
+ @Deprecated
@Override
+ // FIXME: make this method final once our unit tests do not need to override it
protected void handleCommand(final Object message) {
if (serverConfigurationSupport.handleMessage(message, getSender())) {
return;
// Processing the message may affect the state, hence we need to capture it
final RaftActorBehavior currentBehavior = getCurrentBehavior();
final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
+
+ // A behavior indicates that it processed the change by returning a reference to the next behavior
+ // to be used. A null return indicates it has not processed the message and we should be passing it to
+ // the subclass for handling.
final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
- switchBehavior(state, nextBehavior);
+ if (nextBehavior != null) {
+ switchBehavior(state, nextBehavior);
+ } else {
+ handleNonRaftCommand(message);
+ }
}
}
beforeSendHeartbeat();
sendHeartBeat();
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
- return this;
-
} else if(message instanceof SendInstallSnapshot) {
// received from RaftActor
setSnapshot(((SendInstallSnapshot) message).getSnapshot());
sendInstallSnapshot();
-
} else if (message instanceof Replicate) {
replicate((Replicate) message);
-
- } else if (message instanceof InstallSnapshotReply){
+ } else if (message instanceof InstallSnapshotReply) {
handleInstallSnapshotReply((InstallSnapshotReply) message);
-
+ } else {
+ return super.handleMessage(sender, message);
}
-
- return super.handleMessage(sender, message);
+ return this;
}
private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
return requestVote(sender, (RequestVote) message);
} else if (message instanceof RequestVoteReply) {
return handleRequestVoteReply(sender, (RequestVoteReply) message);
+ } else {
+ return null;
}
- return this;
}
@Override
}
final Object message = fromSerializableMessage(originalMessage);
- if (message instanceof RaftRPC) {
- RaftRPC rpc = (RaftRPC) message;
- // If RPC request or response contains term T > currentTerm:
- // set currentTerm = T, convert to follower (§5.1)
- // This applies to all RPC messages and responses
- if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
- LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
- logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
-
- context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
- }
+ if (!(message instanceof RaftRPC)) {
+ // The rest of the processing requires the message to be a RaftRPC
+ return null;
+ }
+
+ final RaftRPC rpc = (RaftRPC) message;
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+ LOG.debug("{}: Term {} in \"{}\" message is greater than follower's term {} - updating term",
+ logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
}
- if (message instanceof InstallSnapshot) {
- InstallSnapshot installSnapshot = (InstallSnapshot) message;
+ if (rpc instanceof InstallSnapshot) {
+ InstallSnapshot installSnapshot = (InstallSnapshot) rpc;
handleInstallSnapshot(sender, installSnapshot);
+ scheduleElection(electionDuration());
+ return this;
}
- if (message instanceof RaftRPC && (!(message instanceof RequestVote) || (canGrantVote((RequestVote) message)))){
+ if (!(rpc instanceof RequestVote) || canGrantVote((RequestVote) rpc)) {
scheduleElection(electionDuration());
}
- return super.handleMessage(sender, message);
+ return super.handleMessage(sender, rpc);
}
private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
Preconditions.checkNotNull(sender, "sender should not be null");
- if (ISOLATED_LEADER_CHECK.equals(originalMessage) && isLeaderIsolated()) {
- LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
- context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
-
- return internalSwitchBehavior(RaftState.IsolatedLeader);
+ if (ISOLATED_LEADER_CHECK.equals(originalMessage)) {
+ if (isLeaderIsolated()) {
+ LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
+ context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
+ return internalSwitchBehavior(RaftState.IsolatedLeader);
+ } else {
+ return this;
+ }
+ } else {
+ return super.handleMessage(sender, originalMessage);
}
-
- return super.handleMessage(sender, originalMessage);
}
@Override
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.RaftState;
/**
* @param sender The sender of the message
* @param message A message that needs to be processed
*
- * @return The new behavior or current behavior
+ * @return The new behavior or current behavior, or null if the message was not handled.
*/
- RaftActorBehavior handleMessage(ActorRef sender, Object message);
+ @Nullable RaftActorBehavior handleMessage(ActorRef sender, Object message);
/**
*
try {
if(behavior != null && !dropMessagesToBehavior.containsKey(message.getClass())) {
- RaftActorBehavior oldBehavior = behavior;
- behavior = behavior.handleMessage(getSender(), message);
- if(behavior != oldBehavior && behaviorStateChangeLatch != null) {
- behaviorStateChangeLatch.countDown();
+ final RaftActorBehavior nextBehavior = behavior.handleMessage(getSender(), message);
+ if (nextBehavior != null) {
+ RaftActorBehavior oldBehavior = behavior;
+ behavior = nextBehavior;
+ if(behavior != oldBehavior && behaviorStateChangeLatch != null) {
+ behaviorStateChangeLatch.countDown();
+ }
}
}
} finally {
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.testkit.TestActorRef;
behavior = createBehavior(context);
- // Send an unknown message so that the state of the RaftActor remains unchanged
- RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
+ RaftState expected = behavior.state();
RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
- assertEquals("Raft state", expected.state(), raftBehavior.state());
+ assertEquals("Raft state", expected, raftBehavior.state());
// Also expect an AppendEntriesReply to be sent where success is false
assertFalse("This test should be overridden when testing Candidate", behavior instanceof Candidate);
- // Send an unknown message so that the state of the RaftActor remains unchanged
- RaftActorBehavior expected = behavior.handleMessage(behaviorActor, "unknown");
+ RaftState expected = behavior.state();
+
+ // Check that the behavior does not handle unknwon message
+ assertNull(behavior.handleMessage(behaviorActor, "unknown"));
RaftActorBehavior raftBehavior = behavior.handleMessage(behaviorActor, appendEntries);
- assertEquals("Raft state", expected.state(), raftBehavior.state());
+ assertEquals("Raft state", expected, raftBehavior.state());
assertEquals("ReplicatedLog size", 1, context.getReplicatedLog().size());
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
leader = new Leader(createActorContext());
- // handle message should return the Leader state when it receives an
- // unknown message
- RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
- Assert.assertTrue(behavior instanceof Leader);
+ // handle message should null when it receives an unknown message
+ assertNull(leader.handleMessage(followerActor, "foo"));
}
@Test
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+ assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
@Test
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+ assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
- Assert.assertNull(installSnapshot);
+ assertNull(installSnapshot);
}
leader = new Leader(leaderActorContext);
RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
- Assert.assertTrue(behavior instanceof Leader);
+ assertTrue(behavior instanceof Leader);
}
private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
leader.markFollowerActive("follower-1");
leader.markFollowerActive("follower-2");
RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
- Assert.assertTrue("Behavior not instance of Leader when all followers are active",
- behavior instanceof Leader);
+ assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
// kill 1 follower and verify if that got killed
final JavaTestKit probe = new JavaTestKit(getSystem());
leader.markFollowerInActive("follower-1");
leader.markFollowerActive("follower-2");
behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
- Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
- behavior instanceof Leader);
+ assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
// kill 2nd follower and leader should change to Isolated leader
followerActor2.tell(PoisonPill.getInstance(), null);
RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
- Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+ assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
behavior instanceof IsolatedLeader);
}
RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
- Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
+ assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
behavior instanceof Leader);
}
}
@Override
- protected void handleCommand(final Object message) {
+ protected void handleNonRaftCommand(final Object message) {
try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
final Optional<Error> maybeError = context.error();
if (maybeError.isPresent()) {
} else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
} else {
- super.handleCommand(message);
+ super.handleNonRaftCommand(message);
}
}
}
}
@Override
- public void handleCommand(final Object message) {
+ public void handleNonRaftCommand(final Object message) {
if(message instanceof RegisterCandidateLocal) {
onRegisterCandidateLocal((RegisterCandidateLocal) message);
} else if(message instanceof UnregisterCandidateLocal) {
} else if(message instanceof SelectOwner) {
onSelectOwner((SelectOwner) message);
} else if(!commitCoordinator.handleMessage(message, this)) {
- super.handleCommand(message);
+ super.handleNonRaftCommand(message);
}
}
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
"testFollowerInitialSyncStatus");
- shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
+ shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
- shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
+ shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
}