*/
public interface FollowerLogInformation {
+ enum FollowerState {
+ VOTING,
+ NON_VOTING,
+ VOTING_NOT_INITIALIZED
+ };
+
/**
* Increment the value of the nextIndex
*
* Sets the payload data version of the follower.
*/
void setPayloadVersion(short payloadVersion);
+
+ /**
+ * Sets the state of the follower.
+ */
+ void setFollowerState(FollowerState state);
+
+ /**
+ * @return the state of the follower.
+ */
+ FollowerState getFollowerState();
+
+ /**
+ * @return true if the follower is in a state where it can participate in leader elections and
+ * commitment consensus.
+ */
+ boolean canParticipateInConsensus();
}
private short payloadVersion = -1;
+ private FollowerState state = FollowerState.VOTING;
+
public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
this.id = id;
this.nextIndex = context.getCommitIndex();
@Override
public boolean isFollowerActive() {
+ if(state == FollowerState.VOTING_NOT_INITIALIZED) {
+ return false;
+ }
+
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
return (stopwatch.isRunning()) &&
(elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis());
@Override
public boolean okToReplicate() {
+ if(state == FollowerState.VOTING_NOT_INITIALIZED) {
+ return false;
+ }
+
// Return false if we are trying to send duplicate data before the heartbeat interval
if(getNextIndex() == lastReplicatedIndex){
if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams()
lastReplicatedStopwatch.start();
}
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex)
- .append(", matchIndex=").append(matchIndex).append(", stopwatch=")
- .append(stopwatch.elapsed(TimeUnit.MILLISECONDS))
- .append(", followerTimeoutMillis=")
- .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
- return builder.toString();
- }
-
@Override
public short getPayloadVersion() {
return payloadVersion;
public void setPayloadVersion(short payloadVersion) {
this.payloadVersion = payloadVersion;
}
+
+ @Override
+ public boolean canParticipateInConsensus() {
+ return state == FollowerState.VOTING;
+ }
+
+ @Override
+ public void setFollowerState(FollowerState state) {
+ this.state = state;
+ }
+
+ @Override
+ public FollowerState getFollowerState() {
+ return state;
+ }
+
+ @Override
+ public String toString() {
+ return "FollowerLogInformationImpl [id=" + id + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
+ + ", lastReplicatedIndex=" + lastReplicatedIndex + ", state=" + state + ", stopwatch="
+ + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
+ + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
+ }
}
private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier();
+ private RaftActorServerConfigurationSupport serverConfigurationSupport;
+
public RaftActor(String id, Map<String, String> peerAddresses,
Optional<ConfigParams> configParams, short payloadVersion) {
super.preStart();
snapshotSupport = newRaftActorSnapshotMessageSupport();
+ serverConfigurationSupport = new RaftActorServerConfigurationSupport(getRaftActorContext());
}
@Override
captureSnapshot();
} else if(message instanceof SwitchBehavior){
switchBehavior(((SwitchBehavior) message));
- } else if(!snapshotSupport.handleSnapshotMessage(message)) {
+ } else if(!snapshotSupport.handleSnapshotMessage(message) &&
+ !serverConfigurationSupport.handleMessage(message, this, getSender())) {
switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
}
}
import akka.actor.UntypedActorContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
+import com.google.common.collect.Maps;
import java.util.Map;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
this.termInformation = termInformation;
this.commitIndex = commitIndex;
this.lastApplied = lastApplied;
- this.peerAddresses = peerAddresses;
+ this.peerAddresses = Maps.newHashMap(peerAddresses);
this.configParams = configParams;
this.persistenceProvider = persistenceProvider;
this.LOG = logger;
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles server configuration related messages for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorServerConfigurationSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
+
+ private final RaftActorContext context;
+
+ RaftActorServerConfigurationSupport(RaftActorContext context) {
+ this.context = context;
+ }
+
+ boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
+ if(message instanceof AddServer) {
+ onAddServer((AddServer)message, raftActor, sender);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
+ LOG.debug("onAddServer: {}", addServer);
+
+ if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
+ return;
+ }
+
+ // TODO - check if a server config is in progress. If so, cache this AddServer request to be processed
+ // after the current one is done.
+
+ context.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress());
+
+ AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+ FollowerState initialState = addServer.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
+ FollowerState.NON_VOTING;
+ leader.addFollower(addServer.getNewServerId(), initialState);
+
+ // TODO
+ // if initialState == FollowerState.VOTING_NOT_INITIALIZED
+ // Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId())
+ // Start a timer to abort the operation after a period of time (maybe 2 times election timeout)
+ // Set local instance state and wait for message from the AbstractLeader when install snapshot is done and return now
+ // When install snapshot message is received, go to step 1
+ // else
+ // go to step 2
+ //
+ // 1) tell AbstractLeader mark the follower as VOTING and recalculate minReplicationCount and
+ // minIsolatedLeaderPeerCount
+ // 2) persist and replicate ServerConfigurationPayload via
+ // raftActor.persistData(sender, uuid, newServerConfigurationPayload)
+ // 3) Wait for commit complete via ApplyState message in RaftActor or time it out. In RaftActor,
+ // on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call
+ // this class.
+ //
+
+ // TODO - temporary
+ sender.tell(new AddServerReply(ServerChangeStatus.OK, raftActor.getLeaderId()), raftActor.self());
+ }
+
+ private boolean noLeaderOrForwardedToLeader(Object message, RaftActor raftActor, ActorRef sender) {
+ if (raftActor.isLeader()) {
+ return false;
+ }
+
+ ActorSelection leader = raftActor.getLeader();
+ if (leader != null) {
+ LOG.debug("Not leader - forwarding to leader {}", leader);
+ leader.forward(message, raftActor.getContext());
+ } else {
+ LOG.debug("No leader - returning NO_LEADER AddServerReply");
+ sender.tell(new AddServerReply(ServerChangeStatus.NO_LEADER, null), raftActor.self());
+ }
+
+ return true;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import com.google.protobuf.GeneratedMessage.GeneratedExtension;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload data for server configuration log entries.
+ *
+ * @author Thomas Pantelis
+ */
+public class ServerConfigurationPayload extends Payload implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(ServerConfigurationPayload.class);
+
+ private final List<String> newServerConfig;
+ private final List<String> oldServerConfig;
+ private transient int serializedSize = -1;
+
+ public ServerConfigurationPayload(List<String> newServerConfig, List<String> oldServerConfig) {
+ this.newServerConfig = newServerConfig;
+ this.oldServerConfig = oldServerConfig;
+ }
+
+ public List<String> getNewServerConfig() {
+ return newServerConfig;
+ }
+
+
+ public List<String> getOldServerConfig() {
+ return oldServerConfig;
+ }
+
+ @Override
+ public int size() {
+ if(serializedSize < 0) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bos);
+ out.writeObject(newServerConfig);
+ out.writeObject(oldServerConfig);
+ out.close();
+
+ serializedSize = bos.toByteArray().length;
+ } catch (IOException e) {
+ serializedSize = 0;
+ LOG.error("Error serializing", e);
+ }
+ }
+
+ return serializedSize;
+ }
+
+ @Override
+ @Deprecated
+ @SuppressWarnings("rawtypes")
+ public <T> Map<GeneratedExtension, T> encode() {
+ return null;
+ }
+
+ @Override
+ @Deprecated
+ public Payload decode(AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
+ return null;
+ }
+}
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Collection;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
// This would be passed as the hash code of the last chunk when sending the first chunk
public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
- private final Map<String, FollowerLogInformation> followerToLog;
+ private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
private Cancellable heartbeatSchedule = null;
setLeaderPayloadVersion(context.getPayloadVersion());
- final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
for (String followerId : context.getPeerAddresses().keySet()) {
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(followerId, -1, context);
- ftlBuilder.put(followerId, followerLogInformation);
+ followerToLog.put(followerId, followerLogInformation);
}
- followerToLog = ftlBuilder.build();
leaderId = context.getId();
return followerToLog.keySet();
}
+ public void addFollower(String followerId, FollowerState followerState) {
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context);
+ followerLogInformation.setFollowerState(followerState);
+ followerToLog.put(followerId, followerLogInformation);
+ }
+
@VisibleForTesting
void setSnapshot(@Nullable Snapshot snapshot) {
if(snapshot != null) {
mapFollowerToSnapshot.remove(followerId);
LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
- logName(), followerId, followerLogInformation.getMatchIndex(),
- followerLogInformation.getNextIndex());
+ logName(), followerId, followerLogInformation.getMatchIndex(),
+ followerLogInformation.getNextIndex());
if (mapFollowerToSnapshot.isEmpty()) {
// once there are no pending followers receiving snapshots
* then send the existing snapshot in chunks to the follower.
* @param followerId
*/
- private void initiateCaptureSnapshot(String followerId) {
+ public void initiateCaptureSnapshot(String followerId) {
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot.
LOG.debug("{}: sendInstallSnapshot", logName());
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
+ FollowerLogInformation followerLogInfo = e.getValue();
if (followerActor != null) {
long nextIndex = e.getValue().getNextIndex();
- if (canInstallSnapshot(nextIndex)) {
+ if (followerLogInfo.getFollowerState() == FollowerState.VOTING_NOT_INITIALIZED ||
+ canInstallSnapshot(nextIndex)) {
sendSnapshotChunk(followerActor, e.getKey());
}
}
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
import scala.concurrent.duration.FiniteDuration;
public class FollowerLogInformationImplTest {
context.setConfigParams(configParams);
FollowerLogInformation followerLogInformation =
- new FollowerLogInformationImpl("follower1", 9, context);
+ new FollowerLogInformationImpl("follower1", 9, context);
assertFalse("Follower should be termed inactive before stopwatch starts",
- followerLogInformation.isFollowerActive());
+ followerLogInformation.isFollowerActive());
followerLogInformation.markFollowerActive();
if (sleepWithElaspsedTimeReturned(200) > 200) {
- return;
+ return;
}
assertTrue("Follower should be active", followerLogInformation.isFollowerActive());
return;
}
assertFalse("Follower should be inactive after time lapsed",
- followerLogInformation.isFollowerActive());
+ followerLogInformation.isFollowerActive());
followerLogInformation.markFollowerActive();
assertTrue("Follower should be active from inactive",
- followerLogInformation.isFollowerActive());
+ followerLogInformation.isFollowerActive());
}
// we cannot rely comfortably that the sleep will indeed sleep for the desired time
@Test
public void testOkToReplicate(){
MockRaftActorContext context = new MockRaftActorContext();
- context.setCommitIndex(9);
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(
"follower1", 10, context);
followerLogInformation.incrNextIndex();
assertTrue(followerLogInformation.okToReplicate());
}
+
+ @Test
+ public void testVotingNotInitializedState() {
+ MockRaftActorContext context = new MockRaftActorContext();
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl("follower1", -1, context);
+
+ followerLogInformation.setFollowerState(FollowerState.VOTING_NOT_INITIALIZED);
+ assertFalse(followerLogInformation.okToReplicate());
+ assertFalse(followerLogInformation.canParticipateInConsensus());
+
+ followerLogInformation.markFollowerActive();
+ assertFalse(followerLogInformation.isFollowerActive());
+
+ followerLogInformation.setFollowerState(FollowerState.VOTING);
+ assertTrue(followerLogInformation.okToReplicate());
+ assertTrue(followerLogInformation.canParticipateInConsensus());
+
+ followerLogInformation.markFollowerActive();
+ assertTrue(followerLogInformation.isFollowerActive());
+ }
+
+ @Test
+ public void testNonVotingState() {
+ MockRaftActorContext context = new MockRaftActorContext();
+ FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl("follower1", -1, context);
+
+ followerLogInformation.setFollowerState(FollowerState.NON_VOTING);
+ assertTrue(followerLogInformation.okToReplicate());
+ assertFalse(followerLogInformation.canParticipateInConsensus());
+
+ followerLogInformation.markFollowerActive();
+ assertTrue(followerLogInformation.isFollowerActive());
+ }
}
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
private ActorRef roleChangeNotifier;
- private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
+ protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
private RaftActorRecoverySupport raftActorRecoverySupport;
private RaftActorSnapshotMessageSupport snapshotMessageSupport;
public ReplicatedLog getReplicatedLog(){
return this.getRaftActorContext().getReplicatedLog();
}
-}
\ No newline at end of file
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.Follower;
+import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Unit tests for RaftActorServerConfigurationSupport.
+ *
+ * @author Thomas Pantelis
+ */
+public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
+ static final String LEADER_ID = "leader";
+ static final String FOLLOWER_ID = "follower";
+ static final String NEW_SERVER_ID = "new-server";
+ private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
+ private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
+
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+ private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
+ Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(FOLLOWER_ID));
+
+ private final TestActorRef<ForwardMessageToBehaviorActor> newServerActor = actorFactory.createTestActor(
+ Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(NEW_SERVER_ID));
+
+ private RaftActorContext newServerActorContext;
+ private final JavaTestKit testKit = new JavaTestKit(getSystem());
+
+ @Before
+ public void setup() {
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(100000);
+ configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+ newServerActorContext = new RaftActorContextImpl(newServerActor, newServerActor.underlyingActor().getContext(),
+ NEW_SERVER_ID, new ElectionTermImpl(NO_PERSISTENCE, NEW_SERVER_ID, LOG), -1, -1,
+ Maps.<String, String>newHashMap(), configParams, NO_PERSISTENCE, LOG);
+ newServerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ actorFactory.close();
+ }
+
+ @Test
+ public void testAddServerWithFollower() throws Exception {
+ RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
+ followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
+ 0, 3, 1).build());
+ followerActorContext.setCommitIndex(3);
+ followerActorContext.setLastApplied(3);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ Follower newServer = new Follower(newServerActorContext);
+ newServerActor.underlyingActor().setBehavior(newServer);
+
+ TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+ MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
+ followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ // Expect initial heartbeat from the leader.
+ expectFirstMatching(followerActor, AppendEntries.class);
+ clearMessages(followerActor);
+
+ MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+
+ leaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
+
+ // leader should install snapshot - capture and verify ApplySnapshot contents
+// ApplySnapshot applySnapshot = expectFirstMatching(followerActor, ApplySnapshot.class);
+// List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+// assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
+
+ // leader should replicate new server config to both followers
+// expectFirstMatching(followerActor, AppendEntries.class);
+// expectFirstMatching(newServerActor, AppendEntries.class);
+
+ // verify ServerConfigurationPayload entry in leader's log
+// RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+// assertEquals("Leader journal log size", 4, leaderActorContext.getReplicatedLog().size());
+// assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
+// ReplicatedLogEntry logEntry = leaderActorContext.getReplicatedLog().get(
+// leaderActorContext.getReplicatedLog().lastIndex());
+ // verify logEntry contents
+
+ // Also verify ServerConfigurationPayload entry in both followers
+
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+ assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+ }
+
+ @Test
+ public void testAddServerWithNoLeader() {
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
+ MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
+ Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+ noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
+
+ noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
+ AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+ assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+ }
+
+ @Test
+ public void testAddServerForwardedToLeader() {
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+ TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
+ MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(LEADER_ID));
+
+ TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
+ MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
+ Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(FOLLOWER_ID));
+ followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
+
+ followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
+ -1, -1, (short)0), leaderActor);
+
+ followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
+ expectFirstMatching(leaderActor, AddServer.class);
+ }
+
+ private RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(100000);
+ RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
+ id, new ElectionTermImpl(NO_PERSISTENCE, id, LOG), -1, -1,
+ ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
+
+ return followerActorContext;
+ }
+
+ public static class MockLeaderRaftActor extends MockRaftActor {
+ public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
+ RaftActorContext fromContext) {
+ super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE);
+
+ RaftActorContext context = getRaftActorContext();
+ for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
+ ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
+ getState().add(entry.getData());
+ context.getReplicatedLog().append(entry);
+ }
+
+ context.setCommitIndex(fromContext.getCommitIndex());
+ context.setLastApplied(fromContext.getLastApplied());
+ }
+
+ @Override
+ protected void initializeBehavior() {
+ changeCurrentBehavior(new Leader(getRaftActorContext()));
+ initializeBehaviorComplete.countDown();
+ }
+
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
+ try {
+ actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
+ } catch (Exception e) {
+ LOG.error("createSnapshot failed", e);
+ }
+ }
+
+ static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+ configParams.setElectionTimeoutFactor(100000);
+ return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for ServerConfigurationPayload.
+ *
+ * @author Thomas Pantelis
+ */
+public class ServerConfigurationPayloadTest {
+
+ @Test
+ public void testSerialization() {
+ ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList("1", "2"),
+ Arrays.asList("3"));
+ ServerConfigurationPayload cloned = (ServerConfigurationPayload) SerializationUtils.clone(expected);
+
+ assertEquals("getNewServerConfig", expected.getNewServerConfig(), cloned.getNewServerConfig());
+ assertEquals("getOldServerConfig", expected.getOldServerConfig(), cloned.getOldServerConfig());
+ }
+
+ @Test
+ public void testSize() {
+ ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList("1", "2"),
+ Arrays.asList("3"));
+ assertTrue(expected.size() > 0);
+ }
+}
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
public class ForwardMessageToBehaviorActor extends MessageCollectorActor {
- private RaftActorBehavior behavior;
- private List<RaftActorBehavior> behaviorChanges = new ArrayList<>();
+ private volatile RaftActorBehavior behavior;
+ private final List<RaftActorBehavior> behaviorChanges = new ArrayList<>();
@Override
public void onReceive(Object message) throws Exception {