Bug 2187: Persisting Actor peerIds' in snapshot 42/29542/3
authorkalaiselvik <Kalaiselvi_K@Dell.com>
Thu, 12 Nov 2015 08:48:39 +0000 (14:18 +0530)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 14 Nov 2015 13:53:10 +0000 (13:53 +0000)
Persisting Raft Actor's peer information in a snapshot and recovering the same
from the snapshot.
Incorporated the comments.

Change-Id: I12831f129b2bdeb1c64f473e94be617f8d6ee487
Signed-off-by: kalaiselvik <Kalaiselvi_K@Dell.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/GetSnapshotReplyActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java

index ca09823..1b9ca21 100644 (file)
@@ -45,7 +45,8 @@ class GetSnapshotReplyActor extends UntypedActor {
                     params.captureSnapshot.getUnAppliedEntries(),
                     params.captureSnapshot.getLastIndex(), params.captureSnapshot.getLastTerm(),
                     params.captureSnapshot.getLastAppliedIndex(), params.captureSnapshot.getLastAppliedTerm(),
-                    params.electionTerm.getCurrentTerm(), params.electionTerm.getVotedFor());
+                    params.electionTerm.getCurrentTerm(), params.electionTerm.getVotedFor(),
+                    params.peerInformation);
 
             LOG.debug("{}: Received CaptureSnapshotReply, sending {}", params.id, snapshot);
 
@@ -63,9 +64,9 @@ class GetSnapshotReplyActor extends UntypedActor {
     }
 
     public static Props props(CaptureSnapshot captureSnapshot, ElectionTerm electionTerm, ActorRef replyToActor,
-            Duration receiveTimeout, String id) {
+            Duration receiveTimeout, String id, ServerConfigurationPayload updatedPeerInfo) {
         return Props.create(GetSnapshotReplyActor.class, new Params(captureSnapshot, electionTerm, replyToActor,
-                receiveTimeout, id));
+                receiveTimeout, id, updatedPeerInfo));
     }
 
     private static final class Params {
@@ -74,14 +75,16 @@ class GetSnapshotReplyActor extends UntypedActor {
         final ElectionTerm electionTerm;
         final Duration receiveTimeout;
         final String id;
+        final ServerConfigurationPayload peerInformation;
 
         Params(CaptureSnapshot captureSnapshot, ElectionTerm electionTerm, ActorRef replyToActor,
-                Duration receiveTimeout, String id) {
+                Duration receiveTimeout, String id, ServerConfigurationPayload peerInfo) {
             this.captureSnapshot = Preconditions.checkNotNull(captureSnapshot);
             this.electionTerm = Preconditions.checkNotNull(electionTerm);
             this.replyToActor = Preconditions.checkNotNull(replyToActor);
             this.receiveTimeout = Preconditions.checkNotNull(receiveTimeout);
             this.id = Preconditions.checkNotNull(id);
+            this.peerInformation = peerInfo;
         }
     }
 }
index aa49e16..b20e9da 100644 (file)
@@ -15,6 +15,7 @@ import akka.actor.Props;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import java.util.Collection;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.slf4j.Logger;
@@ -218,4 +219,21 @@ public interface RaftActorContext {
      * @return an implementation of the RaftPolicy so that the Raft code can be adapted
      */
     RaftPolicy getRaftPolicy();
+
+    /**
+     * @return true if there are any dynamic server configuration changes available,
+     *  false if static peer configurations are still in use
+     */
+    boolean isDynamicServerConfigurationInUse();
+
+    /**
+     * Configures the dynamic server configurations are avaialble for the RaftActor
+     */
+    void setDynamicServerConfigurationInUse();
+
+    /**
+     * @return the RaftActor's peer information as a ServerConfigurationPayload if the
+     * dynamic server configurations are available, otherwise returns null
+     */
+    @Nullable ServerConfigurationPayload getPeerServerInfo();
 }
index 257d307..66059b5 100644 (file)
@@ -16,9 +16,11 @@ import akka.actor.Props;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
@@ -48,6 +50,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private ConfigParams configParams;
 
+    private boolean dynamicServerConfiguration = false;
+
     @VisibleForTesting
     private Supplier<Long> totalMemoryRetriever;
 
@@ -201,6 +205,7 @@ public class RaftActorContextImpl implements RaftActorContext {
         for(String peerIdToRemove: currentPeers) {
             this.removePeer(peerIdToRemove);
         }
+        setDynamicServerConfigurationInUse();
     }
 
     @Override public ConfigParams getConfigParams() {
@@ -266,4 +271,29 @@ public class RaftActorContextImpl implements RaftActorContext {
     public RaftPolicy getRaftPolicy() {
         return configParams.getRaftPolicy();
     }
+
+    @Override
+    public boolean isDynamicServerConfigurationInUse() {
+        return dynamicServerConfiguration;
+    }
+
+    @Override
+    public void setDynamicServerConfigurationInUse() {
+        this.dynamicServerConfiguration = true;
+    }
+
+    @Override
+    public ServerConfigurationPayload getPeerServerInfo() {
+        if (!isDynamicServerConfigurationInUse()) {
+            return null;
+        }
+        Collection<PeerInfo> peers = getPeers();
+        List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
+        for(PeerInfo peer: peers) {
+            newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
+        }
+
+        newConfig.add(new ServerInfo(getId(), true));
+        return (new ServerConfigurationPayload(newConfig));
+    }
 }
index 0a37ef7..3ac1e79 100644 (file)
@@ -177,6 +177,10 @@ class RaftActorRecoverySupport {
         // Apply the snapshot to the actors state
         cohort.applyRecoverySnapshot(snapshot.getState());
 
+        if (snapshot.getServerConfiguration() != null) {
+            context.updatePeerIds(snapshot.getServerConfiguration());
+        }
+
         timer.stop();
         log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
                 context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
index 258287a..39f4993 100644 (file)
@@ -11,15 +11,11 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
@@ -183,17 +179,9 @@ class RaftActorServerConfigurationSupport {
         }
 
         protected void persistNewServerConfiguration(RaftActor raftActor, ServerOperationContext<?> operationContext){
-            Collection<PeerInfo> peers = raftContext.getPeers();
-            List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
-            for(PeerInfo peer: peers) {
-                newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
-            }
-
-            newConfig.add(new ServerInfo(raftContext.getId(), true));
-
-            LOG.debug("{}: Persisting new server configuration : {}", raftContext.getId(), newConfig);
-
-            ServerConfigurationPayload payload = new ServerConfigurationPayload(newConfig);
+            raftContext.setDynamicServerConfigurationInUse();
+            ServerConfigurationPayload payload = raftContext.getPeerServerInfo();
+            LOG.debug("{}: New server configuration : {}", raftContext.getId(), payload.getServerConfig());
 
             raftActor.persistData(operationContext.getClientRequestor(), operationContext.getContextId(), payload);
 
index ec46f30..39548dc 100644 (file)
@@ -125,12 +125,13 @@ class RaftActorSnapshotMessageSupport {
 
             ActorRef snapshotReplyActor = context.actorOf(GetSnapshotReplyActor.props(captureSnapshot,
                     ImmutableElectionTerm.copyOf(context.getTermInformation()), sender,
-                    snapshotReplyActorTimeout, context.getId()));
+                    snapshotReplyActorTimeout, context.getId(), context.getPeerServerInfo()));
 
             cohort.createSnapshot(snapshotReplyActor);
         } else {
             Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(), -1, -1, -1, -1,
-                    context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor());
+                    context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
+                    context.getPeerServerInfo());
 
             sender.tell(new GetSnapshotReply(context.getId(), SerializationUtils.serialize(snapshot)),
                     context.getActor());
index a369c25..3d19764 100644 (file)
@@ -22,9 +22,11 @@ public class Snapshot implements Serializable {
     private final long lastAppliedTerm;
     private final long electionTerm;
     private final String electionVotedFor;
+    private final ServerConfigurationPayload serverConfig;
 
     private Snapshot(byte[] state, List<ReplicatedLogEntry> unAppliedEntries, long lastIndex, long lastTerm,
-            long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor) {
+            long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor,
+            ServerConfigurationPayload serverConfig) {
         this.state = state;
         this.unAppliedEntries = unAppliedEntries;
         this.lastIndex = lastIndex;
@@ -33,17 +35,25 @@ public class Snapshot implements Serializable {
         this.lastAppliedTerm = lastAppliedTerm;
         this.electionTerm = electionTerm;
         this.electionVotedFor = electionVotedFor;
+        this.serverConfig = serverConfig;
     }
 
     public static Snapshot create(byte[] state, List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
             long lastAppliedIndex, long lastAppliedTerm) {
-        return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, -1, null);
+        return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm, -1, null, null);
     }
 
     public static Snapshot create(byte[] state, List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
             long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor) {
         return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm,
-                electionTerm, electionVotedFor);
+                electionTerm, electionVotedFor, null);
+    }
+
+    public static Snapshot create(byte[] state, List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+            long lastAppliedIndex, long lastAppliedTerm, long electionTerm, String electionVotedFor,
+            ServerConfigurationPayload serverConfig) {
+        return new Snapshot(state, entries, lastIndex, lastTerm, lastAppliedIndex, lastAppliedTerm,
+                electionTerm, electionVotedFor, serverConfig);
     }
 
     public byte[] getState() {
@@ -79,11 +89,15 @@ public class Snapshot implements Serializable {
         return electionVotedFor;
     }
 
+    public ServerConfigurationPayload getServerConfiguration() {
+        return serverConfig;
+    }
+
     @Override
     public String toString() {
         return "Snapshot [lastIndex=" + lastIndex + ", lastTerm=" + lastTerm + ", lastAppliedIndex=" + lastAppliedIndex
                 + ", lastAppliedTerm=" + lastAppliedTerm + ", unAppliedEntries size=" + unAppliedEntries.size()
                 + ", state size=" + state.length + ", electionTerm=" + electionTerm + ", electionVotedFor=" + electionVotedFor
-                + "]";
+                + ", ServerConfigPayload="  + serverConfig + "]";
     }
 }
index 0d0c910..4a20e5b 100644 (file)
@@ -303,7 +303,7 @@ public class SnapshotManager implements SnapshotState {
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
                     context.getTermInformation().getCurrentTerm(),
-                    context.getTermInformation().getVotedFor());
+                    context.getTermInformation().getVotedFor(), context.getPeerServerInfo());
 
             context.getPersistenceProvider().saveSnapshot(snapshot);
 
index 321b2dd..3b1e69d 100644 (file)
@@ -365,7 +365,8 @@ public class Follower extends AbstractRaftActorBehavior {
                         installSnapshot.getLastIncludedIndex(),
                         installSnapshot.getLastIncludedTerm(),
                         context.getTermInformation().getCurrentTerm(),
-                        context.getTermInformation().getVotedFor());
+                        context.getTermInformation().getVotedFor(),
+                        context.getPeerServerInfo());
 
                 ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
                     @Override
index ddc8bed..71ca4ca 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doReturn;
@@ -213,6 +215,7 @@ public class RaftActorRecoverySupportTest {
         assertEquals("Snapshot index", lastAppliedDuringSnapshotCapture, context.getReplicatedLog().getSnapshotIndex());
         assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
         assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
+        assertFalse("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
 
         verify(mockCohort).applyRecoverySnapshot(snapshotBytes);
     }
@@ -395,6 +398,7 @@ public class RaftActorRecoverySupportTest {
         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 0, obj));
 
         //verify new peers
+        assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
         assertEquals("New peer Ids", Sets.newHashSet(follower1, follower2, follower3),
                 Sets.newHashSet(context.getPeerIds()));
         assertEquals("follower1 isVoting", true, context.getPeerInfo(follower1).isVoting());
@@ -415,6 +419,7 @@ public class RaftActorRecoverySupportTest {
         sendMessageToSupport(new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj));
 
         //verify new peers
+        assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
         assertEquals("New peer Ids", Sets.newHashSet(follower2, follower3), Sets.newHashSet(context.getPeerIds()));
     }
 
@@ -431,4 +436,29 @@ public class RaftActorRecoverySupportTest {
         //verify new peers
         assertEquals("New peer Ids", Sets.newHashSet(follower), Sets.newHashSet(context.getPeerIds()));
     }
+
+    @Test
+    public void testOnSnapshotOfferWithServerConfiguration() {
+        long electionTerm = 2;
+        String electionVotedFor = "member-2";
+        ServerConfigurationPayload serverPayload = new ServerConfigurationPayload(Arrays.asList(
+                                                        new ServerInfo(localId, true),
+                                                        new ServerInfo("follower1", true),
+                                                        new ServerInfo("follower2", true)));
+
+        Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(),
+                -1, -1, -1, -1, electionTerm, electionVotedFor, serverPayload);
+
+        SnapshotMetadata metadata = new SnapshotMetadata("test", 6, 12345);
+        SnapshotOffer snapshotOffer = new SnapshotOffer(metadata , snapshot);
+
+        sendMessageToSupport(snapshotOffer);
+
+        assertEquals("Journal log size", 0, context.getReplicatedLog().size());
+        assertEquals("Election term", electionTerm, context.getTermInformation().getCurrentTerm());
+        assertEquals("Election votedFor", electionVotedFor, context.getTermInformation().getVotedFor());
+        assertTrue("Dynamic server configuration", context.isDynamicServerConfigurationInUse());
+        assertEquals("Peer List", Sets.newHashSet("follower1", "follower2"),
+            Sets.newHashSet(context.getPeerIds()));
+    }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.