From: Tom Pantelis Date: Wed, 29 Jun 2016 06:09:49 +0000 (-0400) Subject: Add ServerConfigPayload to InstallSnapshot message X-Git-Tag: release/boron~83 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=ad860a3b51c31b740aabb297727e15aa45756777 Add ServerConfigPayload to InstallSnapshot message When the leader installs a snapshot on a follower, it needs to include the server config info as well. Otherwise if a server config change occurred while a follower was down, it won't get the updated server config info and will be out of sync with the rest of the cluster which causes other issues. Change-Id: Ic290ed162bf9fdf6b9fe55986ea0c9c9e83b29a9 Signed-off-by: Tom Pantelis (cherry picked from commit b8e21016b85e98c31d866de7b6db51691596c9f4) --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java index 9acf849f54..f163c9045a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java @@ -225,6 +225,8 @@ public class RaftActorContextImpl implements RaftActorContext { votingMember = false; } + LOG.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values()); + setDynamicServerConfigurationInUse(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java index 328de80a83..b35f4babb1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -92,8 +92,7 @@ class RaftActorSnapshotMessageSupport { } private void onApplySnapshot(ApplySnapshot message) { - log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(), - message.getSnapshot().getLastAppliedIndex(), message.getSnapshot().getLastAppliedTerm()); + log.info("{}: Applying snapshot on follower: {}", context.getId(), message.getSnapshot()); context.getSnapshotManager().apply(message); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index b195686d20..dd52c38685 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -398,6 +398,10 @@ public class SnapshotManager implements SnapshotState { context.setCommitIndex(snapshot.getLastAppliedIndex()); context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor()); + if(snapshot.getServerConfiguration() != null) { + context.updatePeerIds(snapshot.getServerConfiguration()); + } + if(snapshot.getState().length > 0 ) { applySnapshotProcedure.accept(snapshot.getState()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index a1ec4d8831..5219ebb310 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.PeerInfo; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; @@ -705,14 +706,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // followerId to the followerToSnapshot map. FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + int nextChunkIndex = followerToSnapshot.incrementChunkIndex(); + Optional serverConfig = Optional.absent(); + if(followerToSnapshot.isLastChunk(nextChunkIndex)) { + serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); + } + followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), snapshot.get().getLastIncludedIndex(), snapshot.get().getLastIncludedTerm(), nextSnapshotChunk, - followerToSnapshot.incrementChunkIndex(), + nextChunkIndex, followerToSnapshot.getTotalChunks(), - Optional.of(followerToSnapshot.getLastChunkHashCode()) + Optional.of(followerToSnapshot.getLastChunkHashCode()), + serverConfig ).toSerializable(followerToLog.get(followerId).getRaftVersion()), actor() ); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index a9d1b8233e..02b5d7e72c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -411,7 +411,7 @@ public class Follower extends AbstractRaftActorBehavior { installSnapshot.getLastIncludedTerm(), context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), - context.getPeerServerInfo(true)); + installSnapshot.getServerConfig().orNull()); ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() { @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java index f2f9cd39d8..2c141ea581 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import org.opendaylight.controller.cluster.raft.RaftVersions; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; public class InstallSnapshot extends AbstractRaftRPC implements Externalizable { @@ -28,6 +29,7 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable { private int chunkIndex; private int totalChunks; private Optional lastChunkHashCode; + private Optional serverConfig; /** * Empty constructor to satisfy Externalizable. @@ -35,8 +37,8 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable { public InstallSnapshot() { } - public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, - long lastIncludedTerm, byte[] data, int chunkIndex, int totalChunks, Optional lastChunkHashCode) { + public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, byte[] data, + int chunkIndex, int totalChunks, Optional lastChunkHashCode, Optional serverConfig) { super(term); this.leaderId = leaderId; this.lastIncludedIndex = lastIncludedIndex; @@ -45,11 +47,13 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable { this.chunkIndex = chunkIndex; this.totalChunks = totalChunks; this.lastChunkHashCode = lastChunkHashCode; + this.serverConfig = serverConfig; } public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, byte[] data, int chunkIndex, int totalChunks) { - this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.absent()); + this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, + Optional.absent(), Optional.absent()); } public String getLeaderId() { @@ -80,6 +84,10 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable { return lastChunkHashCode; } + public Optional getServerConfig() { + return serverConfig; + } + @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeShort(RaftVersions.CURRENT_VERSION); @@ -95,6 +103,11 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable { out.writeInt(lastChunkHashCode.get().intValue()); } + out.writeByte(serverConfig.isPresent() ? 1 : 0); + if(serverConfig.isPresent()) { + out.writeObject(serverConfig.get()); + } + out.writeObject(data); } @@ -114,6 +127,12 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable { lastChunkHashCode = Optional.of(in.readInt()); } + serverConfig = Optional.absent(); + boolean serverConfigPresent = in.readByte() == 1; + if(serverConfigPresent) { + serverConfig = Optional.of((ServerConfigurationPayload)in.readObject()); + } + data = (byte[])in.readObject(); } @@ -141,7 +160,8 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable { public String toString() { return "InstallSnapshot [term=" + getTerm() + ", leaderId=" + leaderId + ", lastIncludedIndex=" + lastIncludedIndex + ", lastIncludedTerm=" + lastIncludedTerm + ", datasize=" + data.length - + ", Chunk=" + chunkIndex + "/" + totalChunks + ", lastChunkHashCode=" + lastChunkHashCode + "]"; + + ", Chunk=" + chunkIndex + "/" + totalChunks + ", lastChunkHashCode=" + lastChunkHashCode + + ", serverConfig=" + serverConfig.orNull() + "]"; } public static InstallSnapshot fromSerializable (Object o) { @@ -159,7 +179,8 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable { InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(), from.getLeaderId(), from.getLastIncludedIndex(), from.getLastIncludedTerm(), from.getData().toByteArray(), - from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode); + from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode, + Optional.absent()); return installSnapshot; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 0b072e67f5..1a2ed4bb15 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -104,6 +104,11 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest return; } + if(message instanceof ServerConfigurationPayload) { + super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload)message); + return; + } + if(message instanceof SetPeerAddress) { setPeerAddress(((SetPeerAddress) message).getPeerId().toString(), ((SetPeerAddress) message).getPeerAddress()); @@ -302,7 +307,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest } protected void verifyApplyState(ApplyState applyState, ActorRef expClientActor, - String expId, long expTerm, long expIndex, MockPayload payload) { + String expId, long expTerm, long expIndex, Payload payload) { assertEquals("ApplyState getClientActor", expClientActor, applyState.getClientActor()); final Identifier id = expId == null ? null : new MockIdentifier(expId); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java index 03387abcd5..d9d5ef0a7d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java @@ -8,19 +8,26 @@ package org.opendaylight.controller.cluster.raft; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import akka.actor.ActorRef; import akka.persistence.SaveSnapshotSuccess; import com.google.common.collect.ImmutableMap; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -331,6 +338,25 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A expSnapshotState.add(payload5); expSnapshotState.add(payload6); + MessageCollectorActor.clearMessages(leaderCollectorActor); + MessageCollectorActor.clearMessages(follower1CollectorActor); + + // Send a server config change to test that the install snapshot includes the server config. + + ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo(leaderId, true), + new ServerInfo(follower1Id, false), + new ServerInfo(follower2Id, false))); + leaderContext.updatePeerIds(serverConfig); + ((AbstractLeader)leader).updateMinReplicaCount(); + leaderActor.tell(serverConfig, ActorRef.noSender()); + + applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class); + verifyApplyState(applyState, leaderCollectorActor, "serverConfig", currentTerm, 8, serverConfig); + + applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class); + verifyApplyState(applyState, null, null, currentTerm, 8, serverConfig); + // Verify the leader's persisted snapshot. List persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class); assertEquals("Persisted snapshots size", 1, persistedSnapshots.size()); @@ -341,7 +367,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A expSnapshotState.add(payload7); - verifyInstallSnapshotToLaggingFollower(7); + verifyInstallSnapshotToLaggingFollower(8, serverConfig); testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete"); } @@ -419,7 +445,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A expSnapshotState.add(payload2); - verifyInstallSnapshotToLaggingFollower(2L); + verifyInstallSnapshotToLaggingFollower(2L, null); // Sends a payload with index 3. verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot(); @@ -493,7 +519,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A * * @throws Exception */ - private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex) throws Exception { + private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex, + @Nullable ServerConfigurationPayload expServerConfig) throws Exception { List persistedSnapshots; List unAppliedEntry; ApplySnapshot applySnapshot; @@ -561,6 +588,21 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A // the log. In addition replicatedToAllIndex should've advanced. verifyLeadersTrimmedLog(lastAppliedIndex); + if(expServerConfig != null) { + Set expServerInfo = new HashSet<>(expServerConfig.getServerConfig()); + assertEquals("Leader snapshot server config", expServerInfo, + new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig())); + + assertEquals("Follower 2 snapshot server config", expServerInfo, + new HashSet<>(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig())); + + ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true); + assertNotNull("Follower 2 server config is null", follower2ServerConfig); + + assertEquals("Follower 2 server config", expServerInfo, + new HashSet<>(follower2ServerConfig.getServerConfig())); + } + MessageCollectorActor.clearMessages(leaderCollectorActor); MessageCollectorActor.clearMessages(follower1CollectorActor); MessageCollectorActor.clearMessages(follower2CollectorActor); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotTest.java index 1d60158dce..7b123741b6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotTest.java @@ -11,9 +11,12 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import com.google.common.base.Optional; import java.io.Serializable; +import java.util.Arrays; import org.apache.commons.lang.SerializationUtils; import org.junit.Test; import org.opendaylight.controller.cluster.raft.RaftVersions; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo; import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages; /** @@ -34,7 +37,10 @@ public class InstallSnapshotTest { } } - InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.of(54321)); + ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList( + new ServerInfo("leader", true), new ServerInfo("follower", false))); + InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, + Optional.of(54321), Optional.of(serverConfig)); Object serialized = expected.toSerializable(RaftVersions.CURRENT_VERSION); assertEquals("Serialized type", InstallSnapshot.class, serialized.getClass()); @@ -51,7 +57,8 @@ public class InstallSnapshotTest { @Test public void testSerializationWithPreBoronVersion() { byte[] data = {0,1,2,3,4,5,7,8,9}; - InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.of(54321)); + InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.of(54321), + Optional.absent()); Object serialized = expected.toSerializable(RaftVersions.LITHIUM_VERSION); assertEquals("Serialized type", InstallSnapshot.SERIALIZABLE_CLASS, serialized.getClass()); @@ -77,11 +84,19 @@ public class InstallSnapshotTest { assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId()); assertEquals("getChunkIndex", expected.getChunkIndex(), actual.getChunkIndex()); assertArrayEquals("getData", expected.getData(), actual.getData()); + assertEquals("getLastChunkHashCode present", expected.getLastChunkHashCode().isPresent(), actual.getLastChunkHashCode().isPresent()); if(expected.getLastChunkHashCode().isPresent()) { assertEquals("getLastChunkHashCode", expected.getLastChunkHashCode().get(), actual.getLastChunkHashCode().get()); } + + assertEquals("getServerConfig present", expected.getServerConfig().isPresent(), + actual.getServerConfig().isPresent()); + if(expected.getServerConfig().isPresent()) { + assertEquals("getServerConfig", expected.getServerConfig().get().getServerConfig(), + actual.getServerConfig().get().getServerConfig()); + } } }