Initial code for RaftActorServerConfigurationSupport 57/27557/7
authorTom Pantelis <tpanteli@brocade.com>
Sun, 27 Sep 2015 16:32:12 +0000 (12:32 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 6 Oct 2015 14:31:09 +0000 (14:31 +0000)
Added a RaftActorServerConfigurationSupport and unit test class with
mostly initial skeleton code. In RaftActorServerConfigurationSupport,
I implemented the basic checks for leader avaialbility with
corresponding unit tests. If not the leader and there is a leader, it
forwards to the remote leader. If no leader, it returns NO_LEADER
failure.

Also in RaftActorServerConfigurationSupport, I added code for the first
steps: add the serverId/address into the RaftActorContext peer map and
add a FollowerLogInformation entry in the AbstractLeader. I added an
initialized field wih getters/setters to FollowerLogInformation. The
entry is added with initialized set to false. I also changed the
followerToLogMap in AbstractLeader to mmutable.

I also modified FollowerLogInformationImpl so it returns false for
isFollowerActive and isOkToReplicate if initialized is false. The idea
is to prevent the leader from sending log entries or a snapshot via
the heartbeat or replication. The leader will send an empty
AppendEntries
heartbeat which should be fine. The RaftActorServerConfigurationSupport
will initiate the install snapshot directly.

I added TODO comments in RaftActorServerConfigurationSupport and the
unit test class which outline the remaining work.

I also added the ServerConfigurationPayload class to be used for the log
entries.

Change-Id: Ic11ddc99a57edb7ef70c2d4f5fa7906d6a95b35e
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
12 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayloadTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/ForwardMessageToBehaviorActor.java

index 6618a97..b2173c2 100644 (file)
@@ -12,6 +12,12 @@ package org.opendaylight.controller.cluster.raft;
  */
 public interface FollowerLogInformation {
 
+    enum FollowerState {
+        VOTING,
+        NON_VOTING,
+        VOTING_NOT_INITIALIZED
+    };
+
     /**
      * Increment the value of the nextIndex
      *
@@ -108,4 +114,20 @@ public interface FollowerLogInformation {
      * Sets the payload data version of the follower.
      */
     void setPayloadVersion(short payloadVersion);
+
+    /**
+     * Sets the state of the follower.
+     */
+    void setFollowerState(FollowerState state);
+
+    /**
+     * @return the state of the follower.
+     */
+    FollowerState getFollowerState();
+
+    /**
+     * @return true if the follower is in a state where it can participate in leader elections and
+     *              commitment consensus.
+     */
+    boolean canParticipateInConsensus();
 }
index 5525d75..5bf37d6 100644 (file)
@@ -28,6 +28,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private short payloadVersion = -1;
 
+    private FollowerState state = FollowerState.VOTING;
+
     public FollowerLogInformationImpl(String id, long matchIndex, RaftActorContext context) {
         this.id = id;
         this.nextIndex = context.getCommitIndex();
@@ -87,6 +89,10 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public boolean isFollowerActive() {
+        if(state == FollowerState.VOTING_NOT_INITIALIZED) {
+            return false;
+        }
+
         long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
         return (stopwatch.isRunning()) &&
                 (elapsed <= context.getConfigParams().getElectionTimeOutInterval().toMillis());
@@ -114,6 +120,10 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public boolean okToReplicate() {
+        if(state == FollowerState.VOTING_NOT_INITIALIZED) {
+            return false;
+        }
+
         // Return false if we are trying to send duplicate data before the heartbeat interval
         if(getNextIndex() == lastReplicatedIndex){
             if(lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams()
@@ -134,17 +144,6 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
         lastReplicatedStopwatch.start();
     }
 
-    @Override
-    public String toString() {
-        StringBuilder builder = new StringBuilder();
-        builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex)
-                .append(", matchIndex=").append(matchIndex).append(", stopwatch=")
-                .append(stopwatch.elapsed(TimeUnit.MILLISECONDS))
-                .append(", followerTimeoutMillis=")
-                .append(context.getConfigParams().getElectionTimeOutInterval().toMillis()).append("]");
-        return builder.toString();
-    }
-
     @Override
     public short getPayloadVersion() {
         return payloadVersion;
@@ -154,4 +153,27 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
     public void setPayloadVersion(short payloadVersion) {
         this.payloadVersion = payloadVersion;
     }
+
+    @Override
+    public boolean canParticipateInConsensus() {
+        return state == FollowerState.VOTING;
+    }
+
+    @Override
+    public void setFollowerState(FollowerState state) {
+        this.state = state;
+    }
+
+    @Override
+    public FollowerState getFollowerState() {
+        return state;
+    }
+
+    @Override
+    public String toString() {
+        return "FollowerLogInformationImpl [id=" + id + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
+                + ", lastReplicatedIndex=" + lastReplicatedIndex + ", state=" + state + ", stopwatch="
+                + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
+                + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
+    }
 }
index a8c32cd..0a043da 100644 (file)
@@ -120,6 +120,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier();
 
+    private RaftActorServerConfigurationSupport serverConfigurationSupport;
+
     public RaftActor(String id, Map<String, String> peerAddresses,
          Optional<ConfigParams> configParams, short payloadVersion) {
 
@@ -142,6 +144,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         super.preStart();
 
         snapshotSupport = newRaftActorSnapshotMessageSupport();
+        serverConfigurationSupport = new RaftActorServerConfigurationSupport(getRaftActorContext());
     }
 
     @Override
@@ -236,7 +239,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             captureSnapshot();
         } else if(message instanceof SwitchBehavior){
             switchBehavior(((SwitchBehavior) message));
-        } else if(!snapshotSupport.handleSnapshotMessage(message)) {
+        } else if(!snapshotSupport.handleSnapshotMessage(message) &&
+                !serverConfigurationSupport.handleMessage(message, this, getSender())) {
             switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
         }
     }
index 82bfb12..74f02b5 100644 (file)
@@ -16,6 +16,7 @@ import akka.actor.Props;
 import akka.actor.UntypedActorContext;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
+import com.google.common.collect.Maps;
 import java.util.Map;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
@@ -63,7 +64,7 @@ public class RaftActorContextImpl implements RaftActorContext {
         this.termInformation = termInformation;
         this.commitIndex = commitIndex;
         this.lastApplied = lastApplied;
-        this.peerAddresses = peerAddresses;
+        this.peerAddresses = Maps.newHashMap(peerAddresses);
         this.configParams = configParams;
         this.persistenceProvider = persistenceProvider;
         this.LOG = logger;
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
new file mode 100644 (file)
index 0000000..70ef369
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles server configuration related messages for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorServerConfigurationSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupport.class);
+
+    private final RaftActorContext context;
+
+    RaftActorServerConfigurationSupport(RaftActorContext context) {
+        this.context = context;
+    }
+
+    boolean handleMessage(Object message, RaftActor raftActor, ActorRef sender) {
+        if(message instanceof AddServer) {
+            onAddServer((AddServer)message, raftActor, sender);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private void onAddServer(AddServer addServer, RaftActor raftActor, ActorRef sender) {
+        LOG.debug("onAddServer: {}", addServer);
+
+        if(noLeaderOrForwardedToLeader(addServer, raftActor, sender)) {
+            return;
+        }
+
+        // TODO - check if a server config is in progress. If so, cache this AddServer request to be processed
+        // after the current one is done.
+
+        context.addToPeers(addServer.getNewServerId(), addServer.getNewServerAddress());
+
+        AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+        FollowerState initialState = addServer.isVotingMember() ? FollowerState.VOTING_NOT_INITIALIZED :
+            FollowerState.NON_VOTING;
+        leader.addFollower(addServer.getNewServerId(), initialState);
+
+        // TODO
+        // if initialState == FollowerState.VOTING_NOT_INITIALIZED
+        //     Initiate snapshot via leader.initiateCaptureSnapshot(addServer.getNewServerId())
+        //     Start a timer to abort the operation after a period of time (maybe 2 times election timeout)
+        //     Set local instance state and wait for message from the AbstractLeader when install snapshot is done and return now
+        //     When install snapshot message is received, go to step 1
+        // else
+        //     go to step 2
+        //
+        // 1) tell AbstractLeader mark the follower as VOTING and recalculate minReplicationCount and
+        //        minIsolatedLeaderPeerCount
+        // 2) persist and replicate ServerConfigurationPayload via
+        //           raftActor.persistData(sender, uuid, newServerConfigurationPayload)
+        // 3) Wait for commit complete via ApplyState message in RaftActor or time it out. In RaftActor,
+        //       on ApplyState, check if ReplicatedLogEntry payload is ServerConfigurationPayload and call
+        //       this class.
+        //
+
+        // TODO - temporary
+        sender.tell(new AddServerReply(ServerChangeStatus.OK, raftActor.getLeaderId()), raftActor.self());
+    }
+
+    private boolean noLeaderOrForwardedToLeader(Object message, RaftActor raftActor, ActorRef sender) {
+        if (raftActor.isLeader()) {
+            return false;
+        }
+
+        ActorSelection leader = raftActor.getLeader();
+        if (leader != null) {
+            LOG.debug("Not leader - forwarding to leader {}", leader);
+            leader.forward(message, raftActor.getContext());
+        } else {
+            LOG.debug("No leader - returning NO_LEADER AddServerReply");
+            sender.tell(new AddServerReply(ServerChangeStatus.NO_LEADER, null), raftActor.self());
+        }
+
+        return true;
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java
new file mode 100644 (file)
index 0000000..32cb458
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import com.google.protobuf.GeneratedMessage.GeneratedExtension;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Payload data for server configuration log entries.
+ *
+ * @author Thomas Pantelis
+ */
+public class ServerConfigurationPayload extends Payload implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServerConfigurationPayload.class);
+
+    private final List<String> newServerConfig;
+    private final List<String> oldServerConfig;
+    private transient int serializedSize = -1;
+
+    public ServerConfigurationPayload(List<String> newServerConfig, List<String> oldServerConfig) {
+        this.newServerConfig = newServerConfig;
+        this.oldServerConfig = oldServerConfig;
+    }
+
+    public List<String> getNewServerConfig() {
+        return newServerConfig;
+    }
+
+
+    public List<String> getOldServerConfig() {
+        return oldServerConfig;
+    }
+
+    @Override
+    public int size() {
+        if(serializedSize < 0) {
+            try {
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                ObjectOutputStream out = new ObjectOutputStream(bos);
+                out.writeObject(newServerConfig);
+                out.writeObject(oldServerConfig);
+                out.close();
+
+                serializedSize = bos.toByteArray().length;
+            } catch (IOException e) {
+                serializedSize = 0;
+                LOG.error("Error serializing", e);
+            }
+        }
+
+        return serializedSize;
+    }
+
+    @Override
+    @Deprecated
+    @SuppressWarnings("rawtypes")
+    public <T> Map<GeneratedExtension, T> encode() {
+        return null;
+    }
+
+    @Override
+    @Deprecated
+    public Payload decode(AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
+        return null;
+    }
+}
index 5789895..622d59e 100644 (file)
@@ -14,8 +14,6 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMap.Builder;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collection;
@@ -30,6 +28,7 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -79,7 +78,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     // This would be passed as the hash code of the last chunk when sending the first chunk
     public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
 
-    private final Map<String, FollowerLogInformation> followerToLog;
+    private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
     private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
     private Cancellable heartbeatSchedule = null;
@@ -97,14 +96,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         setLeaderPayloadVersion(context.getPayloadVersion());
 
-        final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
         for (String followerId : context.getPeerAddresses().keySet()) {
             FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(followerId, -1, context);
 
-            ftlBuilder.put(followerId, followerLogInformation);
+            followerToLog.put(followerId, followerLogInformation);
         }
-        followerToLog = ftlBuilder.build();
 
         leaderId = context.getId();
 
@@ -141,6 +138,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return followerToLog.keySet();
     }
 
+    public void addFollower(String followerId, FollowerState followerState) {
+        FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(followerId, -1, context);
+        followerLogInformation.setFollowerState(followerState);
+        followerToLog.put(followerId, followerLogInformation);
+    }
+
     @VisibleForTesting
     void setSnapshot(@Nullable Snapshot snapshot) {
         if(snapshot != null) {
@@ -402,8 +405,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     mapFollowerToSnapshot.remove(followerId);
 
                     LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
-                                logName(), followerId, followerLogInformation.getMatchIndex(),
-                                followerLogInformation.getNextIndex());
+                        logName(), followerId, followerLogInformation.getMatchIndex(),
+                        followerLogInformation.getNextIndex());
 
                     if (mapFollowerToSnapshot.isEmpty()) {
                         // once there are no pending followers receiving snapshots
@@ -594,7 +597,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * then send the existing snapshot in chunks to the follower.
      * @param followerId
      */
-    private void initiateCaptureSnapshot(String followerId) {
+    public void initiateCaptureSnapshot(String followerId) {
         if (snapshot.isPresent()) {
             // if a snapshot is present in the memory, most likely another install is in progress
             // no need to capture snapshot.
@@ -624,10 +627,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         LOG.debug("{}: sendInstallSnapshot", logName());
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
+            FollowerLogInformation followerLogInfo = e.getValue();
 
             if (followerActor != null) {
                 long nextIndex = e.getValue().getNextIndex();
-                if (canInstallSnapshot(nextIndex)) {
+                if (followerLogInfo.getFollowerState() == FollowerState.VOTING_NOT_INITIALIZED ||
+                        canInstallSnapshot(nextIndex)) {
                     sendSnapshotChunk(followerActor, e.getKey());
                 }
             }
index bdfd69e..e2204a9 100644 (file)
@@ -13,6 +13,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.TimeUnit;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.FollowerLogInformation.FollowerState;
 import scala.concurrent.duration.FiniteDuration;
 
 public class FollowerLogInformationImplTest {
@@ -29,14 +30,14 @@ public class FollowerLogInformationImplTest {
         context.setConfigParams(configParams);
 
         FollowerLogInformation followerLogInformation =
-            new FollowerLogInformationImpl("follower1", 9, context);
+                new FollowerLogInformationImpl("follower1", 9, context);
 
         assertFalse("Follower should be termed inactive before stopwatch starts",
-            followerLogInformation.isFollowerActive());
+                followerLogInformation.isFollowerActive());
 
         followerLogInformation.markFollowerActive();
         if (sleepWithElaspsedTimeReturned(200) > 200) {
-          return;
+            return;
         }
         assertTrue("Follower should be active", followerLogInformation.isFollowerActive());
 
@@ -44,11 +45,11 @@ public class FollowerLogInformationImplTest {
             return;
         }
         assertFalse("Follower should be inactive after time lapsed",
-            followerLogInformation.isFollowerActive());
+                followerLogInformation.isFollowerActive());
 
         followerLogInformation.markFollowerActive();
         assertTrue("Follower should be active from inactive",
-            followerLogInformation.isFollowerActive());
+                followerLogInformation.isFollowerActive());
     }
 
     // we cannot rely comfortably that the sleep will indeed sleep for the desired time
@@ -64,7 +65,6 @@ public class FollowerLogInformationImplTest {
     @Test
     public void testOkToReplicate(){
         MockRaftActorContext context = new MockRaftActorContext();
-        context.setCommitIndex(9);
         FollowerLogInformation followerLogInformation =
                 new FollowerLogInformationImpl(
                         "follower1", 10, context);
@@ -80,4 +80,37 @@ public class FollowerLogInformationImplTest {
         followerLogInformation.incrNextIndex();
         assertTrue(followerLogInformation.okToReplicate());
     }
+
+    @Test
+    public void testVotingNotInitializedState() {
+        MockRaftActorContext context = new MockRaftActorContext();
+        FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl("follower1", -1, context);
+
+        followerLogInformation.setFollowerState(FollowerState.VOTING_NOT_INITIALIZED);
+        assertFalse(followerLogInformation.okToReplicate());
+        assertFalse(followerLogInformation.canParticipateInConsensus());
+
+        followerLogInformation.markFollowerActive();
+        assertFalse(followerLogInformation.isFollowerActive());
+
+        followerLogInformation.setFollowerState(FollowerState.VOTING);
+        assertTrue(followerLogInformation.okToReplicate());
+        assertTrue(followerLogInformation.canParticipateInConsensus());
+
+        followerLogInformation.markFollowerActive();
+        assertTrue(followerLogInformation.isFollowerActive());
+    }
+
+    @Test
+    public void testNonVotingState() {
+        MockRaftActorContext context = new MockRaftActorContext();
+        FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl("follower1", -1, context);
+
+        followerLogInformation.setFollowerState(FollowerState.NON_VOTING);
+        assertTrue(followerLogInformation.okToReplicate());
+        assertFalse(followerLogInformation.canParticipateInConsensus());
+
+        followerLogInformation.markFollowerActive();
+        assertTrue(followerLogInformation.isFollowerActive());
+    }
 }
index 741c75e..bb39ed9 100644 (file)
@@ -38,7 +38,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     private final CountDownLatch recoveryComplete = new CountDownLatch(1);
     private final List<Object> state;
     private ActorRef roleChangeNotifier;
-    private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
+    protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
     private RaftActorRecoverySupport raftActorRecoverySupport;
     private RaftActorSnapshotMessageSupport snapshotMessageSupport;
 
@@ -279,4 +279,4 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     public ReplicatedLog getReplicatedLog(){
         return this.getRaftActorContext().getReplicatedLog();
     }
-}
\ No newline at end of file
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
new file mode 100644 (file)
index 0000000..77f40cd
--- /dev/null
@@ -0,0 +1,224 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.dispatch.Dispatchers;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.Follower;
+import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Unit tests for RaftActorServerConfigurationSupport.
+ *
+ * @author Thomas Pantelis
+ */
+public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
+    static final String LEADER_ID = "leader";
+    static final String FOLLOWER_ID = "follower";
+    static final String NEW_SERVER_ID = "new-server";
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
+    private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
+
+    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+    private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
+            Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
+            actorFactory.generateActorId(FOLLOWER_ID));
+
+    private final TestActorRef<ForwardMessageToBehaviorActor> newServerActor = actorFactory.createTestActor(
+            Props.create(ForwardMessageToBehaviorActor.class).withDispatcher(Dispatchers.DefaultDispatcherId()),
+            actorFactory.generateActorId(NEW_SERVER_ID));
+
+    private RaftActorContext newServerActorContext;
+    private final JavaTestKit testKit = new JavaTestKit(getSystem());
+
+    @Before
+    public void setup() {
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(100000);
+        configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+        newServerActorContext = new RaftActorContextImpl(newServerActor, newServerActor.underlyingActor().getContext(),
+                NEW_SERVER_ID, new ElectionTermImpl(NO_PERSISTENCE, NEW_SERVER_ID, LOG), -1, -1,
+                Maps.<String, String>newHashMap(), configParams, NO_PERSISTENCE, LOG);
+        newServerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        actorFactory.close();
+    }
+
+    @Test
+    public void testAddServerWithFollower() throws Exception {
+        RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
+        followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
+                0, 3, 1).build());
+        followerActorContext.setCommitIndex(3);
+        followerActorContext.setLastApplied(3);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+
+        Follower newServer = new Follower(newServerActorContext);
+        newServerActor.underlyingActor().setBehavior(newServer);
+
+        TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
+                MockLeaderRaftActor.props(ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
+                        followerActorContext).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        // Expect initial heartbeat from the leader.
+        expectFirstMatching(followerActor, AppendEntries.class);
+        clearMessages(followerActor);
+
+        MockLeaderRaftActor leaderRaftActor = leaderActor.underlyingActor();
+
+        leaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
+
+        // leader should install snapshot - capture and verify ApplySnapshot contents
+//        ApplySnapshot applySnapshot = expectFirstMatching(followerActor, ApplySnapshot.class);
+//        List<Object> snapshotState = (List<Object>) MockRaftActor.toObject(applySnapshot.getSnapshot().getState());
+//        assertEquals("Snapshot state", snapshotState, leaderRaftActor.getState());
+
+        // leader should replicate new server config to both followers
+//         expectFirstMatching(followerActor, AppendEntries.class);
+//         expectFirstMatching(newServerActor, AppendEntries.class);
+
+        // verify ServerConfigurationPayload entry in leader's log
+//        RaftActorContext leaderActorContext = leaderRaftActor.getRaftActorContext();
+//        assertEquals("Leader journal log size", 4, leaderActorContext.getReplicatedLog().size());
+//        assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
+//        ReplicatedLogEntry logEntry = leaderActorContext.getReplicatedLog().get(
+//                leaderActorContext.getReplicatedLog().lastIndex());
+        // verify logEntry contents
+
+        // Also verify ServerConfigurationPayload entry in both followers
+
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
+        assertEquals("getLeaderHint", LEADER_ID, addServerReply.getLeaderHint());
+    }
+
+    @Test
+    public void testAddServerWithNoLeader() {
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+        TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
+                MockRaftActor.props(LEADER_ID, ImmutableMap.<String,String>of(FOLLOWER_ID, followerActor.path().toString()),
+                        Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+        noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
+
+        noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
+        AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
+        assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+    }
+
+    @Test
+    public void testAddServerForwardedToLeader() {
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+        TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(LEADER_ID));
+
+        TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
+                MockRaftActor.props(FOLLOWER_ID, ImmutableMap.<String,String>of(LEADER_ID, leaderActor.path().toString()),
+                        Optional.<ConfigParams>of(configParams), NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId(FOLLOWER_ID));
+        followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
+
+        followerRaftActor.tell(new AppendEntries(1, LEADER_ID, 0, 1, Collections.<ReplicatedLogEntry>emptyList(),
+                -1, -1, (short)0), leaderActor);
+
+        followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newServerActor.path().toString(), true), testKit.getRef());
+        expectFirstMatching(leaderActor, AddServer.class);
+    }
+
+    private RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(100000);
+        RaftActorContext followerActorContext = new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
+                id, new ElectionTermImpl(NO_PERSISTENCE, id, LOG), -1, -1,
+                ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
+
+        return followerActorContext;
+    }
+
+    public static class MockLeaderRaftActor extends MockRaftActor {
+        public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
+                RaftActorContext fromContext) {
+            super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE);
+
+            RaftActorContext context = getRaftActorContext();
+            for(int i = 0; i < fromContext.getReplicatedLog().size(); i++) {
+                ReplicatedLogEntry entry = fromContext.getReplicatedLog().get(i);
+                getState().add(entry.getData());
+                context.getReplicatedLog().append(entry);
+            }
+
+            context.setCommitIndex(fromContext.getCommitIndex());
+            context.setLastApplied(fromContext.getLastApplied());
+        }
+
+        @Override
+        protected void initializeBehavior() {
+            changeCurrentBehavior(new Leader(getRaftActorContext()));
+            initializeBehaviorComplete.countDown();
+        }
+
+        @Override
+        public void createSnapshot(ActorRef actorRef) {
+            try {
+                actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(getState()).toByteArray()), actorRef);
+            } catch (Exception e) {
+                LOG.error("createSnapshot failed", e);
+            }
+        }
+
+        static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+            configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+            configParams.setElectionTimeoutFactor(100000);
+            return Props.create(MockLeaderRaftActor.class, peerAddresses, configParams, fromContext);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayloadTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayloadTest.java
new file mode 100644 (file)
index 0000000..cf9c5cb
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for ServerConfigurationPayload.
+ *
+ * @author Thomas Pantelis
+ */
+public class ServerConfigurationPayloadTest {
+
+    @Test
+    public void testSerialization() {
+        ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList("1", "2"),
+                Arrays.asList("3"));
+        ServerConfigurationPayload cloned = (ServerConfigurationPayload) SerializationUtils.clone(expected);
+
+        assertEquals("getNewServerConfig", expected.getNewServerConfig(), cloned.getNewServerConfig());
+        assertEquals("getOldServerConfig", expected.getOldServerConfig(), cloned.getOldServerConfig());
+    }
+
+    @Test
+    public void testSize() {
+        ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList("1", "2"),
+                Arrays.asList("3"));
+        assertTrue(expected.size() > 0);
+    }
+}
index 63810d8..6bfe16d 100644 (file)
@@ -15,8 +15,8 @@ import java.util.List;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 
 public class ForwardMessageToBehaviorActor extends MessageCollectorActor {
-    private RaftActorBehavior behavior;
-    private List<RaftActorBehavior> behaviorChanges = new ArrayList<>();
+    private volatile RaftActorBehavior behavior;
+    private final List<RaftActorBehavior> behaviorChanges = new ArrayList<>();
 
     @Override
     public void onReceive(Object message) throws Exception {