Bug 2187: Recover Peer Id's and Update peer map during Journal recovery 12/29112/4
authorRajesh_Sindagi <Rajesh_Sindagi@dell.com>
Fri, 6 Nov 2015 18:15:31 +0000 (23:45 +0530)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 11 Nov 2015 00:22:38 +0000 (00:22 +0000)
Recover ServerConfigurationPayload ReplicatedLogEntry's and immediately apply to the peer map in RaftActorContext.
Review Comments incoporated.

Change-Id: I1b1b3c21e83eb5ea799dd040a4da8f78f1155082
Signed-off-by: Rajesh_Sindagi <Rajesh_Sindagi@dell.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupportTest.java

index c325875950169854685c2746178524155eb5f6fc..aa49e1681348adb0b7a9b1de67d5acb32aef1172 100644 (file)
@@ -112,6 +112,11 @@ public interface RaftActorContext {
      */
     String getPeerAddress(String peerId);
 
+    /**
+     * @param serverCfgPayload
+     */
+    void updatePeerIds(ServerConfigurationPayload serverCfgPayload);
+
     /**
      * @return list of PeerInfo
      */
index b86a015f6929aa9acc6eb7c0b152fede241e250a..257d307f686d14419630aa1e34c329b1565055e9 100644 (file)
@@ -18,9 +18,12 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.slf4j.Logger;
 
 public class RaftActorContextImpl implements RaftActorContext {
@@ -179,6 +182,27 @@ public class RaftActorContextImpl implements RaftActorContext {
         return peerAddress;
     }
 
+    @Override
+    public void updatePeerIds(ServerConfigurationPayload serverConfig){
+
+        Set<String> currentPeers = new HashSet<>(this.getPeerIds());
+        for(ServerInfo server: serverConfig.getServerConfig()) {
+            if(!getId().equals(server.getId())) {
+                VotingState votingState = server.isVoting() ? VotingState.VOTING: VotingState.NON_VOTING;
+                if(!currentPeers.contains(server.getId())) {
+                    this.addToPeers(server.getId(), null, votingState);
+                } else {
+                    this.getPeerInfo(server.getId()).setVotingState(votingState);
+                    currentPeers.remove(server.getId());
+                }
+            }
+        }
+
+        for(String peerIdToRemove: currentPeers) {
+            this.removePeer(peerIdToRemove);
+        }
+    }
+
     @Override public ConfigParams getConfigParams() {
         return configParams;
     }
index caa853da2390af2a30802a90c22a63b26e68df31..05405dc6dfc2457d9a33a5340a619ba8990064f3 100644 (file)
@@ -95,7 +95,18 @@ class RaftActorRecoverySupport {
                         context.getTermInformation().getVotedFor());
             }
         } else {
-            dataRecoveredWithPersistenceDisabled = true;
+            boolean isServerConfigPayload = false;
+            if(message instanceof ReplicatedLogEntry){
+                ReplicatedLogEntry repLogEntry = (ReplicatedLogEntry)message;
+                if(isServerConfigurationPayload(repLogEntry)){
+                    isServerConfigPayload = true;
+                    context.updatePeerIds((ServerConfigurationPayload)repLogEntry.getData());
+                }
+            }
+
+            if(!isServerConfigPayload){
+                dataRecoveredWithPersistenceDisabled = true;
+            }
         }
 
         return recoveryComplete;
@@ -146,6 +157,9 @@ class RaftActorRecoverySupport {
                     logEntry.getIndex(), logEntry.size());
         }
 
+        if(isServerConfigurationPayload(logEntry)){
+            context.updatePeerIds((ServerConfigurationPayload)logEntry.getData());
+        }
         replicatedLog().append(logEntry);
     }
 
@@ -180,14 +194,16 @@ class RaftActorRecoverySupport {
         initRecoveryTimer();
 
         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
-        if(currentRecoveryBatchCount == 0) {
-            cohort.startLogRecoveryBatch(batchSize);
-        }
+        if(!isServerConfigurationPayload(logEntry)){
+            if(currentRecoveryBatchCount == 0) {
+                cohort.startLogRecoveryBatch(batchSize);
+            }
 
-        cohort.appendRecoveredLogEntry(logEntry.getData());
+            cohort.appendRecoveredLogEntry(logEntry.getData());
 
-        if(++currentRecoveryBatchCount >= batchSize) {
-            endCurrentLogRecoveryBatch();
+            if(++currentRecoveryBatchCount >= batchSize) {
+                endCurrentLogRecoveryBatch();
+            }
         }
     }
 
@@ -214,4 +230,8 @@ class RaftActorRecoverySupport {
                  "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
                  replicatedLog().getSnapshotTerm(), replicatedLog().size());
     }
+
+    private boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){
+        return (repLogEntry.getData() instanceof ServerConfigurationPayload);
+    }
 }
index 39c097f4ee36a250bd7e69a30e9042bc41d91fe5..48b68bdfd09ab8128a70fbfaf34621d653611ca5 100644 (file)
@@ -10,18 +10,13 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
 import akka.actor.Cancellable;
-import java.util.HashSet;
 import java.util.Random;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
-import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -494,23 +489,4 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected String getId(){
         return context.getId();
     }
-
-    public void applyServerConfiguration(ServerConfigurationPayload serverConfig) {
-        Set<String> currentPeers = new HashSet<>(context.getPeerIds());
-        for(ServerInfo server: serverConfig.getServerConfig()) {
-            if(!getId().equals(server.getId())) {
-                VotingState votingState = server.isVoting() ? VotingState.VOTING: VotingState.NON_VOTING;
-                if(!currentPeers.contains(server.getId())) {
-                    context.addToPeers(server.getId(), null, votingState);
-                } else {
-                    context.getPeerInfo(server.getId()).setVotingState(votingState);
-                    currentPeers.remove(server.getId());
-                }
-            }
-        }
-
-        for(String peerIdToRemove: currentPeers) {
-            context.removePeer(peerIdToRemove);
-        }
-    }
 }
index c69528230084af9091dc906324f8c3eff7e2f856..321b2dd86410d6cbac8fccbe0bde0b12b606c4ef 100644 (file)
@@ -192,7 +192,7 @@ public class Follower extends AbstractRaftActorBehavior {
                 context.getReplicatedLog().appendAndPersist(entry);
 
                 if(entry.getData() instanceof ServerConfigurationPayload) {
-                    applyServerConfiguration((ServerConfigurationPayload)entry.getData());
+                    context.updatePeerIds((ServerConfigurationPayload)entry.getData());
                 }
             }
 
index 26bf7228c1adece502635ca71c1c586ada3615b1..e4f77f1c40d33687627216ed44aa3a939fd157f5 100644 (file)
@@ -20,6 +20,8 @@ import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import org.hamcrest.Description;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,6 +38,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +68,7 @@ public class RaftActorRecoverySupportTest {
     private RaftActorContext context;
     private final DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
 
+
     @Before
     public void setup() {
         MockitoAnnotations.initMocks(this);
@@ -366,4 +370,53 @@ public class RaftActorRecoverySupportTest {
 
         verifyNoMoreInteractions(mockCohort, mockPersistentProvider);
     }
+
+    @Test
+    public void testUpdatePeerIds() {
+
+            String leader = "Leader";
+            String follower1 = "follower1";
+            String follower2 = "follower2";
+            String follower3 = "follower3";
+
+            Map<String, String> peerAddresses = new HashMap<>();
+
+            peerAddresses.put(leader, null);
+            peerAddresses.put(follower1, null);
+            peerAddresses.put(follower2, null);
+
+            context.addToPeers(leader,null,VotingState.VOTING);
+            context.addToPeers(follower1,null,VotingState.VOTING);
+            context.addToPeers(follower2,null,VotingState.VOTING);
+
+            assertEquals("Size", 3, context.getPeers().size());
+
+            //add new Server
+            ServerConfigurationPayload obj = new ServerConfigurationPayload(Arrays.asList(
+                                                           new ServerInfo(leader, true),
+                                                           new ServerInfo(follower1, true),
+                                                           new ServerInfo(follower2, true),
+                                                           new ServerInfo(follower3, true)));
+
+            MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,
+                1, obj);
+
+            sendMessageToSupport(logEntry);
+            //verify size and names
+            assertEquals("Size", 4, context.getPeers().size());
+            assertEquals("New follower matched", true , context.getPeerIds().contains(follower3));
+
+            //remove existing follower1
+            obj = new ServerConfigurationPayload(Arrays.asList(
+                                                           new ServerInfo("Leader", true),
+                                                           new ServerInfo("follower2", true),
+                                                           new ServerInfo("follower3", true)));
+
+            logEntry = new MockRaftActorContext.MockReplicatedLogEntry(1, 1, obj);
+
+            sendMessageToSupport(logEntry);
+            //verify size and names
+            assertEquals("Size", 3, context.getPeers().size());
+            assertEquals("Removed follower matched", false, context.getPeerIds().contains(follower1));
+    }
 }