From: Tom Pantelis Date: Sun, 27 Sep 2015 16:32:12 +0000 (-0400) Subject: Initial code for RaftActorServerConfigurationSupport X-Git-Tag: release/beryllium~249 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=3bc363a69d6d48709f7fd741ef018ecd75b8f99b Initial code for RaftActorServerConfigurationSupport Added a RaftActorServerConfigurationSupport and unit test class with mostly initial skeleton code. In RaftActorServerConfigurationSupport, I implemented the basic checks for leader avaialbility with corresponding unit tests. If not the leader and there is a leader, it forwards to the remote leader. If no leader, it returns NO_LEADER failure. Also in RaftActorServerConfigurationSupport, I added code for the first steps: add the serverId/address into the RaftActorContext peer map and add a FollowerLogInformation entry in the AbstractLeader. I added an initialized field wih getters/setters to FollowerLogInformation. The entry is added with initialized set to false. I also changed the followerToLogMap in AbstractLeader to mmutable. I also modified FollowerLogInformationImpl so it returns false for isFollowerActive and isOkToReplicate if initialized is false. The idea is to prevent the leader from sending log entries or a snapshot via the heartbeat or replication. The leader will send an empty AppendEntries heartbeat which should be fine. The RaftActorServerConfigurationSupport will initiate the install snapshot directly. I added TODO comments in RaftActorServerConfigurationSupport and the unit test class which outline the remaining work. I also added the ServerConfigurationPayload class to be used for the log entries. Change-Id: Ic11ddc99a57edb7ef70c2d4f5fa7906d6a95b35e Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 6618a97f21..b2173c2baf 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -12,6 +12,12 @@ package org.opendaylight.controller.cluster.raft; */ public interface FollowerLogInformation { + enum FollowerState { + VOTING, + NON_VOTING, + VOTING_NOT_INITIALIZED + }; + /** * Increment the value of the nextIndex * @@ -108,4 +114,20 @@ public interface FollowerLogInformation { * Sets the payload data version of the follower. */ void setPayloadVersion(short payloadVersion); + + /** + * Sets the state of the follower. + */ + void setFollowerState(FollowerState state); + + /** + * @return the state of the follower. + */ + FollowerState getFollowerState(); + + /** + * @return true if the follower is in a state where it can participate in leader elections and + * commitment consensus. + */ + boolean canParticipateInConsensus(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 5525d75b7d..5bf37d6534 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -28,6 +28,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { private short payloadVersion = -1; + private FollowerState state = FollowerState.VOTING; + public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) { this.id = id; this.nextIndex = context.getCommitIndex(); @@ -87,6 +89,10 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public boolean isFollowerActive() { + if(state == FollowerState.VOTING_NOT_INITIALIZED) { + return false; + } + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); return (stopwatch.isRunning()) && (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis()); @@ -114,6 +120,10 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public boolean okToReplicate() { + if(state == FollowerState.VOTING_NOT_INITIALIZED) { + return false; + } + // Return false if we are trying to send duplicate data before the heartbeat interval if(getNextIndex() == lastReplicatedIndex){ if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams() @@ -134,17 +144,6 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { lastReplicatedStopwatch.start(); } - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex) - .append(", matchIndex=").append(matchIndex).append(", stopwatch=") - .append(stopwatch.elapsed(TimeUnit.MILLISECONDS)) - .append(", followerTimeoutMillis=") - .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]"); - return builder.toString(); - } - @Override public short getPayloadVersion() { return payloadVersion; @@ -154,4 +153,27 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { public void setPayloadVersion(short payloadVersion) { this.payloadVersion = payloadVersion; } + + @Override + public boolean canParticipateInConsensus() { + return state == FollowerState.VOTING; + } + + @Override + public void setFollowerState(FollowerState state) { + this.state = state; + } + + @Override + public FollowerState getFollowerState() { + return state; + } + + @Override + public String toString() { + return "FollowerLogInformationImpl [id=" + id + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex + + ", lastReplicatedIndex=" + lastReplicatedIndex + ", state=" + state + ", stopwatch=" + + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" + + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index a8c32cd469..0a043da742 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -120,6 +120,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier(); + private RaftActorServerConfigurationSupport serverConfigurationSupport; + public RaftActor(String id, Map peerAddresses, Optional configParams, short payloadVersion) { @@ -142,6 +144,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { super.preStart(); snapshotSupport = newRaftActorSnapshotMessageSupport(); + serverConfigurationSupport = new RaftActorServerConfigurationSupport(getRaftActorContext()); } @Override @@ -236,7 +239,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { captureSnapshot(); } else if(message instanceof SwitchBehavior){ switchBehavior(((SwitchBehavior) message)); - } else if(!snapshotSupport.handleSnapshotMessage(message)) { + } else if(!snapshotSupport.handleSnapshotMessage(message) && + !serverConfigurationSupport.handleMessage(message, this, getSender())) { switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message)); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 82bfb128f7..74f02b5ef5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -16,6 +16,7 @@ import akka.actor.Props; import akka.actor.UntypedActorContext; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; +import com.google.common.collect.Maps; import java.util.Map; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; @@ -63,7 +64,7 @@ public class RaftActorContextImpl implements RaftActorContext { this.termInformation = termInformation; this.commitIndex = commitIndex; this.lastApplied = lastApplied; - this.peerAddresses = peerAddresses; + this.peerAddresses = Maps.newHashMap(peerAddresses); this.configParams = configParams; this.persistenceProvider = persistenceProvider; this.LOG = logger; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java new file mode 100644 index 0000000000..70ef369100 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; +import org.opendaylight.controller.cluster.raft.messages.AddServer; +import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles server configuration related messages for a RaftActor. + * + * @author Thomas Pantelis + */ +class RaftActorServerConfigurationSupport { + private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class); + + private final RaftActorContext context; + + RaftActorServerConfigurationSupport(RaftActorContext context) { + this.context = context; + } + + boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) { + if(message instanceof AddServer) { + onAddServer((AddServer)message, raftActor, sender); + return true; + } else { + return false; + } + } + + private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) { + LOG.debug("onAddServer: {}", addServer); + + if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) { + return; + } + + // TODO - check if a server config is in progress. If so, cache this AddServer request to be processed + // after the current one is done. + + context.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress()); + + AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior(); + FollowerState initialState = addServer.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED : + FollowerState.NON_VOTING; + leader.addFollower(addServer.getNewServerId(), initialState); + + // TODO + // if initialState == FollowerState.VOTING_NOT_INITIALIZED + // Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId()) + // Start a timer to abort the operation after a period of time (maybe 2 times election timeout) + // Set local instance state and wait for message from the AbstractLeader when install snapshot is done and return now + // When install snapshot message is received, go to step 1 + // else + // go to step 2 + // + // 1) tell AbstractLeader mark the follower as VOTING and recalculate minReplicationCount and + // minIsolatedLeaderPeerCount + // 2) persist and replicate ServerConfigurationPayload via + // raftActor.persistData(sender, uuid, newServerConfigurationPayload) + // 3) Wait for commit complete via ApplyState message in RaftActor or time it out. In RaftActor, + // on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call + // this class. + // + + // TODO - temporary + sender.tell(new AddServerReply(ServerChangeStatus.OK, raftActor.getLeaderId()), raftActor.self()); + } + + private boolean noLeaderOrForwardedToLeader(Object message, RaftActor raftActor, ActorRef sender) { + if (raftActor.isLeader()) { + return false; + } + + ActorSelection leader = raftActor.getLeader(); + if (leader != null) { + LOG.debug("Not leader - forwarding to leader {}", leader); + leader.forward(message, raftActor.getContext()); + } else { + LOG.debug("No leader - returning NO_LEADER AddServerReply"); + sender.tell(new AddServerReply(ServerChangeStatus.NO_LEADER, null), raftActor.self()); + } + + return true; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java new file mode 100644 index 0000000000..32cb458c20 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import com.google.protobuf.GeneratedMessage.GeneratedExtension; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Payload data for server configuration log entries. + * + * @author Thomas Pantelis + */ +public class ServerConfigurationPayload extends Payload implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ServerConfigurationPayload.class); + + private final List newServerConfig; + private final List oldServerConfig; + private transient int serializedSize = -1; + + public ServerConfigurationPayload(List newServerConfig, List oldServerConfig) { + this.newServerConfig = newServerConfig; + this.oldServerConfig = oldServerConfig; + } + + public List getNewServerConfig() { + return newServerConfig; + } + + + public List getOldServerConfig() { + return oldServerConfig; + } + + @Override + public int size() { + if(serializedSize < 0) { + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bos); + out.writeObject(newServerConfig); + out.writeObject(oldServerConfig); + out.close(); + + serializedSize = bos.toByteArray().length; + } catch (IOException e) { + serializedSize = 0; + LOG.error("Error serializing", e); + } + } + + return serializedSize; + } + + @Override + @Deprecated + @SuppressWarnings("rawtypes") + public Map encode() { + return null; + } + + @Override + @Deprecated + public Payload decode(AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) { + return null; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 5789895997..622d59e41a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -14,8 +14,6 @@ import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collection; @@ -30,6 +28,7 @@ import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; +import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState; import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; @@ -79,7 +78,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // This would be passed as the hash code of the last chunk when sending the first chunk public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; - private final Map followerToLog; + private final Map followerToLog = new HashMap<>(); private final Map mapFollowerToSnapshot = new HashMap<>(); private Cancellable heartbeatSchedule = null; @@ -97,14 +96,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { setLeaderPayloadVersion(context.getPayloadVersion()); - final Builder ftlBuilder = ImmutableMap.builder(); for (String followerId : context.getPeerAddresses().keySet()) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context); - ftlBuilder.put(followerId, followerLogInformation); + followerToLog.put(followerId, followerLogInformation); } - followerToLog = ftlBuilder.build(); leaderId = context.getId(); @@ -141,6 +138,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.keySet(); } + public void addFollower(String followerId, FollowerState followerState) { + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context); + followerLogInformation.setFollowerState(followerState); + followerToLog.put(followerId, followerLogInformation); + } + @VisibleForTesting void setSnapshot(@Nullable Snapshot snapshot) { if(snapshot != null) { @@ -402,8 +405,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { mapFollowerToSnapshot.remove(followerId); LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", - logName(), followerId, followerLogInformation.getMatchIndex(), - followerLogInformation.getNextIndex()); + logName(), followerId, followerLogInformation.getMatchIndex(), + followerLogInformation.getNextIndex()); if (mapFollowerToSnapshot.isEmpty()) { // once there are no pending followers receiving snapshots @@ -594,7 +597,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * then send the existing snapshot in chunks to the follower. * @param followerId */ - private void initiateCaptureSnapshot(String followerId) { + public void initiateCaptureSnapshot(String followerId) { if (snapshot.isPresent()) { // if a snapshot is present in the memory, most likely another install is in progress // no need to capture snapshot. @@ -624,10 +627,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("{}: sendInstallSnapshot", logName()); for (Entry e : followerToLog.entrySet()) { ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); + FollowerLogInformation followerLogInfo = e.getValue(); if (followerActor != null) { long nextIndex = e.getValue().getNextIndex(); - if (canInstallSnapshot(nextIndex)) { + if (followerLogInfo.getFollowerState() == FollowerState.VOTING_NOT_INITIALIZED || + canInstallSnapshot(nextIndex)) { sendSnapshotChunk(followerActor, e.getKey()); } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java index bdfd69ec11..e2204a9d08 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java @@ -13,6 +13,7 @@ import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.TimeUnit; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState; import scala.concurrent.duration.FiniteDuration; public class FollowerLogInformationImplTest { @@ -29,14 +30,14 @@ public class FollowerLogInformationImplTest { context.setConfigParams(configParams); FollowerLogInformation followerLogInformation = - new FollowerLogInformationImpl("follower1", 9, context); + new FollowerLogInformationImpl("follower1", 9, context); assertFalse("Follower should be termed inactive before stopwatch starts", - followerLogInformation.isFollowerActive()); + followerLogInformation.isFollowerActive()); followerLogInformation.markFollowerActive(); if (sleepWithElaspsedTimeReturned(200) > 200) { - return; + return; } assertTrue("Follower should be active", followerLogInformation.isFollowerActive()); @@ -44,11 +45,11 @@ public class FollowerLogInformationImplTest { return; } assertFalse("Follower should be inactive after time lapsed", - followerLogInformation.isFollowerActive()); + followerLogInformation.isFollowerActive()); followerLogInformation.markFollowerActive(); assertTrue("Follower should be active from inactive", - followerLogInformation.isFollowerActive()); + followerLogInformation.isFollowerActive()); } // we cannot rely comfortably that the sleep will indeed sleep for the desired time @@ -64,7 +65,6 @@ public class FollowerLogInformationImplTest { @Test public void testOkToReplicate(){ MockRaftActorContext context = new MockRaftActorContext(); - context.setCommitIndex(9); FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl( "follower1", 10, context); @@ -80,4 +80,37 @@ public class FollowerLogInformationImplTest { followerLogInformation.incrNextIndex(); assertTrue(followerLogInformation.okToReplicate()); } + + @Test + public void testVotingNotInitializedState() { + MockRaftActorContext context = new MockRaftActorContext(); + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl("follower1", -1, context); + + followerLogInformation.setFollowerState(FollowerState.VOTING_NOT_INITIALIZED); + assertFalse(followerLogInformation.okToReplicate()); + assertFalse(followerLogInformation.canParticipateInConsensus()); + + followerLogInformation.markFollowerActive(); + assertFalse(followerLogInformation.isFollowerActive()); + + followerLogInformation.setFollowerState(FollowerState.VOTING); + assertTrue(followerLogInformation.okToReplicate()); + assertTrue(followerLogInformation.canParticipateInConsensus()); + + followerLogInformation.markFollowerActive(); + assertTrue(followerLogInformation.isFollowerActive()); + } + + @Test + public void testNonVotingState() { + MockRaftActorContext context = new MockRaftActorContext(); + FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl("follower1", -1, context); + + followerLogInformation.setFollowerState(FollowerState.NON_VOTING); + assertTrue(followerLogInformation.okToReplicate()); + assertFalse(followerLogInformation.canParticipateInConsensus()); + + followerLogInformation.markFollowerActive(); + assertTrue(followerLogInformation.isFollowerActive()); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java index 741c75ee4b..bb39ed98ba 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java @@ -38,7 +38,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; private ActorRef roleChangeNotifier; - private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1); + protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1); private RaftActorRecoverySupport raftActorRecoverySupport; private RaftActorSnapshotMessageSupport snapshotMessageSupport; @@ -279,4 +279,4 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, public ReplicatedLog getReplicatedLog(){ return this.getRaftActorContext().getReplicatedLog(); } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java new file mode 100644 index 0000000000..77f40cd7c1 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -0,0 +1,224 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import static org.junit.Assert.assertEquals; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages; +import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.dispatch.Dispatchers; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.behaviors.Follower; +import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.messages.AddServer; +import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; +import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; +import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; + +/** + * Unit tests for RaftActorServerConfigurationSupport. + * + * @author Thomas Pantelis + */ +public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { + static final String LEADER_ID = "leader"; + static final String FOLLOWER_ID = "follower"; + static final String NEW_SERVER_ID = "new-server"; + private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class); + private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider(); + + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + + private final TestActorRef followerActor = actorFactory.createTestActor( + Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(FOLLOWER_ID)); + + private final TestActorRef newServerActor = actorFactory.createTestActor( + Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(NEW_SERVER_ID)); + + private RaftActorContext newServerActorContext; + private final JavaTestKit testKit = new JavaTestKit(getSystem()); + + @Before + public void setup() { + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + configParams.setElectionTimeoutFactor(100000); + configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName()); + newServerActorContext = new RaftActorContextImpl(newServerActor, newServerActor.underlyingActor().getContext(), + NEW_SERVER_ID, new ElectionTermImpl(NO_PERSISTENCE, NEW_SERVER_ID, LOG), -1, -1, + Maps.newHashMap(), configParams, NO_PERSISTENCE, LOG); + newServerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + } + + @After + public void tearDown() throws Exception { + actorFactory.close(); + } + + @Test + public void testAddServerWithFollower() throws Exception { + RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor); + followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries( + 0, 3, 1).build()); + followerActorContext.setCommitIndex(3); + followerActorContext.setLastApplied(3); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + Follower newServer = new Follower(newServerActorContext); + newServerActor.underlyingActor().setBehavior(newServer); + + TestActorRef leaderActor = actorFactory.createTestActor( + MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()), + followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + // Expect initial heartbeat from the leader. + expectFirstMatching(followerActor, AppendEntries.class); + clearMessages(followerActor); + + MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor(); + + leaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef()); + + // leader should install snapshot - capture and verify ApplySnapshot contents +// ApplySnapshot applySnapshot = expectFirstMatching(followerActor, ApplySnapshot.class); +// List snapshotState = (List) MockRaftActor.toObject(applySnapshot.getSnapshot().getState()); +// assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState()); + + // leader should replicate new server config to both followers +// expectFirstMatching(followerActor, AppendEntries.class); +// expectFirstMatching(newServerActor, AppendEntries.class); + + // verify ServerConfigurationPayload entry in leader's log +// RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext(); +// assertEquals("Leader journal log size", 4, leaderActorContext.getReplicatedLog().size()); +// assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex()); +// ReplicatedLogEntry logEntry = leaderActorContext.getReplicatedLog().get( +// leaderActorContext.getReplicatedLog().lastIndex()); + // verify logEntry contents + + // Also verify ServerConfigurationPayload entry in both followers + + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus()); + assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint()); + } + + @Test + public void testAddServerWithNoLeader() { + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + TestActorRef noLeaderActor = actorFactory.createTestActor( + MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()), + Optional.of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete(); + + noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef()); + AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class); + assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus()); + } + + @Test + public void testAddServerForwardedToLeader() { + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + + TestActorRef leaderActor = actorFactory.createTestActor( + MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(LEADER_ID)); + + TestActorRef followerRaftActor = actorFactory.createTestActor( + MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()), + Optional.of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(FOLLOWER_ID)); + followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete(); + + followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.emptyList(), + -1, -1, (short)0), leaderActor); + + followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef()); + expectFirstMatching(leaderActor, AddServer.class); + } + + private RaftActorContext newFollowerContext(String id, TestActorRef actor) { + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + configParams.setElectionTimeoutFactor(100000); + RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(), + id, new ElectionTermImpl(NO_PERSISTENCE, id, LOG), -1, -1, + ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG); + + return followerActorContext; + } + + public static class MockLeaderRaftActor extends MockRaftActor { + public MockLeaderRaftActor(Map peerAddresses, ConfigParams config, + RaftActorContext fromContext) { + super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE); + + RaftActorContext context = getRaftActorContext(); + for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) { + ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i); + getState().add(entry.getData()); + context.getReplicatedLog().append(entry); + } + + context.setCommitIndex(fromContext.getCommitIndex()); + context.setLastApplied(fromContext.getLastApplied()); + } + + @Override + protected void initializeBehavior() { + changeCurrentBehavior(new Leader(getRaftActorContext())); + initializeBehaviorComplete.countDown(); + } + + @Override + public void createSnapshot(ActorRef actorRef) { + try { + actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef); + } catch (Exception e) { + LOG.error("createSnapshot failed", e); + } + } + + static Props props(Map peerAddresses, RaftActorContext fromContext) { + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS)); + configParams.setElectionTimeoutFactor(100000); + return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext); + } + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayloadTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayloadTest.java new file mode 100644 index 0000000000..cf9c5cbc1a --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayloadTest.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.util.Arrays; +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; + +/** + * Unit tests for ServerConfigurationPayload. + * + * @author Thomas Pantelis + */ +public class ServerConfigurationPayloadTest { + + @Test + public void testSerialization() { + ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList("1", "2"), + Arrays.asList("3")); + ServerConfigurationPayload cloned = (ServerConfigurationPayload) SerializationUtils.clone(expected); + + assertEquals("getNewServerConfig", expected.getNewServerConfig(), cloned.getNewServerConfig()); + assertEquals("getOldServerConfig", expected.getOldServerConfig(), cloned.getOldServerConfig()); + } + + @Test + public void testSize() { + ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList("1", "2"), + Arrays.asList("3")); + assertTrue(expected.size() > 0); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/ForwardMessageToBehaviorActor.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/ForwardMessageToBehaviorActor.java index 63810d8882..6bfe16d3d1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/ForwardMessageToBehaviorActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/ForwardMessageToBehaviorActor.java @@ -15,8 +15,8 @@ import java.util.List; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; public class ForwardMessageToBehaviorActor extends MessageCollectorActor { - private RaftActorBehavior behavior; - private List behaviorChanges = new ArrayList<>(); + private volatile RaftActorBehavior behavior; + private final List behaviorChanges = new ArrayList<>(); @Override public void onReceive(Object message) throws Exception {