* dynamic server configurations are available, otherwise returns null
*/
@Nullable ServerConfigurationPayload getPeerServerInfo();
+
+ /**
+ * @return true if this RaftActor is a voting member of the cluster, false otherwise.
+ */
+ boolean isVotingMember();
}
private short payloadVersion;
+ private boolean votingMember = true;
+
public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
@Override
public void updatePeerIds(ServerConfigurationPayload serverConfig){
-
+ votingMember = true;
Set<String> currentPeers = new HashSet<>(this.getPeerIds());
for(ServerInfo server: serverConfig.getServerConfig()) {
- if(!getId().equals(server.getId())) {
+ if(getId().equals(server.getId())) {
+ if(!server.isVoting()) {
+ votingMember = false;
+ }
+ } else {
VotingState votingState = server.isVoting() ? VotingState.VOTING: VotingState.NON_VOTING;
if(!currentPeers.contains(server.getId())) {
this.addToPeers(server.getId(), null, votingState);
newConfig.add(new ServerInfo(getId(), true));
return (new ServerConfigurationPayload(newConfig));
}
+
+ @Override
+ public boolean isVotingMember() {
+ return votingMember;
+ }
}
}
}
+ protected boolean canStartElection() {
+ return context.getRaftPolicy().automaticElectionsEnabled() && context.isVotingMember();
+ }
+
/**
* schedule a new election
*
protected void scheduleElection(FiniteDuration interval) {
stopElection();
- // Schedule an election. When the scheduler triggers an ElectionTimeout
- // message is sent to itself
- electionCancel =
- context.getActorSystem().scheduler().scheduleOnce(interval,
- context.getActor(), ELECTION_TIMEOUT,
- context.getActorSystem().dispatcher(), context.getActor());
+ if(canStartElection()) {
+ // Schedule an election. When the scheduler triggers an ElectionTimeout message is sent to itself
+ electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, context.getActor(),
+ ELECTION_TIMEOUT,context.getActorSystem().dispatcher(), context.getActor());
+ }
}
/**
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
- if(context.getRaftPolicy().automaticElectionsEnabled()) {
+ if(canStartElection()) {
if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
actor().tell(ELECTION_TIMEOUT, actor());
} else {
}
if (message instanceof ElectionTimeout) {
- LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
- return internalSwitchBehavior(RaftState.Candidate);
+ if(canStartElection()) {
+ LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
+ return internalSwitchBehavior(RaftState.Candidate);
+ } else {
+ return this;
+ }
} else if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending");
}
+
+ @Test
+ public void testNonVotingOnRecovery() throws Exception {
+ TEST_LOG.info("testNonVotingOnRecovery starting");
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setElectionTimeoutFactor(1);
+ config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS));
+
+ String persistenceId = factory.generateActorId("test-actor-");
+ InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+ new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false)))));
+
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+ config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+ MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
+ // Sleep a bit and verify it didn't get an election timeout and schedule an election.
+
+ Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
+ assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
+
+ TEST_LOG.info("testNonVotingOnRecovery ending");
+ }
}