BUG-4626: Introduce NormalizedNodeData{Input,Output}
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index e33d7cdce694eb3715eb4781a0a40c176855677b..d61e12e1cb2fd2bc97e7f6b34429c22adaf72669 100644 (file)
@@ -24,6 +24,7 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
 import akka.serialization.Serialization;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
@@ -33,6 +34,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Sets;
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -48,6 +51,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
@@ -139,7 +143,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private DatastoreSnapshot restoreFromSnapshot;
 
-    private ShardManagerSnapshot recoveredSnapshot;
+    private ShardManagerSnapshot currentSnapshot;
 
     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
 
@@ -235,7 +239,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ServerRemoved){
             onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof SaveSnapshotSuccess) {
-            LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
+            onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
         } else if (message instanceof SaveSnapshotFailure) {
             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
                     persistenceId(), ((SaveSnapshotFailure) message).cause());
@@ -285,6 +289,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         byte[] shardManagerSnapshot = null;
+        if(currentSnapshot != null) {
+            shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
+        }
+
         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
@@ -333,8 +341,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
-        boolean shardWasInRecoveredSnapshot = recoveredSnapshot != null &&
-                recoveredSnapshot.getShardList().contains(shardName);
+        boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
+                currentSnapshot.getShardList().contains(shardName);
 
         Map<String, String> peerAddresses;
         boolean isActiveMember;
@@ -510,15 +518,34 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
+    /**
+     * Handles the messages delivered while this actor is recovering. A RecoveryCompleted message is
+     * delegated to onRecoveryCompleted(); the payload of a SnapshotOffer is applied as the
+     * ShardManagerSnapshot. Any other message type is ignored by this method.
+     *
+     * @param message the recovery message to process
+     */
     @Override
     protected void handleRecover(Object message) throws Exception {
         if (message instanceof RecoveryCompleted) {
-            LOG.info("Recovery complete : {}", persistenceId());
-
-            // We no longer persist SchemaContext modules so delete all the prior messages from the akka
-            // journal on upgrade from Helium.
-            deleteMessages(lastSequenceNr());
-            createLocalShards();
+            onRecoveryCompleted();
         } else if (message instanceof SnapshotOffer) {
-            onSnapshotOffer((SnapshotOffer) message);
+            // The offered snapshot is cast to ShardManagerSnapshot without a prior type check.
+            applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
+        }
+    }
+
+    /**
+     * Finishes recovery: clears the message journal, optionally applies the ShardManagerSnapshot
+     * carried by restoreFromSnapshot, and then creates the local shards.
+     */
+    private void onRecoveryCompleted() {
+        LOG.info("Recovery complete : {}", persistenceId());
+
+        // We no longer persist SchemaContext modules so delete all the prior messages from the akka
+        // journal on upgrade from Helium.
+        deleteMessages(lastSequenceNr());
+
+        // Fall back to the externally supplied snapshot only when no snapshot has been set yet
+        // (currentSnapshot is still null), so it never replaces one that was already applied.
+        if(currentSnapshot == null && restoreFromSnapshot != null &&
+                restoreFromSnapshot.getShardManagerSnapshot() != null) {
+            // The restored bytes are read back with plain Java serialization.
+            // NOTE(review): the write side in this class uses SerializationUtils.serialize; confirm
+            // whether reading through a raw ObjectInputStream is deliberate (e.g. for class-loader
+            // visibility) before unifying the two.
+            try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
+                    restoreFromSnapshot.getShardManagerSnapshot()))) {
+                ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
+
+                LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
+
+                applyShardManagerSnapshot(snapshot);
+            } catch(Exception e) {
+                // Best effort: the failure is only logged and the local shards are still created below.
+                LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
+            }
         }
+
+        createLocalShards();
     }
 
     private void findLocalShard(FindLocalShard message) {
@@ -1067,18 +1094,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
         }
         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
-        saveSnapshot(new ShardManagerSnapshot(shardList));
+        saveSnapshot(updateShardManagerSnapshot(shardList));
     }
 
-    private void onSnapshotOffer(SnapshotOffer offer) {
-        recoveredSnapshot = (ShardManagerSnapshot)offer.snapshot();
+    /**
+     * Builds a new ShardManagerSnapshot from the given shard names and records it as the current
+     * snapshot.
+     *
+     * @param shardList the shard names to store in the snapshot
+     * @return the newly created snapshot, which is also assigned to currentSnapshot
+     */
+    private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
+        final ShardManagerSnapshot latest = new ShardManagerSnapshot(shardList);
+        currentSnapshot = latest;
+        return latest;
+    }
+
+    private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
+        currentSnapshot = snapshot;
 
-        LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), recoveredSnapshot);
+        LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
 
         String currentMember = cluster.getCurrentMemberName();
         Set<String> configuredShardList =
             new HashSet<>(configuration.getMemberShardNames(currentMember));
-        for (String shard : recoveredSnapshot.getShardList()) {
+        for (String shard : currentSnapshot.getShardList()) {
             if (!configuredShardList.contains(shard)) {
                 // add the current member as a replica for the shard
                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
@@ -1094,6 +1126,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    /**
+     * Reacts to a successfully saved snapshot by requesting deletion of the snapshots that match a
+     * selection criteria built from scala.Long.MaxValue() and the saved snapshot's timestamp minus one.
+     *
+     * @param successMessage the confirmation whose metadata supplies the timestamp of the saved snapshot
+     */
+    private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
+        LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+            persistenceId());
+
+        // Stop one tick short of the saved snapshot's own timestamp so that it is not selected itself.
+        final long timestampBound = successMessage.metadata().timestamp() - 1;
+        final SnapshotSelectionCriteria olderSnapshots =
+                new SnapshotSelectionCriteria(scala.Long.MaxValue(), timestampBound);
+        deleteSnapshots(olderSnapshots);
+    }
+
     private static class ForwardedAddServerReply {
         ShardInformation shardInfo;
         AddServerReply addServerReply;