import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.Collection;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
+ private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier();
+
public RaftActor(String id, Map<String, String> peerAddresses,
Optional<ConfigParams> configParams, short payloadVersion) {
}
@Override
- public void handleCommand(Object message) {
+ public void handleCommand(final Object message) {
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
);
} else if(message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
+ } else if(message instanceof SwitchBehavior){
+ switchBehavior(((SwitchBehavior) message));
} else if(!snapshotSupport.handleSnapshotMessage(message)) {
- reusableBehaviorStateHolder.init(getCurrentBehavior());
-
- setCurrentBehavior(currentBehavior.handleMessage(getSender(), message));
+ switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
+ }
+ }
- handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
+ private void switchBehavior(SwitchBehavior message) {
+ if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
+ RaftState newState = message.getNewState();
+ if( newState == RaftState.Leader || newState == RaftState.Follower) {
+ switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
+ getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
+ } else {
+ LOG.warn("Switching to behavior : {} - not supported", newState);
+ }
}
}
+ private void switchBehavior(Supplier<RaftActorBehavior> supplier){
+ reusableBehaviorStateHolder.init(getCurrentBehavior());
+
+ setCurrentBehavior(supplier.get());
+
+ handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
+ }
+
protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
return new RaftActorSnapshotMessageSupport(context, currentBehavior,
getRaftActorSnapshotCohort());
return leaderPayloadVersion;
}
}
+
+ private class SwitchBehaviorSupplier implements Supplier<RaftActorBehavior> {
+ private Object message;
+ private ActorRef sender;
+
+ public SwitchBehaviorSupplier handleMessage(ActorRef sender, Object message){
+ this.sender = sender;
+ this.message = message;
+ return this;
+ }
+
+ @Override
+ public RaftActorBehavior get() {
+ if(this.message instanceof SwitchBehavior){
+ return ((SwitchBehavior) message).getNewState().createBehavior(getRaftActorContext());
+ }
+ return currentBehavior.handleMessage(sender, message);
+ }
+ }
}
package org.opendaylight.controller.cluster.raft;
+import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
+import org.opendaylight.controller.cluster.raft.behaviors.Follower;
+import org.opendaylight.controller.cluster.raft.behaviors.IsolatedLeader;
+import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+
public enum RaftState {
- Candidate,
- Follower,
- Leader,
- IsolatedLeader;
+ Candidate {
+ @Override
+ public RaftActorBehavior createBehavior(RaftActorContext context) {
+ return new Candidate(context);
+ }
+ },
+ Follower {
+ @Override
+ public RaftActorBehavior createBehavior(RaftActorContext context) {
+ return new Follower(context);
+ }
+ },
+ Leader {
+ @Override
+ public RaftActorBehavior createBehavior(RaftActorContext context) {
+ return new Leader(context);
+ }
+ },
+ IsolatedLeader {
+ @Override
+ public RaftActorBehavior createBehavior(RaftActorContext context) {
+ return new IsolatedLeader(context);
+ }
+ };
+
+ public abstract RaftActorBehavior createBehavior(RaftActorContext context);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.base.messages;
+
+import org.opendaylight.controller.cluster.raft.RaftState;
+
+public class SwitchBehavior {
+ private final RaftState newState;
+ private final long newTerm;
+
+ public SwitchBehavior(RaftState newState, long newTerm) {
+ this.newState = newState;
+ this.newTerm = newTerm;
+ }
+
+ public RaftState getNewState() {
+ return newState;
+ }
+
+ public long getNewTerm() {
+ return newTerm;
+ }
+}
this.leaderPayloadVersion = leaderPayloadVersion;
}
- protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
+ @Override
+ public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state());
try {
close();
public short getLeaderPayloadVersion() {
return delegate.getLeaderPayloadVersion();
}
+
+ @Override
+ public RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
+ return delegate.switchBehavior(behavior);
+ }
}
* @return the leader's payload data version.
*/
short getLeaderPayloadVersion();
+
+ /**
+ * switchBehavior makes sure that the current behavior is shutdown before it switches to the new
+ * behavior
+ *
+ * @param behavior The new behavior to switch to
+ * @return The new behavior
+ */
+ RaftActorBehavior switchBehavior(RaftActorBehavior behavior);
}
import akka.actor.Props;
import akka.actor.Terminated;
import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotMetadata;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
(ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
new MockRaftActorContext.MockPayload("foo-7"))
);
- followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short)0));
+ followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
assertEquals(8, followerActor.getReplicatedLog().size());
assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
new MockRaftActorContext.MockPayload("foo-7"))
);
// send an additional entry 8 with leaderCommit = 7
- followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short)0));
+ followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
// 7 and 8, as lastapplied is 7
assertEquals(2, followerActor.getReplicatedLog().size());
assertEquals(5, leaderActor.getReplicatedLog().size());
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short)0));
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
assertEquals(5, leaderActor.getReplicatedLog().size());
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
//reply from a slow follower after should not raise errors
- leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short)0));
+ leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
assertEquals(0, leaderActor.getReplicatedLog().size());
}
};
}};
}
+ @Test
+ public void testSwitchBehavior(){
+ String persistenceId = factory.generateActorId("leader-");
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
+ config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ config.setSnapshotBatchCount(5);
+
+ DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+
+ Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
+
+ TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+ MockRaftActor.props(persistenceId, peerAddresses,
+ Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+ MockRaftActor leaderActor = mockActorRef.underlyingActor();
+
+ leaderActor.handleRecover(RecoveryCompleted.getInstance());
+
+ leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
+
+ assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+ assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
+
+ leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
+
+ assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
+
+ assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+ leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
+
+ assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+ assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+
+ }
+
public static ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.policy;
+
+/**
+ * DisableElectionsRaftPolicy can be useful for testing purposes where we may want to disable
+ * elections so that the Leaders for a RaftActor can be set externally. Modification to state would
+ * still require consensus.
+ */
+public class DisableElectionsRaftPolicy implements RaftPolicy {
+ @Override
+ public boolean automaticElectionsEnabled() {
+ return false;
+ }
+
+ @Override
+ public boolean applyModificationToStateBeforeConsensus() {
+ return false;
+ }
+}