BUG 2187 - Persisting shard list in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 0804a50e9b1f2b6d5587742e83b8efa2218ddd30..98a6090514c9549f2f506c82a85fce7376e35cf6 100644 (file)
@@ -19,9 +19,11 @@ import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.OnComplete;
-import akka.japi.Creator;
 import akka.japi.Function;
 import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
 import akka.serialization.Serialization;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
@@ -35,6 +37,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -54,6 +57,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
@@ -116,7 +120,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final String shardDispatcherPath;
 
-    private ShardManagerInfo mBean;
+    private final ShardManagerInfo mBean;
 
     private DatastoreContextFactory datastoreContextFactory;
 
@@ -128,43 +132,33 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private SchemaContext schemaContext;
 
+    private DatastoreSnapshot restoreFromSnapshot;
+
     /**
      */
-    protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
-            PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
-        this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
-        this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
-        this.datastoreContextFactory = datastoreContextFactory;
-        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
+    protected ShardManager(Builder builder) {
+
+        this.cluster = builder.cluster;
+        this.configuration = builder.configuration;
+        this.datastoreContextFactory = builder.datastoreContextFactory;
+        this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
-        this.primaryShardInfoCache = primaryShardInfoCache;
+        this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
+        this.primaryShardInfoCache = builder.primaryShardInfoCache;
+        this.restoreFromSnapshot = builder.restoreFromSnapshot;
 
         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        createLocalShards();
-    }
-
-    public static Props props(
-            final ClusterWrapper cluster,
-            final Configuration configuration,
-            final DatastoreContextFactory datastoreContextFactory,
-            final CountDownLatch waitTillReadyCountdownLatch,
-            final PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
-        Preconditions.checkNotNull(cluster, "cluster should not be null");
-        Preconditions.checkNotNull(configuration, "configuration should not be null");
-        Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
-        Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
-
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContextFactory,
-                waitTillReadyCountdownLatch, primaryShardInfoCache));
+        List<String> localShardActorNames = new ArrayList<>();
+        mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
+                "shard-manager-" + this.type,
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
+                localShardActorNames);
+        mBean.setShardManager(this);
     }
 
     @Override
@@ -214,6 +208,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onRemoveShardReplica((RemoveShardReplica)message);
         } else if(message instanceof GetSnapshot) {
             onGetSnapshot();
+        } else if (message instanceof SaveSnapshotSuccess) {
+            LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
+        } else if (message instanceof SaveSnapshotFailure) {
+            LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
+                persistenceId(), ((SaveSnapshotFailure)message).cause());
         } else {
             unknownMessage(message);
         }
@@ -241,8 +240,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         byte[] shardManagerSnapshot = null;
-        ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(localShards.size(),
-                type, shardManagerSnapshot , getSender(), persistenceId(),
+        ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
+                new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
         for(ShardInformation shardInfo: localShards.values()) {
@@ -277,7 +276,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
 
             ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
-                    shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
+                    shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
             localShards.put(info.getShardName(), info);
 
             mBean.addLocalShard(shardId.toString());
@@ -443,6 +442,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
             // journal on upgrade from Helium.
             deleteMessages(lastSequenceNr());
+            createLocalShards();
+        } else if (message instanceof SnapshotOffer) {
+            handleShardRecovery((SnapshotOffer) message);
         }
     }
 
@@ -731,20 +733,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         String memberName = this.cluster.getCurrentMemberName();
         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
-        ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator();
-        List<String> localShardActorNames = new ArrayList<>();
+        Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
+        if(restoreFromSnapshot != null)
+        {
+            for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+                shardSnapshots.put(snapshot.getName(), snapshot);
+            }
+        }
+
+        restoreFromSnapshot = null; // null out to GC
+
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
-            localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
-                    newShardDatastoreContext(shardName), shardPropsCreator, peerAddressResolver));
+                    newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
+                        shardSnapshots.get(shardName)), peerAddressResolver));
+            mBean.addLocalShard(shardId.toString());
         }
-
-        mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
-                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames);
-
-        mBean.setShardManager(this);
     }
 
     /**
@@ -874,7 +880,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
                           getPeerAddresses(shardName), datastoreContext,
-                          new DefaultShardPropsCreator(), peerAddressResolver);
+                          Shard.builder(), peerAddressResolver);
+        shardInfo.setShardActiveMember(false);
         localShards.put(shardName, shardInfo);
         shardInfo.setActor(newShardActor(schemaContext, shardInfo));
 
@@ -921,6 +928,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             // Make the local shard voting capable
             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+            shardInfo.setShardActiveMember(true);
+            persistShardList();
 
             mBean.addLocalShard(shardInfo.getShardId().toString());
             sender.tell(new akka.actor.Status.Success(true), getSelf());
@@ -966,6 +975,39 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return;
     }
 
+    private void persistShardList() {
+        List<String> shardList = new ArrayList(localShards.keySet());
+        for (ShardInformation shardInfo : localShards.values()) {
+            if (!shardInfo.isShardActiveMember()) {
+                shardList.remove(shardInfo.getShardName());
+            }
+        }
+        LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+        saveSnapshot(new ShardManagerSnapshot(shardList));
+    }
+
+    private void handleShardRecovery(SnapshotOffer offer) {
+        LOG.debug ("{}: in handleShardRecovery", persistenceId());
+        ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+        String currentMember = cluster.getCurrentMemberName();
+        Set<String> configuredShardList =
+            new HashSet<>(configuration.getMemberShardNames(currentMember));
+        for (String shard : snapshot.getShardList()) {
+            if (!configuredShardList.contains(shard)) {
+                // add the current member as a replica for the shard
+                LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+                configuration.addMemberReplicaForShard(shard, currentMember);
+            } else {
+                configuredShardList.remove(shard);
+            }
+        }
+        for (String shard : configuredShardList) {
+            // remove the member as a replica for the shard
+            LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+            configuration.removeMemberReplicaForShard(shard, currentMember);
+        }
+    }
+
     @VisibleForTesting
     protected static class ShardInformation {
         private final ShardIdentifier shardId;
@@ -987,22 +1029,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private short leaderVersion;
 
         private DatastoreContext datastoreContext;
-        private final ShardPropsCreator shardPropsCreator;
+        private Shard.AbstractBuilder<?, ?> builder;
         private final ShardPeerAddressResolver addressResolver;
+        private boolean shardActiveStatus = true;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
-                ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) {
+                Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
             this.shardName = shardName;
             this.shardId = shardId;
             this.initialPeerAddresses = initialPeerAddresses;
             this.datastoreContext = datastoreContext;
-            this.shardPropsCreator = shardPropsCreator;
+            this.builder = builder;
             this.addressResolver = addressResolver;
         }
 
         Props newProps(SchemaContext schemaContext) {
-            return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext);
+            Preconditions.checkNotNull(builder);
+            Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
+                    schemaContext(schemaContext).props();
+            builder = null;
+            return props;
         }
 
         String getShardName() {
@@ -1183,31 +1230,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         void setLeaderVersion(short leaderVersion) {
             this.leaderVersion = leaderVersion;
         }
-    }
 
-    private static class ShardManagerCreator implements Creator<ShardManager> {
-        private static final long serialVersionUID = 1L;
-
-        final ClusterWrapper cluster;
-        final Configuration configuration;
-        final DatastoreContextFactory datastoreContextFactory;
-        private final CountDownLatch waitTillReadyCountdownLatch;
-        private final PrimaryShardInfoFutureCache primaryShardInfoCache;
-
-        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration,
-                DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
-                PrimaryShardInfoFutureCache primaryShardInfoCache) {
-            this.cluster = cluster;
-            this.configuration = configuration;
-            this.datastoreContextFactory = datastoreContextFactory;
-            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
-            this.primaryShardInfoCache = primaryShardInfoCache;
+        void setShardActiveMember(boolean flag) {
+            shardActiveStatus = flag;
         }
 
-        @Override
-        public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch,
-                    primaryShardInfoCache);
+        boolean isShardActiveMember() {
+            return shardActiveStatus;
         }
     }
 
@@ -1280,6 +1309,74 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return modules;
         }
     }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private ClusterWrapper cluster;
+        private Configuration configuration;
+        private DatastoreContextFactory datastoreContextFactory;
+        private CountDownLatch waitTillReadyCountdownLatch;
+        private PrimaryShardInfoFutureCache primaryShardInfoCache;
+        private DatastoreSnapshot restoreFromSnapshot;
+        private volatile boolean sealed;
+
+        protected void checkSealed() {
+            Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
+        }
+
+        public Builder cluster(ClusterWrapper cluster) {
+            checkSealed();
+            this.cluster = cluster;
+            return this;
+        }
+
+        public Builder configuration(Configuration configuration) {
+            checkSealed();
+            this.configuration = configuration;
+            return this;
+        }
+
+        public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
+            checkSealed();
+            this.datastoreContextFactory = datastoreContextFactory;
+            return this;
+        }
+
+        public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
+            checkSealed();
+            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+            return this;
+        }
+
+        public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
+            checkSealed();
+            this.primaryShardInfoCache = primaryShardInfoCache;
+            return this;
+        }
+
+        public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
+            checkSealed();
+            this.restoreFromSnapshot = restoreFromSnapshot;
+            return this;
+        }
+
+        protected void verify() {
+            sealed = true;
+            Preconditions.checkNotNull(cluster, "cluster should not be null");
+            Preconditions.checkNotNull(configuration, "configuration should not be null");
+            Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+            Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+            Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+        }
+
+        public Props props() {
+            verify();
+            return Props.create(ShardManager.class, this);
+        }
+    }
 }