Add ServerConfigPayload to InstallSnapshot message 20/41320/3
authorTom Pantelis <tpanteli@brocade.com>
Wed, 29 Jun 2016 06:09:49 +0000 (02:09 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 5 Jul 2016 17:20:44 +0000 (17:20 +0000)
When the leader installs a snapshot on a follower, it needs to include the
server config info as well. Otherwise if a server config change occurred
while a follower was down, it won't get the updated server config info
and will be out of sync with the rest of the cluster which causes other
issues.

Change-Id: Ic290ed162bf9fdf6b9fe55986ea0c9c9e83b29a9
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit b8e21016b85e98c31d866de7b6db51691596c9f4)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotTest.java

index 9acf849..f163c90 100644 (file)
@@ -225,6 +225,8 @@ public class RaftActorContextImpl implements RaftActorContext {
             votingMember = false;
         }
 
+        LOG.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
+
         setDynamicServerConfigurationInUse();
     }
 
index 328de80..b35f4ba 100644 (file)
@@ -92,8 +92,7 @@ class RaftActorSnapshotMessageSupport {
     }
 
     private void onApplySnapshot(ApplySnapshot message) {
-        log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(),
-                message.getSnapshot().getLastAppliedIndex(), message.getSnapshot().getLastAppliedTerm());
+        log.info("{}: Applying snapshot on follower:  {}", context.getId(), message.getSnapshot());
 
         context.getSnapshotManager().apply(message);
     }
index b195686..dd52c38 100644 (file)
@@ -398,6 +398,10 @@ public class SnapshotManager implements SnapshotState {
                     context.setCommitIndex(snapshot.getLastAppliedIndex());
                     context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
 
+                    if(snapshot.getServerConfiguration() != null) {
+                        context.updatePeerIds(snapshot.getServerConfiguration());
+                    }
+
                     if(snapshot.getState().length > 0 ) {
                         applySnapshotProcedure.accept(snapshot.getState());
                     }
index a1ec4d8..5219ebb 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
@@ -705,14 +706,21 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // followerId to the followerToSnapshot map.
                 FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
 
+                int nextChunkIndex = followerToSnapshot.incrementChunkIndex();
+                Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+                if(followerToSnapshot.isLastChunk(nextChunkIndex)) {
+                    serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+                }
+
                 followerActor.tell(
                     new InstallSnapshot(currentTerm(), context.getId(),
                         snapshot.get().getLastIncludedIndex(),
                         snapshot.get().getLastIncludedTerm(),
                         nextSnapshotChunk,
-                        followerToSnapshot.incrementChunkIndex(),
+                        nextChunkIndex,
                         followerToSnapshot.getTotalChunks(),
-                        Optional.of(followerToSnapshot.getLastChunkHashCode())
+                        Optional.of(followerToSnapshot.getLastChunkHashCode()),
+                        serverConfig
                     ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
                     actor()
                 );
index a9d1b82..02b5d7e 100644 (file)
@@ -411,7 +411,7 @@ public class Follower extends AbstractRaftActorBehavior {
                         installSnapshot.getLastIncludedTerm(),
                         context.getTermInformation().getCurrentTerm(),
                         context.getTermInformation().getVotedFor(),
-                        context.getPeerServerInfo(true));
+                        installSnapshot.getServerConfig().orNull());
 
                 ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
                     @Override
index f2f9cd3..2c141ea 100644 (file)
@@ -15,6 +15,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 
 public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
@@ -28,6 +29,7 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
     private int chunkIndex;
     private int totalChunks;
     private Optional<Integer> lastChunkHashCode;
+    private Optional<ServerConfigurationPayload> serverConfig;
 
     /**
      * Empty constructor to satisfy Externalizable.
@@ -35,8 +37,8 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
     public InstallSnapshot() {
     }
 
-    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
-        long lastIncludedTerm, byte[] data, int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode) {
+    public InstallSnapshot(long term, String leaderId, long lastIncludedIndex, long lastIncludedTerm, byte[] data,
+            int chunkIndex, int totalChunks, Optional<Integer> lastChunkHashCode, Optional<ServerConfigurationPayload> serverConfig) {
         super(term);
         this.leaderId = leaderId;
         this.lastIncludedIndex = lastIncludedIndex;
@@ -45,11 +47,13 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
         this.chunkIndex = chunkIndex;
         this.totalChunks = totalChunks;
         this.lastChunkHashCode = lastChunkHashCode;
+        this.serverConfig = serverConfig;
     }
 
     public InstallSnapshot(long term, String leaderId, long lastIncludedIndex,
                            long lastIncludedTerm, byte[] data, int chunkIndex, int totalChunks) {
-        this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks, Optional.<Integer>absent());
+        this(term, leaderId, lastIncludedIndex, lastIncludedTerm, data, chunkIndex, totalChunks,
+                Optional.<Integer>absent(), Optional.<ServerConfigurationPayload>absent());
     }
 
     public String getLeaderId() {
@@ -80,6 +84,10 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
         return lastChunkHashCode;
     }
 
+    public Optional<ServerConfigurationPayload> getServerConfig() {
+        return serverConfig;
+    }
+
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
         out.writeShort(RaftVersions.CURRENT_VERSION);
@@ -95,6 +103,11 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
             out.writeInt(lastChunkHashCode.get().intValue());
         }
 
+        out.writeByte(serverConfig.isPresent() ? 1 : 0);
+        if(serverConfig.isPresent()) {
+            out.writeObject(serverConfig.get());
+        }
+
         out.writeObject(data);
     }
 
@@ -114,6 +127,12 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
             lastChunkHashCode = Optional.of(in.readInt());
         }
 
+        serverConfig = Optional.absent();
+        boolean serverConfigPresent = in.readByte() == 1;
+        if(serverConfigPresent) {
+            serverConfig = Optional.of((ServerConfigurationPayload)in.readObject());
+        }
+
         data = (byte[])in.readObject();
     }
 
@@ -141,7 +160,8 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
     public String toString() {
         return "InstallSnapshot [term=" + getTerm() + ", leaderId=" + leaderId + ", lastIncludedIndex="
                 + lastIncludedIndex + ", lastIncludedTerm=" + lastIncludedTerm + ", datasize=" + data.length
-                + ", Chunk=" + chunkIndex + "/" + totalChunks + ", lastChunkHashCode=" + lastChunkHashCode + "]";
+                + ", Chunk=" + chunkIndex + "/" + totalChunks + ", lastChunkHashCode=" + lastChunkHashCode
+                + ", serverConfig=" + serverConfig.orNull() + "]";
     }
 
     public static InstallSnapshot fromSerializable (Object o) {
@@ -159,7 +179,8 @@ public class InstallSnapshot extends AbstractRaftRPC implements Externalizable {
             InstallSnapshot installSnapshot = new InstallSnapshot(from.getTerm(),
                     from.getLeaderId(), from.getLastIncludedIndex(),
                     from.getLastIncludedTerm(), from.getData().toByteArray(),
-                    from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode);
+                    from.getChunkIndex(), from.getTotalChunks(), lastChunkHashCode,
+                    Optional.<ServerConfigurationPayload>absent());
 
             return installSnapshot;
         }
index 0b072e6..1a2ed4b 100644 (file)
@@ -104,6 +104,11 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
                 return;
             }
 
+            if(message instanceof ServerConfigurationPayload) {
+                super.persistData(collectorActor, new MockIdentifier("serverConfig"), (Payload)message);
+                return;
+            }
+
             if(message instanceof SetPeerAddress) {
                 setPeerAddress(((SetPeerAddress) message).getPeerId().toString(),
                         ((SetPeerAddress) message).getPeerAddress());
@@ -302,7 +307,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     }
 
     protected void verifyApplyState(ApplyState applyState, ActorRef expClientActor,
-            String expId, long expTerm, long expIndex, MockPayload payload) {
+            String expId, long expTerm, long expIndex, Payload payload) {
         assertEquals("ApplyState getClientActor", expClientActor, applyState.getClientActor());
 
         final Identifier id = expId == null ? null : new MockIdentifier(expId);
index 03387ab..d9d5ef0 100644 (file)
@@ -8,19 +8,26 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -331,6 +338,25 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         expSnapshotState.add(payload5);
         expSnapshotState.add(payload6);
 
+        MessageCollectorActor.clearMessages(leaderCollectorActor);
+        MessageCollectorActor.clearMessages(follower1CollectorActor);
+
+        // Send a server config change to test that the install snapshot includes the server config.
+
+        ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo(leaderId, true),
+                new ServerInfo(follower1Id, false),
+                new ServerInfo(follower2Id, false)));
+        leaderContext.updatePeerIds(serverConfig);
+        ((AbstractLeader)leader).updateMinReplicaCount();
+        leaderActor.tell(serverConfig, ActorRef.noSender());
+
+        applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+        verifyApplyState(applyState, leaderCollectorActor, "serverConfig", currentTerm, 8, serverConfig);
+
+        applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
+        verifyApplyState(applyState, null, null, currentTerm, 8, serverConfig);
+
         // Verify the leader's persisted snapshot.
         List<Snapshot> persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
         assertEquals("Persisted snapshots size", 1, persistedSnapshots.size());
@@ -341,7 +367,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         expSnapshotState.add(payload7);
 
-        verifyInstallSnapshotToLaggingFollower(7);
+        verifyInstallSnapshotToLaggingFollower(8, serverConfig);
 
         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
     }
@@ -419,7 +445,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         expSnapshotState.add(payload2);
 
-        verifyInstallSnapshotToLaggingFollower(2L);
+        verifyInstallSnapshotToLaggingFollower(2L, null);
 
         // Sends a payload with index 3.
         verifyNoSubsequentSnapshotAfterMemoryThresholdExceededSnapshot();
@@ -493,7 +519,8 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
      *
      * @throws Exception
      */
-    private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex) throws Exception {
+    private void verifyInstallSnapshotToLaggingFollower(long lastAppliedIndex,
+            @Nullable ServerConfigurationPayload expServerConfig) throws Exception {
         List<Snapshot> persistedSnapshots;
         List<ReplicatedLogEntry> unAppliedEntry;
         ApplySnapshot applySnapshot;
@@ -561,6 +588,21 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         // the log. In addition replicatedToAllIndex should've advanced.
         verifyLeadersTrimmedLog(lastAppliedIndex);
 
+        if(expServerConfig != null) {
+            Set<ServerInfo> expServerInfo = new HashSet<>(expServerConfig.getServerConfig());
+            assertEquals("Leader snapshot server config", expServerInfo,
+                    new HashSet<>(persistedSnapshot.getServerConfiguration().getServerConfig()));
+
+            assertEquals("Follower 2 snapshot server config", expServerInfo,
+                    new HashSet<>(applySnapshot.getSnapshot().getServerConfiguration().getServerConfig()));
+
+            ServerConfigurationPayload follower2ServerConfig = follower2Context.getPeerServerInfo(true);
+            assertNotNull("Follower 2 server config is null", follower2ServerConfig);
+
+            assertEquals("Follower 2 server config", expServerInfo,
+                    new HashSet<>(follower2ServerConfig.getServerConfig()));
+        }
+
         MessageCollectorActor.clearMessages(leaderCollectorActor);
         MessageCollectorActor.clearMessages(follower1CollectorActor);
         MessageCollectorActor.clearMessages(follower2CollectorActor);
index 1d60158..7b12374 100644 (file)
@@ -11,9 +11,12 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import com.google.common.base.Optional;
 import java.io.Serializable;
+import java.util.Arrays;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 
 /**
@@ -34,7 +37,10 @@ public class InstallSnapshotTest {
             }
         }
 
-        InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.<Integer>of(54321));
+        ServerConfigurationPayload serverConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo("leader", true), new ServerInfo("follower", false)));
+        InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6,
+                Optional.<Integer>of(54321), Optional.of(serverConfig));
 
         Object serialized = expected.toSerializable(RaftVersions.CURRENT_VERSION);
         assertEquals("Serialized type", InstallSnapshot.class, serialized.getClass());
@@ -51,7 +57,8 @@ public class InstallSnapshotTest {
     @Test
     public void testSerializationWithPreBoronVersion() {
         byte[] data = {0,1,2,3,4,5,7,8,9};
-        InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.<Integer>of(54321));
+        InstallSnapshot expected = new InstallSnapshot(3L, "leaderId", 11L, 2L, data, 5, 6, Optional.<Integer>of(54321),
+                Optional.<ServerConfigurationPayload>absent());
 
         Object serialized = expected.toSerializable(RaftVersions.LITHIUM_VERSION);
         assertEquals("Serialized type", InstallSnapshot.SERIALIZABLE_CLASS, serialized.getClass());
@@ -77,11 +84,19 @@ public class InstallSnapshotTest {
         assertEquals("getLeaderId", expected.getLeaderId(), actual.getLeaderId());
         assertEquals("getChunkIndex", expected.getChunkIndex(), actual.getChunkIndex());
         assertArrayEquals("getData", expected.getData(), actual.getData());
+
         assertEquals("getLastChunkHashCode present", expected.getLastChunkHashCode().isPresent(),
                 actual.getLastChunkHashCode().isPresent());
         if(expected.getLastChunkHashCode().isPresent()) {
             assertEquals("getLastChunkHashCode", expected.getLastChunkHashCode().get(),
                     actual.getLastChunkHashCode().get());
         }
+
+        assertEquals("getServerConfig present", expected.getServerConfig().isPresent(),
+                actual.getServerConfig().isPresent());
+        if(expected.getServerConfig().isPresent()) {
+            assertEquals("getServerConfig", expected.getServerConfig().get().getServerConfig(),
+                    actual.getServerConfig().get().getServerConfig());
+        }
     }
 }