import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
return;
}
- raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor());
+ raftContext.getActor().tell(TimeoutNow.INSTANCE, raftContext.getActor());
currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import java.io.Serializable;
+
+/**
+ * Message sent to a follower to force an immediate election time out.
+ *
+ * @author Thomas Pantelis
+ */
+public final class TimeoutNow implements Serializable {
+ private static final long serialVersionUID = 1L;
+ public static final TimeoutNow INSTANCE = new TimeoutNow();
+
+ private TimeoutNow() {
+ // Hidden on purpose
+ }
+
+ private Object readResolve() {
+ return INSTANCE;
+ }
+}
import akka.actor.ActorRef;
import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
}
};
+ private final Stopwatch lastLeaderMessageTimer = Stopwatch.createUnstarted();
private SnapshotTracker snapshotTracker = null;
private String leaderId;
private short leaderPayloadVersion;
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
- actor().tell(ElectionTimeout.INSTANCE, actor());
+ actor().tell(TimeoutNow.INSTANCE, actor());
} else {
scheduleElection(electionDuration());
}
this.leaderPayloadVersion = leaderPayloadVersion;
}
+ private void restartLastLeaderMessageTimer() {
+ if (lastLeaderMessageTimer.isRunning()) {
+ lastLeaderMessageTimer.reset();
+ }
+
+ lastLeaderMessageTimer.start();
+ }
+
private boolean isLogEntryPresent(long index){
if(context.getReplicatedLog().isInSnapshot(index)) {
return true;
@Override
public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
- if (originalMessage instanceof ElectionTimeout) {
- if (canStartElection()) {
- LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
- return internalSwitchBehavior(RaftState.Candidate);
- } else {
- setLeaderId(null);
- scheduleElection(electionDuration());
- return this;
- }
+ if (originalMessage instanceof ElectionTimeout || originalMessage instanceof TimeoutNow) {
+ return handleElectionTimeout(originalMessage);
}
final Object message = fromSerializableMessage(originalMessage);
}
if (rpc instanceof InstallSnapshot) {
- InstallSnapshot installSnapshot = (InstallSnapshot) rpc;
- handleInstallSnapshot(sender, installSnapshot);
+ handleInstallSnapshot(sender, (InstallSnapshot) rpc);
+ restartLastLeaderMessageTimer();
scheduleElection(electionDuration());
return this;
}
if (!(rpc instanceof RequestVote) || canGrantVote((RequestVote) rpc)) {
+ restartLastLeaderMessageTimer();
scheduleElection(electionDuration());
}
return super.handleMessage(sender, rpc);
}
+ private RaftActorBehavior handleElectionTimeout(Object message) {
+ // If the message is ElectionTimeout, verify we haven't actually seen a message from the leader
+ // during the election timeout interval. It may that the election timer expired b/c this actor
+ // was busy and messages got delayed, in which case leader messages would be backed up in the
+ // queue but would be processed before the ElectionTimeout message and thus would restart the
+ // lastLeaderMessageTimer.
+ long lastLeaderMessageInterval = lastLeaderMessageTimer.elapsed(TimeUnit.MILLISECONDS);
+ boolean noLeaderMessageReceived = !lastLeaderMessageTimer.isRunning() || lastLeaderMessageInterval >=
+ context.getConfigParams().getElectionTimeOutInterval().toMillis();
+
+ if(canStartElection()) {
+ if(message instanceof TimeoutNow || noLeaderMessageReceived) {
+ LOG.debug("{}: Received {} - switching to Candidate", logName(), message.getClass().getSimpleName());
+ return internalSwitchBehavior(RaftState.Candidate);
+ } else {
+ LOG.debug("{}: Received ElectionTimeout but lastLeaderMessageInterval {} < election timeout",
+ logName(), lastLeaderMessageInterval);
+ scheduleElection(electionDuration());
+ }
+ } else if(message instanceof ElectionTimeout) {
+ if(noLeaderMessageReceived) {
+ setLeaderId(null);
+ }
+
+ scheduleElection(electionDuration());
+ }
+
+ return this;
+ }
+
private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
/**
// additional AppendEntries with the latest commit index.
sendAppendEntries(0, false);
- // Now send an ElectionTimeout to the matching follower to immediately start an election.
+ // Now send a TimeoutNow message to the matching follower to immediately start an election.
ActorSelection followerActor = context.getPeerActorSelection(followerId);
- followerActor.tell(ElectionTimeout.INSTANCE, context.getActor());
+ followerActor.tell(TimeoutNow.INSTANCE, context.getActor());
LOG.debug("{}: Leader transfer complete", logName());
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
- node2RaftActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+ node2RaftActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.RaftState;
-import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
member3Actor.expectMessageClass(RequestVoteReply.class, 1);
member3Actor.expectMessageClass(AppendEntriesReply.class, 2);
- member3ActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+ member3ActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
member3Actor.waitForExpectedMessages(RequestVoteReply.class);
member3Actor.dropMessagesToBehavior(RequestVote.class);
- member2ActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+ member2ActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
member1Actor.waitForExpectedMessages(RequestVote.class);
member3Actor.waitForExpectedMessages(RequestVote.class);
import akka.actor.Props;
import akka.testkit.TestActorRef;
import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
MockRaftActorContext actorContext = createActorContext();
follower = new Follower(actorContext);
- MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
+ MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
}
@Test
- public void testHandleElectionTimeout(){
- logStart("testHandleElectionTimeout");
+ public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
+ logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
- follower = new Follower(createActorContext());
+ MockRaftActorContext context = createActorContext();
+ follower = new Follower(context);
- RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, ElectionTimeout.INSTANCE);
+ Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+ RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
assertTrue(raftBehavior instanceof Candidate);
}
@Test
- public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
+ public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
+ logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
+
+ MockRaftActorContext context = createActorContext();
+ ((DefaultConfigParamsImpl) context.getConfigParams()).
+ setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
+
+ follower = new Follower(context);
+ context.setCurrentBehavior(follower);
+
+ Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
+ getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+ -1, -1, (short) 1));
+
+ Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
+ RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
+ assertTrue(raftBehavior instanceof Follower);
+
+ Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
+ getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+ -1, -1, (short) 1));
+
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
+ assertTrue(raftBehavior instanceof Follower);
+ }
+
+ @Test
+ public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
MockRaftActorContext context = createActorContext();
follower = createBehavior(context);
- ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
- ElectionTimeout.class);
+ TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
- RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
+ RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
assertTrue("Expected Candidate", newBehavior instanceof Candidate);
}
follower = createBehavior(context);
- ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
- ElectionTimeout.class);
- RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
+ TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
+ RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
assertSame("handleMessage result", follower, newBehavior);
}
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
// Leader should force an election timeout
- MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+ MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
verify(mockTransferCohort).transferComplete();
}
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// Leader should force an election timeout
- MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+ MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
verify(mockTransferCohort).transferComplete();
}
leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
// Leader should force an election timeout
- MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+ MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
verify(mockTransferCohort).transferComplete();
}
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.SimpleReplicatedLog;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
member3Actor.expectMessageClass(RequestVote.class, 1);
member3Actor.expectBehaviorStateChange();
- member1ActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+ member1ActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
member2Actor.waitForExpectedMessages(RequestVote.class);
member3Actor.waitForExpectedMessages(RequestVote.class);
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
member3Actor.expectMessageClass(RequestVoteReply.class, 1);
member3Actor.expectMessageClass(AppendEntriesReply.class, 1);
- member3ActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+ member3ActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
member1Actor.waitForExpectedMessages(RequestVote.class);
member2Actor.waitForExpectedMessages(RequestVote.class);
member3Actor.dropMessagesToBehavior(RequestVote.class);
- member2ActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+ member2ActorRef.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
member1Actor.waitForExpectedMessages(RequestVote.class);
member3Actor.waitForExpectedMessages(RequestVote.class);
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
"akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
- leaderShard.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+ leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
String leaderPath = waitUntilLeader(followerShard);
assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
"akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
- leaderShard.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+ leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
String leaderPath = waitUntilLeader(followerShard);
assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
ImmutableMap.<String, String>builder().put(peerId1.toString(), peer1.path().toString()).
put(peerId2.toString(), peer2.path().toString()).build(), LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).
withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
- leader.tell(ElectionTimeout.INSTANCE, leader);
+ leader.tell(TimeoutNow.INSTANCE, leader);
ShardTestKit.waitUntilLeader(leader);
leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
- peer2.tell(ElectionTimeout.INSTANCE, peer2);
+ peer2.tell(TimeoutNow.INSTANCE, peer2);
ShardTestKit.waitUntilLeader(peer2);
TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
ImmutableMap.<String, String>builder().put(localId.toString(), shard.path().toString()).build(),
LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
- leader.tell(ElectionTimeout.INSTANCE, leader);
+ leader.tell(TimeoutNow.INSTANCE, leader);
ShardTestKit.waitUntilLeader(leader);