BUG 2187 - Persisting shard list in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 49e1fe32f381ffd338f5b4ba8da035576eaca9fa..98a6090514c9549f2f506c82a85fce7376e35cf6 100644 (file)
@@ -21,6 +21,9 @@ import akka.cluster.ClusterEvent;
 import akka.dispatch.OnComplete;
 import akka.japi.Function;
 import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
 import akka.serialization.Serialization;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
@@ -34,6 +37,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -116,7 +120,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String shardDispatcherPath;
 
-    private ShardManagerInfo mBean;
+    private final ShardManagerInfo mBean;
 
     private DatastoreContextFactory datastoreContextFactory;
 
@@ -149,7 +153,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        createLocalShards();
+        List<String> localShardActorNames = new ArrayList<>();
+        mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
+                "shard-manager-" + this.type,
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
+                localShardActorNames);
+        mBean.setShardManager(this);
     }
 
     @Override
@@ -199,6 +208,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
             onGetSnapshot();
+        } else if (message instanceof SaveSnapshotSuccess) {
+            LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+        } else if (message instanceof SaveSnapshotFailure) {
+            LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
+                persistenceId(), ((SaveSnapshotFailure)message).cause());
         } else {
             unknownMessage(message);
         }
@@ -428,6 +442,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
             // journal on upgrade from Helium.
             deleteMessages(lastSequenceNr());
+            createLocalShards();
+        } else if (message instanceof SnapshotOffer) {
+            handleShardRecovery((SnapshotOffer) message);
         }
     }
 
@@ -726,20 +743,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         restoreFromSnapshot = null; // null out to GC
 
-        List<String> localShardActorNames = new ArrayList<>();
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
-            localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
                         shardSnapshots.get(shardName)), peerAddressResolver));
+            mBean.addLocalShard(shardId.toString());
         }
-
-        mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
-                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames);
-
-        mBean.setShardManager(this);
     }
 
     /**
@@ -870,6 +881,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
                           getPeerAddresses(shardName), datastoreContext,
                           Shard.builder(), peerAddressResolver);
+        shardInfo.setShardActiveMember(false);
         localShards.put(shardName, shardInfo);
         shardInfo.setActor(newShardActor(schemaContext, shardInfo));
 
@@ -916,6 +928,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             // Make the local shard voting capable
             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+            shardInfo.setShardActiveMember(true);
+            persistShardList();
 
             mBean.addLocalShard(shardInfo.getShardId().toString());
             sender.tell(new akka.actor.Status.Success(true), getSelf());
@@ -961,6 +975,39 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return;
     }
 
+    /**
+     * Saves a snapshot holding the names of the local shards whose
+     * {@code isShardActiveMember()} flag is set. Shards whose flag is cleared are left
+     * out of the persisted list.
+     *
+     * NOTE(review): assumes every key in {@code localShards} equals the shard name reported
+     * by its {@code ShardInformation} value - confirm against the places that populate the map.
+     */
+    private void persistShardList() {
+        // Build the list in a single pass (typed, no raw ArrayList) instead of copying every
+        // key and then removing the inactive names one at a time.
+        List<String> shardList = new ArrayList<>(localShards.size());
+        for (ShardInformation shardInfo : localShards.values()) {
+            if (shardInfo.isShardActiveMember()) {
+                shardList.add(shardInfo.getShardName());
+            }
+        }
+        LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+        saveSnapshot(new ShardManagerSnapshot(shardList));
+    }
+
+    /**
+     * Reconciles the shards configured for the current member with the shard list carried
+     * by a recovered snapshot. For a shard listed in the snapshot but not configured for this
+     * member, the member is added as a replica; for a shard configured for this member but
+     * not listed in the snapshot, the member is removed as a replica.
+     *
+     * @param offer the snapshot offer; its payload is cast to {@code ShardManagerSnapshot}
+     */
+    private void handleShardRecovery(SnapshotOffer offer) {
+        LOG.debug ("{}: in handleShardRecovery", persistenceId());
+        final ShardManagerSnapshot recovered = (ShardManagerSnapshot) offer.snapshot();
+        final String localMember = cluster.getCurrentMemberName();
+
+        // Starts out as every shard configured for this member. Names that also appear in the
+        // snapshot are struck off, so whatever remains afterwards is absent from the snapshot.
+        final Set<String> notInSnapshot =
+            new HashSet<>(configuration.getMemberShardNames(localMember));
+        for (String shardName : recovered.getShardList()) {
+            if (notInSnapshot.remove(shardName)) {
+                // configured and present in the snapshot: nothing to change
+                continue;
+            }
+            // add the current member as a replica for the shard
+            LOG.debug ("{}: adding shard {}", persistenceId(), shardName);
+            configuration.addMemberReplicaForShard(shardName, localMember);
+        }
+
+        for (String shardName : notInSnapshot) {
+            // remove the member as a replica for the shard
+            LOG.debug ("{}: removing shard {}", persistenceId(), shardName);
+            configuration.removeMemberReplicaForShard(shardName, localMember);
+        }
+    }
+
     @VisibleForTesting
     protected static class ShardInformation {
         private final ShardIdentifier shardId;
@@ -984,6 +1031,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private DatastoreContext datastoreContext;
         private Shard.AbstractBuilder<?, ?> builder;
         private final ShardPeerAddressResolver addressResolver;
+        private boolean shardActiveStatus = true;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
@@ -1182,6 +1230,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         void setLeaderVersion(short leaderVersion) {
             this.leaderVersion = leaderVersion;
         }
+
+        /**
+         * Updates the active-member flag stored for this shard.
+         *
+         * @param flag the new value of the flag
+         */
+        void setShardActiveMember(boolean flag) {
+            this.shardActiveStatus = flag;
+        }
+
+        /**
+         * Reports the active-member flag stored for this shard.
+         *
+         * @return the current value of the flag
+         */
+        boolean isShardActiveMember() {
+            return this.shardActiveStatus;
+        }
     }
 
     private static class OnShardInitialized {
@@ -1307,13 +1363,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return this;
         }
 
-        public Props props() {
+        /**
+         * Sets {@code sealed} to true and then checks that {@code cluster},
+         * {@code configuration}, {@code datastoreContextFactory},
+         * {@code waitTillReadyCountdownLatch} and {@code primaryShardInfoCache} are non-null.
+         * The flag is set before the checks run, so it stays set even when a check fails.
+         *
+         * NOTE(review): {@code Preconditions} is presumably Guava's, whose
+         * {@code checkNotNull} throws {@code NullPointerException} with the given message -
+         * confirm against the file's imports.
+         */
+        protected void verify() {
             sealed = true;
             Preconditions.checkNotNull(cluster, "cluster should not be null");
             Preconditions.checkNotNull(configuration, "configuration should not be null");
             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+        }
+
+        /**
+         * Runs {@code verify()} and then creates the {@code Props} for {@code ShardManager},
+         * passing this object as the creation argument.
+         *
+         * @return the {@code Props} produced by {@code Props.create(ShardManager.class, this)}
+         */
+        public Props props() {
+            verify();
             return Props.create(ShardManager.class, this);
         }
     }