From af65c60b352ab06eb32294ccd9879f5e43585e31 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 26 Oct 2015 17:55:47 -0400 Subject: [PATCH] Add voting state to ServerConfigurationPayload Changed the internal state to a list of ServerInfo instances which contain he server id and voting state. Also removed the oldServerConfig field as it won't be needed. Change-Id: I10b3ca8dc2ffed9b5db0a7d0f6ca74d73a837b8e Signed-off-by: Tom Pantelis --- .../RaftActorServerConfigurationSupport.java | 14 ++-- .../raft/ServerConfigurationPayload.java | 71 ++++++++++++++----- .../behaviors/AbstractRaftActorBehavior.java | 13 ++-- ...ftActorServerConfigurationSupportTest.java | 39 ++++++---- .../raft/ServerConfigurationPayloadTest.java | 11 ++- 5 files changed, 105 insertions(+), 43 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java index 7c37b2a68d..aba94f653e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java @@ -12,12 +12,13 @@ import akka.actor.ActorSelection; import akka.actor.Cancellable; import com.google.common.base.Preconditions; import java.util.ArrayList; -import java.util.Collections; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.messages.AddServer; @@ -171,12 +172,17 @@ class RaftActorServerConfigurationSupport { } protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext operationContext){ - List newConfig = new ArrayList(raftContext.getPeerIds()); - newConfig.add(raftContext.getId()); + Collection peers = raftContext.getPeers(); + List newConfig = new ArrayList<>(peers.size() + 1); + for(PeerInfo peer: peers) { + newConfig.add(new ServerInfo(peer.getId(), peer.isVoting())); + } + + newConfig.add(new ServerInfo(raftContext.getId(), true)); LOG.debug("{}: New server configuration : {}", raftContext.getId(), newConfig); - ServerConfigurationPayload payload = new ServerConfigurationPayload(newConfig, Collections.emptyList()); + ServerConfigurationPayload payload = new ServerConfigurationPayload(newConfig); raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java index db1f193cba..112c7d56e0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayload.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.raft; +import com.google.common.base.Preconditions; import com.google.protobuf.GeneratedMessage.GeneratedExtension; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -14,6 +15,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.List; import java.util.Map; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; import org.slf4j.Logger; @@ -29,22 +31,16 @@ public class ServerConfigurationPayload extends Payload implements Serializable private static final Logger LOG = LoggerFactory.getLogger(ServerConfigurationPayload.class); - private final List newServerConfig; - private final List oldServerConfig; + private final List serverConfig; private transient int serializedSize = -1; - public ServerConfigurationPayload(List newServerConfig, List oldServerConfig) { - this.newServerConfig = newServerConfig; - this.oldServerConfig = oldServerConfig; + public ServerConfigurationPayload(@Nonnull List serverConfig) { + this.serverConfig = Preconditions.checkNotNull(serverConfig); } - public List getNewServerConfig() { - return newServerConfig; - } - - - public List getOldServerConfig() { - return oldServerConfig; + @Nonnull + public List getServerConfig() { + return serverConfig; } @Override @@ -53,8 +49,7 @@ public class ServerConfigurationPayload extends Payload implements Serializable try { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream out = new ObjectOutputStream(bos); - out.writeObject(newServerConfig); - out.writeObject(oldServerConfig); + out.writeObject(serverConfig); out.close(); serializedSize = bos.toByteArray().length; @@ -82,7 +77,51 @@ public class ServerConfigurationPayload extends Payload implements Serializable @Override public String toString() { - return "ServerConfigurationPayload [newServerConfig=" + newServerConfig + ", oldServerConfig=" - + oldServerConfig + "]"; + return "ServerConfigurationPayload [serverConfig=" + serverConfig + "]"; + } + + public static class ServerInfo implements Serializable { + private static final long serialVersionUID = 1L; + + private final String id; + private final boolean isVoting; + + public ServerInfo(@Nonnull String id, boolean isVoting) { + this.id = Preconditions.checkNotNull(id); + this.isVoting = isVoting; + } + + @Nonnull + public String getId() { + return id; + } + + public boolean isVoting() { + return isVoting; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (isVoting ? 1231 : 1237); + result = prime * result + id.hashCode(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + ServerInfo other = (ServerInfo) obj; + return isVoting == other.isVoting && id.equals(other.id); + } + + @Override + public String toString() { + return "ServerInfo [id=" + id + ", isVoting=" + isVoting + "]"; + } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java index 751aeeb15e..39c097f4ee 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java @@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -496,12 +497,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior { public void applyServerConfiguration(ServerConfigurationPayload serverConfig) { Set currentPeers = new HashSet<>(context.getPeerIds()); - for(String peerId: serverConfig.getNewServerConfig()) { - if(!getId().equals(peerId)) { - if(!currentPeers.contains(peerId)) { - context.addToPeers(peerId, null, VotingState.VOTING); + for(ServerInfo server: serverConfig.getServerConfig()) { + if(!getId().equals(server.getId())) { + VotingState votingState = server.isVoting() ? VotingState.VOTING: VotingState.NON_VOTING; + if(!currentPeers.contains(server.getId())) { + context.addToPeers(server.getId(), null, votingState); } else { - currentPeers.remove(peerId); + context.getPeerInfo(server.getId()).setVotingState(votingState); + currentPeers.remove(server.getId()); } } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java index bc73a41d21..da7b4eddf4 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java @@ -23,13 +23,13 @@ import com.google.common.collect.Sets; import java.util.Collections; import java.util.List; import java.util.Map; -//import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; @@ -149,15 +149,18 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("Leader journal last index", 3, leaderActorContext.getReplicatedLog().lastIndex()); assertEquals("Leader commit index", 3, leaderActorContext.getCommitIndex()); assertEquals("Leader last applied index", 3, leaderActorContext.getLastApplied()); - verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID); + verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID), + votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID)); // Verify ServerConfigurationPayload entry in both followers assertEquals("Follower journal last index", 3, followerActorContext.getReplicatedLog().lastIndex()); - verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID); + verifyServerConfigurationPayloadEntry(followerActorContext.getReplicatedLog(), votingServer(LEADER_ID), + votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID)); assertEquals("New follower journal last index", 3, newFollowerActorContext.getReplicatedLog().lastIndex()); - verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, FOLLOWER_ID, NEW_SERVER_ID); + verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID), + votingServer(FOLLOWER_ID), votingServer(NEW_SERVER_ID)); // Verify new server config was applied in both followers @@ -207,13 +210,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("Leader journal last index", 2, leaderActorContext.getReplicatedLog().lastIndex()); assertEquals("Leader commit index", 2, leaderActorContext.getCommitIndex()); assertEquals("Leader last applied index", 2, leaderActorContext.getLastApplied()); - verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); + verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID), + votingServer(NEW_SERVER_ID)); // Verify ServerConfigurationPayload entry in the new follower expectFirstMatching(newFollowerCollectorActor, ApplyState.class); assertEquals("New follower journal last index", 2, newFollowerActorContext.getReplicatedLog().lastIndex()); - verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); + verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID), + votingServer(NEW_SERVER_ID)); // Verify new server config was applied in the new follower @@ -246,13 +251,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("Leader journal last index", 0, leaderActorContext.getReplicatedLog().lastIndex()); assertEquals("Leader commit index", 0, leaderActorContext.getCommitIndex()); assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied()); - verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); + verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID), + nonVotingServer(NEW_SERVER_ID)); // Verify ServerConfigurationPayload entry in the new follower expectFirstMatching(newFollowerCollectorActor, ApplyState.class); assertEquals("New follower journal last index", 0, newFollowerActorContext.getReplicatedLog().lastIndex()); - verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), LEADER_ID, NEW_SERVER_ID); + verifyServerConfigurationPayloadEntry(newFollowerActorContext.getReplicatedLog(), votingServer(LEADER_ID), + nonVotingServer(NEW_SERVER_ID)); // Verify new server config was applied in the new follower @@ -276,7 +283,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex()); assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied()); verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), - LEADER_ID, NEW_SERVER_ID, NEW_SERVER_ID2); + votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2)); } @Test @@ -325,7 +332,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { assertEquals("Leader commit index", 1, leaderActorContext.getCommitIndex()); assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied()); verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), - LEADER_ID, NEW_SERVER_ID, NEW_SERVER_ID2); + votingServer(LEADER_ID), votingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2)); // Verify ServerConfigurationPayload entry in the new follower @@ -401,11 +408,19 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest { expectFirstMatching(leaderActor, AddServer.class); } - private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, String... cNew) { + private ServerInfo votingServer(String id) { + return new ServerInfo(id, true); + } + + private ServerInfo nonVotingServer(String id) { + return new ServerInfo(id, false); + } + + private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) { ReplicatedLogEntry logEntry = log.get(log.lastIndex()); assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass()); ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData(); - assertEquals("getNewServerConfig", Sets.newHashSet(cNew), Sets.newHashSet(payload.getNewServerConfig())); + assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig())); } private static RaftActorContext newFollowerContext(String id, TestActorRef actor) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayloadTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayloadTest.java index cf9c5cbc1a..51a2c2f3e6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayloadTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ServerConfigurationPayloadTest.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.raft; +import static org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.util.Arrays; @@ -22,18 +23,16 @@ public class ServerConfigurationPayloadTest { @Test public void testSerialization() { - ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList("1", "2"), - Arrays.asList("3")); + ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true), + new ServerInfo("2", false))); ServerConfigurationPayload cloned = (ServerConfigurationPayload) SerializationUtils.clone(expected); - assertEquals("getNewServerConfig", expected.getNewServerConfig(), cloned.getNewServerConfig()); - assertEquals("getOldServerConfig", expected.getOldServerConfig(), cloned.getOldServerConfig()); + assertEquals("getServerConfig", expected.getServerConfig(), cloned.getServerConfig()); } @Test public void testSize() { - ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList("1", "2"), - Arrays.asList("3")); + ServerConfigurationPayload expected = new ServerConfigurationPayload(Arrays.asList(new ServerInfo("1", true))); assertTrue(expected.size() > 0); } } -- 2.36.6