Add voting state to ServerConfigurationPayload 29/28829/9
authorTom Pantelis <tpanteli@brocade.com>
Mon, 26 Oct 2015 21:55:47 +0000 (17:55 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 28 Oct 2015 22:40:27 +0000 (22:40 +0000)
Changed the internal state to a list of ServerInfo instances which
contain he server id and voting state.

Also removed the oldServerConfig field as it won't be needed.

Change-Id: I10b3ca8dc2ffed9b5db0a7d0f6ca74d73a837b8e
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayloadTest.java

index 7c37b2a68daa62768407bfd66365a2b0509f1b8e..aba94f653efdc1bc455d1a66b1ac88f48e8fb0f9 100644 (file)
@@ -12,12 +12,13 @@ import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
@@ -171,12 +172,17 @@ class RaftActorServerConfigurationSupport {
         }
 
         protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
-            List <String> newConfig = new ArrayList<String>(raftContext.getPeerIds());
-            newConfig.add(raftContext.getId());
+            Collection<PeerInfo> peers = raftContext.getPeers();
+            List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
+            for(PeerInfo peer: peers) {
+                newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
+            }
+
+            newConfig.add(new ServerInfo(raftContext.getId(), true));
 
             LOG.debug("{}: New server configuration : {}", raftContext.getId(), newConfig);
 
-            ServerConfigurationPayload payload = new ServerConfigurationPayload(newConfig, Collections.<String>emptyList());
+            ServerConfigurationPayload payload = new ServerConfigurationPayload(newConfig);
 
             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
 
index db1f193cba360fd389a683d435a9fa8a6ff61698..112c7d56e00faa62a84051cfda87fb2ab39102a4 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage.GeneratedExtension;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -14,6 +15,7 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.slf4j.Logger;
@@ -29,22 +31,16 @@ public class ServerConfigurationPayload extends Payload implements Serializable
 
     private static final Logger LOG = LoggerFactory.getLogger(ServerConfigurationPayload.class);
 
-    private final List<String> newServerConfig;
-    private final List<String> oldServerConfig;
+    private final List<ServerInfo> serverConfig;
     private transient int serializedSize = -1;
 
-    public ServerConfigurationPayload(List<String> newServerConfig, List<String> oldServerConfig) {
-        this.newServerConfig = newServerConfig;
-        this.oldServerConfig = oldServerConfig;
+    public ServerConfigurationPayload(@Nonnull List<ServerInfo> serverConfig) {
+        this.serverConfig = Preconditions.checkNotNull(serverConfig);
     }
 
-    public List<String> getNewServerConfig() {
-        return newServerConfig;
-    }
-
-
-    public List<String> getOldServerConfig() {
-        return oldServerConfig;
+    @Nonnull
+    public List<ServerInfo> getServerConfig() {
+        return serverConfig;
     }
 
     @Override
@@ -53,8 +49,7 @@ public class ServerConfigurationPayload extends Payload implements Serializable
             try {
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream out = new ObjectOutputStream(bos);
-                out.writeObject(newServerConfig);
-                out.writeObject(oldServerConfig);
+                out.writeObject(serverConfig);
                 out.close();
 
                 serializedSize = bos.toByteArray().length;
@@ -82,7 +77,51 @@ public class ServerConfigurationPayload extends Payload implements Serializable
 
     @Override
     public String toString() {
-        return "ServerConfigurationPayload [newServerConfig=" + newServerConfig + ", oldServerConfig="
-                + oldServerConfig + "]";
+        return "ServerConfigurationPayload [serverConfig=" + serverConfig + "]";
+    }
+
+    public static class ServerInfo implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String id;
+        private final boolean isVoting;
+
+        public ServerInfo(@Nonnull String id, boolean isVoting) {
+            this.id = Preconditions.checkNotNull(id);
+            this.isVoting = isVoting;
+        }
+
+        @Nonnull
+        public String getId() {
+            return id;
+        }
+
+        public boolean isVoting() {
+            return isVoting;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (isVoting ? 1231 : 1237);
+            result = prime * result + id.hashCode();
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+
+            ServerInfo other = (ServerInfo) obj;
+            return isVoting == other.isVoting && id.equals(other.id);
+        }
+
+        @Override
+        public String toString() {
+            return "ServerInfo [id=" + id + ", isVoting=" + isVoting + "]";
+        }
     }
 }
index 751aeeb15e25b4edf5beebe00424879d780f92a0..39c097f4ee36a250bd7e69a30e9042bc41d91fe5 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
@@ -496,12 +497,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
     public void applyServerConfiguration(ServerConfigurationPayload serverConfig) {
         Set<String> currentPeers = new HashSet<>(context.getPeerIds());
-        for(String peerId: serverConfig.getNewServerConfig()) {
-            if(!getId().equals(peerId)) {
-                if(!currentPeers.contains(peerId)) {
-                    context.addToPeers(peerId, null, VotingState.VOTING);
+        for(ServerInfo server: serverConfig.getServerConfig()) {
+            if(!getId().equals(server.getId())) {
+                VotingState votingState = server.isVoting() ? VotingState.VOTING: VotingState.NON_VOTING;
+                if(!currentPeers.contains(server.getId())) {
+                    context.addToPeers(server.getId(), null, votingState);
                 } else {
-                    currentPeers.remove(peerId);
+                    context.getPeerInfo(server.getId()).setVotingState(votingState);
+                    currentPeers.remove(server.getId());
                 }
             }
         }
index bc73a41d2151becf1d8093121704b2ae3abac06a..da7b4eddf4951db8890964c12531b4f1e8d22f1e 100644 (file)
@@ -23,13 +23,13 @@ import com.google.common.collect.Sets;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-//import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
@@ -149,15 +149,18 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex());
         assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied());
-        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
+                votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
 
         // Verify ServerConfigurationPayload entry in both followers
 
         assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex());
-        verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
+        verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
+                votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
 
         assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex());
-        verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID);
+        verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
+                votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID));
 
         // Verify new server config was applied in both followers
 
@@ -207,13 +210,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex());
         assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied());
-        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
+                votingServer(NEW_SERVER_ID));
 
         // Verify ServerConfigurationPayload entry in the new follower
 
         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
         assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex());
-        verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+        verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
+                votingServer(NEW_SERVER_ID));
 
         // Verify new server config was applied in the new follower
 
@@ -246,13 +251,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex());
         assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex());
         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
-        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+        verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
+                nonVotingServer(NEW_SERVER_ID));
 
         // Verify ServerConfigurationPayload entry in the new follower
 
         expectFirstMatching(newFollowerCollectorActor, ApplyState.class);
         assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex());
-        verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID);
+        verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID),
+                nonVotingServer(NEW_SERVER_ID));
 
         // Verify new server config was applied in the new follower
 
@@ -276,7 +283,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
-                LEADER_ID, NEW_SERVER_ID, NEW_SERVER_ID2);
+                votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
     }
 
     @Test
@@ -325,7 +332,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex());
         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
-                LEADER_ID, NEW_SERVER_ID, NEW_SERVER_ID2);
+                votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
 
         // Verify ServerConfigurationPayload entry in the new follower
 
@@ -401,11 +408,19 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         expectFirstMatching(leaderActor, AddServer.class);
     }
 
-    private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, String... cNew) {
+    private ServerInfo votingServer(String id) {
+        return new ServerInfo(id, true);
+    }
+
+    private ServerInfo nonVotingServer(String id) {
+        return new ServerInfo(id, false);
+    }
+
+    private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
-        assertEquals("getNewServerConfig", Sets.newHashSet(cNew), Sets.newHashSet(payload.getNewServerConfig()));
+        assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
     }
 
     private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
index cf9c5cbc1a62f3f4b7a7b237889a101848220fd0..51a2c2f3e6b0bc565ead4d4b9b6e61efdfecbc7e 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import static org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import java.util.Arrays;
@@ -22,18 +23,16 @@ public class ServerConfigurationPayloadTest {
 
     @Test
     public void testSerialization() {
-        ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList("1", "2"),
-                Arrays.asList("3"));
+        ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true),
+                new ServerInfo("2", false)));
         ServerConfigurationPayload cloned = (ServerConfigurationPayload) SerializationUtils.clone(expected);
 
-        assertEquals("getNewServerConfig", expected.getNewServerConfig(), cloned.getNewServerConfig());
-        assertEquals("getOldServerConfig", expected.getOldServerConfig(), cloned.getOldServerConfig());
+        assertEquals("getServerConfig", expected.getServerConfig(), cloned.getServerConfig());
     }
 
     @Test
     public void testSize() {
-        ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList("1", "2"),
-                Arrays.asList("3"));
+        ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true)));
         assertTrue(expected.size() > 0);
     }
 }