Bug 4564: Implement datastore restore from backup file 43/29243/12
author Tom Pantelis <tpanteli@brocade.com>
Wed, 4 Nov 2015 08:59:39 +0000 (03:59 -0500)
committer Gerrit Code Review <gerrit@opendaylight.org>
Sat, 14 Nov 2015 22:41:12 +0000 (22:41 +0000)
Added a singleton DatastoreSnapshotRestore class that looks for a restore
file in a specific directory, reads it, and deserializes the contained
datastore snapshots. The restore file is then deleted.
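
At startup each datastore Module pulls its snapshot from the singleton by
datastore type and feeds it into the factory. A minimal sketch of that access
pattern (mirroring the DistributedConfigDataStoreProviderModule change below;
schemaService, actorSystem and bundleContext stand in for the module's
dependencies):

    // Created by the bundle Activator before the config modules are instantiated,
    // so instance() returns the singleton here.
    DatastoreSnapshot restoreFromSnapshot = DatastoreSnapshotRestore.instance()
            .getAndRemove(datastoreContext.getDataStoreType());

    // The snapshot (or null if no backup file was found) is handed to the factory,
    // which passes it through DistributedDataStore to the ShardManager.
    return DistributedDataStoreFactory.createInstance(schemaService, datastoreContext,
            restoreFromSnapshot, actorSystem, bundleContext);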

The DatastoreSnapshotRestore instance needs to be injected into both
DistributedDataStore instances, which are created via separate config
system Module instances. However, the only way to inject the
DatastoreSnapshotRestore instance would be to define a yang module
and service. I didn't want to go through the overhead of all that, and I
didn't want DatastoreSnapshotRestore advertised as a service, so I made
it a static singleton that is created via a new bundle Activator class.
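
For reference, the Activator looks for the backup under
./clustered-datastore-restore, relative to the controller's working directory.
A hedged sketch of how a backup produced by ClusterAdminRpcService could be
staged for restore (the file name "backup" is arbitrary; the directory just
has to contain exactly one file):

    // Assumes snapshotList is the DatastoreSnapshotList written by
    // ClusterAdminRpcService.saveSnapshotsToFile(); only the location matters here.
    File restoreDir = new File("./clustered-datastore-restore");
    restoreDir.mkdirs();
    try (FileOutputStream fos = new FileOutputStream(new File(restoreDir, "backup"))) {
        SerializationUtils.serialize(snapshotList, fos); // org.apache.commons.lang3
    }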

The DatastoreSnapshot instance is passed to the ShardManager, which
passes each ShardSnapshot to the corresponding Shard actor. Once
recovery completes, the RaftActor takes care of applying the restored
snapshot.
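
Concretely, the ShardManager indexes the per-shard snapshots by shard name and
hands each one to the matching Shard via its builder; the Shard's recovery
cohort (ShardRecoveryCoordinator) then exposes the bytes through
getRestoreFromSnapshot(). A condensed sketch of that hand-off, abridged from
the ShardManager.createLocalShards() change below:

    Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
    if (restoreFromSnapshot != null) {
        for (DatastoreSnapshot.ShardSnapshot snapshot : restoreFromSnapshot.getShardSnapshots()) {
            shardSnapshots.put(snapshot.getName(), snapshot);
        }
    }

    // Each local shard gets its own slice of the backup (null if none was captured for it).
    localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
            newShardDatastoreContext(shardName),
            Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
            peerAddressResolver));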

Change-Id: Ied3db4e49b98320abb34e2acf73b27b29232f8d6
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
20 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestore.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotList.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/osgi/Activator.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestoreTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CarsModel.java

index 4a20e5b3aed77741cbc9bec66316211f73d4959a..10574e3a02cf881c689a88e6f81de0f021ffe70b 100644 (file)
@@ -139,7 +139,7 @@ public class SnapshotManager implements SnapshotState {
             lastLogEntryIndex = lastLogEntry.getIndex();
             lastLogEntryTerm = lastLogEntry.getTerm();
         } else {
-            LOG.warn("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
+            LOG.debug("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
                 lastAppliedIndex, lastAppliedTerm);
         }
 
index 74d4ec72b30215f8746df5ae7e0a1b22febdd269..ea71351b698f77b3b049bde2ba37fefdf88a528c 100644 (file)
         <configuration>
           <instructions>
             <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+            <Bundle-Activator>org.opendaylight.controller.cluster.datastore.osgi.Activator</Bundle-Activator>
             <Export-Package></Export-Package>
             <Import-Package>!*snappy;!org.jboss.*;!com.jcraft.*;!*jetty*;!sun.security.*;*</Import-Package>
             <!--
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestore.java
new file mode 100644 (file)
index 0000000..bed2389
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class looks for a previously saved data store backup file in a directory and, if found, de-serializes
+ * the DatastoreSnapshot instances. This class has a static singleton that is created on bundle activation.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreSnapshotRestore {
+    private static final Logger LOG = LoggerFactory.getLogger(DatastoreSnapshotRestore.class);
+
+    private static AtomicReference<DatastoreSnapshotRestore> instance = new AtomicReference<>();
+
+    private final String restoreDirectoryPath;
+    private final Map<String, DatastoreSnapshot> datastoreSnapshots = new ConcurrentHashMap<>();
+    private final AtomicBoolean initialized = new AtomicBoolean();
+
+    public static void createInstance(String restoreDirectoryPath) {
+        instance.compareAndSet(null, new DatastoreSnapshotRestore(restoreDirectoryPath));
+    }
+
+    public static void removeInstance() {
+        instance.set(null);
+    }
+
+    public static DatastoreSnapshotRestore instance() {
+        DatastoreSnapshotRestore localInstance = instance.get();
+        return Preconditions.checkNotNull(localInstance, "DatastoreSnapshotRestore instance was not created");
+    }
+
+    private DatastoreSnapshotRestore(String restoreDirectoryPath) {
+        this.restoreDirectoryPath = Preconditions.checkNotNull(restoreDirectoryPath);
+    }
+
+    private void initialize() {
+        if(!initialized.compareAndSet(false, true)) {
+            return;
+        }
+
+        File restoreDirectoryFile = new File(restoreDirectoryPath);
+
+        String[] files = restoreDirectoryFile.list();
+        if(files == null || files.length == 0) {
+            LOG.debug("Restore directory {} does not exist or is empty", restoreDirectoryFile);
+            return;
+        }
+
+        if(files.length > 1) {
+            LOG.error("Found {} files in clustered datastore restore directory {} - expected 1. No restore will be attempted",
+                    files.length, restoreDirectoryFile);
+            return;
+        }
+
+        File restoreFile = new File(restoreDirectoryFile, files[0]);
+
+        LOG.info("Clustered datastore will be restored from file {}", restoreFile);
+
+        try(FileInputStream fis = new FileInputStream(restoreFile)) {
+            DatastoreSnapshotList snapshots = deserialize(fis);
+            LOG.debug("Deserialized {} snapshots", snapshots.size());
+
+            for(DatastoreSnapshot snapshot: snapshots) {
+                datastoreSnapshots.put(snapshot.getType(), snapshot);
+            }
+        } catch (Exception e) {
+            LOG.error("Error reading clustered datastore restore file {}", restoreFile, e);
+        } finally {
+            if(!restoreFile.delete()) {
+                LOG.error("Could not delete clustered datastore restore file {}", restoreFile);
+            }
+        }
+    }
+
+    private DatastoreSnapshotList deserialize(InputStream inputStream) throws IOException, ClassNotFoundException {
+        try(ObjectInputStream ois = new ObjectInputStream(inputStream)) {
+            return (DatastoreSnapshotList) ois.readObject();
+        }
+    }
+
+    public DatastoreSnapshot getAndRemove(String datastoreType) {
+        initialize();
+        return datastoreSnapshots.remove(datastoreType);
+    }
+}
index 7f2416efc36cc17ce9101fc968ca45a1edbc3adf..19c93b3dc418d15b0b772d8365122a897b4dff7d 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
@@ -53,7 +54,6 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
     private final ActorContext actorContext;
     private final long waitTillReadyTimeInMillis;
 
-
     private AutoCloseable closeable;
 
     private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
@@ -67,7 +67,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
     private final TransactionContextFactory txContextFactory;
 
     public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
-            Configuration configuration, DatastoreContextFactory datastoreContextFactory) {
+            Configuration configuration, DatastoreContextFactory datastoreContextFactory,
+            DatastoreSnapshot restoreFromSnapshot) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
@@ -83,9 +84,13 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
                 new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
 
         PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
-        actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration,
-                datastoreContextFactory, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster,
-                configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
+
+        ShardManager.Builder builder = ShardManager.builder().cluster(cluster).configuration(configuration).
+                datastoreContextFactory(datastoreContextFactory).waitTillReadyCountdownLatch(waitTillReadyCountDownLatch).
+                primaryShardInfoCache(primaryShardInfoCache).restoreFromSnapshot(restoreFromSnapshot);
+
+        actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, builder, shardDispatcher,
+                shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
 
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
@@ -225,17 +230,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         }
     }
 
-    private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration,
-                                        DatastoreContextFactory datastoreContextFactory, String shardDispatcher,
-                                        String shardManagerId, PrimaryShardInfoFutureCache primaryShardInfoCache){
+    private ActorRef createShardManager(ActorSystem actorSystem, ShardManager.Builder builder, String shardDispatcher,
+                                        String shardManagerId){
         Exception lastException = null;
 
         for(int i=0;i<100;i++) {
             try {
-                return actorSystem.actorOf(
-                        ShardManager.props(cluster, configuration, datastoreContextFactory, waitTillReadyCountDownLatch,
-                                primaryShardInfoCache).withDispatcher(shardDispatcher).withMailbox(
-                                        ActorContext.MAILBOX), shardManagerId);
+                return actorSystem.actorOf(builder.props().withDispatcher(shardDispatcher).withMailbox(
+                        ActorContext.MAILBOX), shardManagerId);
             } catch (Exception e){
                 lastException = e;
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
index e5aa33a0f4caa98495178eb8f842a9f5e8c89904..f55fa0731d3bc072610635fd8b05e77c67b03ac9 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSystem;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
@@ -19,7 +20,8 @@ public class DistributedDataStoreFactory {
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreFactory.class);
 
     public static DistributedDataStore createInstance(SchemaService schemaService,
-            DatastoreContext datastoreContext, ActorSystem actorSystem, BundleContext bundleContext) {
+            DatastoreContext datastoreContext, DatastoreSnapshot restoreFromSnapshot, ActorSystem actorSystem,
+            BundleContext bundleContext) {
 
         LOG.info("Create data store instance of type : {}", datastoreContext.getDataStoreType());
 
@@ -29,7 +31,7 @@ public class DistributedDataStoreFactory {
 
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
-                new ClusterWrapperImpl(actorSystem), config, introspector.newContextFactory());
+                new ClusterWrapperImpl(actorSystem), config, introspector.newContextFactory(), restoreFromSnapshot);
 
         overlay.setListener(dataStore);
 
index ee83ce2513dc1f9c3c8582820254ebdc3c5957c6..6df64e162888915d85315e2cacf0c05af3ebb170 100644 (file)
@@ -39,6 +39,8 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
@@ -114,12 +116,15 @@ public class Shard extends RaftActor {
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
     private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
 
+    private ShardSnapshot restoreFromSnapshot;
+
     protected Shard(AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
         this.name = builder.getId().toString();
         this.datastoreContext = builder.getDatastoreContext();
+        this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
         setPersistence(datastoreContext.isPersistent());
 
@@ -585,11 +590,14 @@ public class Shard extends RaftActor {
     @Override
     @Nonnull
     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
-        return new ShardRecoveryCoordinator(store, store.getSchemaContext(), persistenceId(), LOG);
+        return new ShardRecoveryCoordinator(store, store.getSchemaContext(),
+                restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
     }
 
     @Override
     protected void onRecoveryComplete() {
+        restoreFromSnapshot = null;
+
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
 
@@ -712,6 +720,7 @@ public class Shard extends RaftActor {
         private Map<String, String> peerAddresses = Collections.emptyMap();
         private DatastoreContext datastoreContext;
         private SchemaContext schemaContext;
+        private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
         private volatile boolean sealed;
 
         protected AbstractBuilder(Class<S> shardClass) {
@@ -751,6 +760,12 @@ public class Shard extends RaftActor {
             return self();
         }
 
+        public T restoreFromSnapshot(DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
+            checkSealed();
+            this.restoreFromSnapshot = restoreFromSnapshot;
+            return self();
+        }
+
         public ShardIdentifier getId() {
             return id;
         }
@@ -767,6 +782,10 @@ public class Shard extends RaftActor {
             return schemaContext;
         }
 
+        public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
+            return restoreFromSnapshot;
+        }
+
         protected void verify() {
             Preconditions.checkNotNull(id, "id should not be null");
             Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
index 9bb2ea8f79d2773124a4fde1b35e865897e21e72..49e1fe32f381ffd338f5b4ba8da035576eaca9fa 100644 (file)
@@ -19,7 +19,6 @@ import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.OnComplete;
-import akka.japi.Creator;
 import akka.japi.Function;
 import akka.persistence.RecoveryCompleted;
 import akka.serialization.Serialization;
@@ -54,6 +53,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
@@ -128,20 +128,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private SchemaContext schemaContext;
 
+    private DatastoreSnapshot restoreFromSnapshot;
+
     /**
      */
-    protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
-            PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
-        this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
-        this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
-        this.datastoreContextFactory = datastoreContextFactory;
-        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
+    protected ShardManager(Builder builder) {
+
+        this.cluster = builder.cluster;
+        this.configuration = builder.configuration;
+        this.datastoreContextFactory = builder.datastoreContextFactory;
+        this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
-        this.primaryShardInfoCache = primaryShardInfoCache;
+        this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
+        this.primaryShardInfoCache = builder.primaryShardInfoCache;
+        this.restoreFromSnapshot = builder.restoreFromSnapshot;
 
         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
 
@@ -151,22 +152,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         createLocalShards();
     }
 
-    public static Props props(
-            final ClusterWrapper cluster,
-            final Configuration configuration,
-            final DatastoreContextFactory datastoreContextFactory,
-            final CountDownLatch waitTillReadyCountdownLatch,
-            final PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
-        Preconditions.checkNotNull(cluster, "cluster should not be null");
-        Preconditions.checkNotNull(configuration, "configuration should not be null");
-        Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
-        Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
-
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContextFactory,
-                waitTillReadyCountdownLatch, primaryShardInfoCache));
-    }
-
     @Override
     public void postStop() {
         LOG.info("Stopping ShardManager");
@@ -731,13 +716,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         String memberName = this.cluster.getCurrentMemberName();
         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
+        Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
+        if(restoreFromSnapshot != null)
+        {
+            for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+                shardSnapshots.put(snapshot.getName(), snapshot);
+            }
+        }
+
+        restoreFromSnapshot = null; // null out to GC
+
         List<String> localShardActorNames = new ArrayList<>();
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
-                    newShardDatastoreContext(shardName), Shard.builder(), peerAddressResolver));
+                    newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
+                        shardSnapshots.get(shardName)), peerAddressResolver));
         }
 
         mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
@@ -986,7 +982,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private short leaderVersion;
 
         private DatastoreContext datastoreContext;
-        private final Shard.AbstractBuilder<?, ?> builder;
+        private Shard.AbstractBuilder<?, ?> builder;
         private final ShardPeerAddressResolver addressResolver;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
@@ -1001,8 +997,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         Props newProps(SchemaContext schemaContext) {
-            return builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
+            Preconditions.checkNotNull(builder);
+            Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
                     schemaContext(schemaContext).props();
+            builder = null;
+            return props;
         }
 
         String getShardName() {
@@ -1185,32 +1184,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private static class ShardManagerCreator implements Creator<ShardManager> {
-        private static final long serialVersionUID = 1L;
-
-        final ClusterWrapper cluster;
-        final Configuration configuration;
-        final DatastoreContextFactory datastoreContextFactory;
-        private final CountDownLatch waitTillReadyCountdownLatch;
-        private final PrimaryShardInfoFutureCache primaryShardInfoCache;
-
-        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration,
-                DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
-                PrimaryShardInfoFutureCache primaryShardInfoCache) {
-            this.cluster = cluster;
-            this.configuration = configuration;
-            this.datastoreContextFactory = datastoreContextFactory;
-            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
-            this.primaryShardInfoCache = primaryShardInfoCache;
-        }
-
-        @Override
-        public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch,
-                    primaryShardInfoCache);
-        }
-    }
-
     private static class OnShardInitialized {
         private final Runnable replyRunnable;
         private Cancellable timeoutSchedule;
@@ -1280,6 +1253,70 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return modules;
         }
     }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private ClusterWrapper cluster;
+        private Configuration configuration;
+        private DatastoreContextFactory datastoreContextFactory;
+        private CountDownLatch waitTillReadyCountdownLatch;
+        private PrimaryShardInfoFutureCache primaryShardInfoCache;
+        private DatastoreSnapshot restoreFromSnapshot;
+        private volatile boolean sealed;
+
+        protected void checkSealed() {
+            Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
+        }
+
+        public Builder cluster(ClusterWrapper cluster) {
+            checkSealed();
+            this.cluster = cluster;
+            return this;
+        }
+
+        public Builder configuration(Configuration configuration) {
+            checkSealed();
+            this.configuration = configuration;
+            return this;
+        }
+
+        public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
+            checkSealed();
+            this.datastoreContextFactory = datastoreContextFactory;
+            return this;
+        }
+
+        public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
+            checkSealed();
+            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+            return this;
+        }
+
+        public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
+            checkSealed();
+            this.primaryShardInfoCache = primaryShardInfoCache;
+            return this;
+        }
+
+        public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
+            checkSealed();
+            this.restoreFromSnapshot = restoreFromSnapshot;
+            return this;
+        }
+
+        public Props props() {
+            sealed = true;
+            Preconditions.checkNotNull(cluster, "cluster should not be null");
+            Preconditions.checkNotNull(configuration, "configuration should not be null");
+            Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+            Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+            Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+            return Props.create(ShardManager.class, this);
+        }
+    }
 }
 
 
index 87d591dd228286c9f7da385f6d8c42a34b1286c0..82a6b720f07d05303991235619515641af02acd6 100644 (file)
@@ -44,9 +44,12 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     private final Set<URI> validNamespaces;
     private PruningDataTreeModification transaction;
     private int size;
+    private final byte[] restoreFromSnapshot;
 
-    ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, String shardName, Logger log) {
+    ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, byte[] restoreFromSnapshot,
+            String shardName, Logger log) {
         this.store = Preconditions.checkNotNull(store);
+        this.restoreFromSnapshot = restoreFromSnapshot;
         this.shardName = shardName;
         this.log = log;
         this.validNamespaces = NormalizedNodePruner.namespaces(schemaContext);
@@ -128,7 +131,6 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
 
     @Override
     public byte[] getRestoreFromSnapshot() {
-        // TODO Auto-generated method stub
-        return null;
+        return restoreFromSnapshot;
     }
 }
index a386c350bb7d5b6d7da5477e1677556477f5512f..c6aa0dca87228ad81e50d4c3860207d125dee996 100644 (file)
@@ -17,13 +17,13 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.io.FileOutputStream;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
@@ -131,7 +131,7 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback<List<DatastoreSnapshot>>() {
             @Override
             public void onSuccess(List<DatastoreSnapshot> snapshots) {
-                saveSnapshotsToFile(new ArrayList<>(snapshots), input.getFilePath(), returnFuture);
+                saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
             }
 
             @Override
@@ -143,7 +143,7 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         return returnFuture;
     }
 
-    private static void saveSnapshotsToFile(ArrayList<DatastoreSnapshot> snapshots, String fileName,
+    private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName,
             SettableFuture<RpcResult<Void>> returnFuture) {
         try(FileOutputStream fos = new FileOutputStream(fileName)) {
             SerializationUtils.serialize(snapshots, fos);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotList.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotList.java
new file mode 100644 (file)
index 0000000..f0f1856
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Stores a list of DatastoreSnapshot instances.
+ */
+public class DatastoreSnapshotList extends ArrayList<DatastoreSnapshot> {
+    private static final long serialVersionUID = 1L;
+
+    public DatastoreSnapshotList() {
+    }
+
+    public DatastoreSnapshotList(List<DatastoreSnapshot> snapshots) {
+        super(snapshots);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/osgi/Activator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/osgi/Activator.java
new file mode 100644 (file)
index 0000000..afbf34e
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.osgi;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreSnapshotRestore;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+
+/**
+ * Activator for the bundle.
+ *
+ * @author Thomas Pantelis
+ */
+public class Activator implements BundleActivator {
+    private static final String RESTORE_DIRECTORY_PATH = "./clustered-datastore-restore";
+
+    @Override
+    public void start(BundleContext context) {
+        DatastoreSnapshotRestore.createInstance(RESTORE_DIRECTORY_PATH);
+    }
+
+    @Override
+    public void stop(BundleContext context) {
+        DatastoreSnapshotRestore.removeInstance();
+    }
+}
index 4ad6ca7e5e69277983e37320e0a4592fda2628f0..e028887a3c1dc0896f11387b1481e2785bb184fd 100644 (file)
@@ -1,6 +1,7 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreSnapshotRestore;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
 import org.osgi.framework.BundleContext;
 
@@ -72,7 +73,8 @@ public class DistributedConfigDataStoreProviderModule extends
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
-                datastoreContext, getConfigActorSystemProviderDependency().getActorSystem(), bundleContext);
+                datastoreContext, DatastoreSnapshotRestore.instance().getAndRemove(datastoreContext.getDataStoreType()),
+                getConfigActorSystemProviderDependency().getActorSystem(), bundleContext);
     }
 
     public void setBundleContext(BundleContext bundleContext) {
index ef3fa45147bd7dbbd211c8f5a5b7a921d20044cc..e89708f211e1a85a8657190c87c33cc8d57e1f30 100644 (file)
@@ -1,6 +1,7 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreSnapshotRestore;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
 import org.osgi.framework.BundleContext;
 
@@ -73,7 +74,8 @@ public class DistributedOperationalDataStoreProviderModule extends
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
-                datastoreContext, getOperationalActorSystemProviderDependency().getActorSystem(), bundleContext);
+                datastoreContext, DatastoreSnapshotRestore.instance().getAndRemove(datastoreContext.getDataStoreType()),
+                getOperationalActorSystemProviderDependency().getActorSystem(), bundleContext);
     }
 
     public void setBundleContext(BundleContext bundleContext) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestoreTest.java
new file mode 100644 (file)
index 0000000..aa67e53
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+
+/**
+ * Unit tests for DatastoreSnapshotRestore.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreSnapshotRestoreTest {
+    String restoreDirectoryPath = "target/DatastoreSnapshotRestoreTest-" + System.nanoTime();
+    File restoreDirectoryFile = new File(restoreDirectoryPath);
+    File backupFile = new File(restoreDirectoryFile, "backup");
+
+    @After
+    public void tearDown() {
+        backupFile.delete();
+        restoreDirectoryFile.delete();
+    }
+
+    @Test
+    public void test() throws Exception {
+        assertTrue("Failed to mkdir " + restoreDirectoryPath, restoreDirectoryFile.mkdirs());
+
+        List<ShardSnapshot> shardSnapshots = new ArrayList<>();
+        shardSnapshots.add(new ShardSnapshot("cars", new byte[]{1,2}));
+        shardSnapshots.add(new ShardSnapshot("people", new byte[]{3,4}));
+        DatastoreSnapshot configSnapshot = new DatastoreSnapshot("config", null, shardSnapshots );
+
+        shardSnapshots = new ArrayList<>();
+        shardSnapshots.add(new ShardSnapshot("cars", new byte[]{5,6}));
+        shardSnapshots.add(new ShardSnapshot("people", new byte[]{7,8}));
+        shardSnapshots.add(new ShardSnapshot("bikes", new byte[]{9,0}));
+        DatastoreSnapshot operSnapshot = new DatastoreSnapshot("oper", null, shardSnapshots );
+
+        DatastoreSnapshotList snapshotList = new DatastoreSnapshotList();
+        snapshotList.add(configSnapshot);
+        snapshotList.add(operSnapshot);
+
+        File backupFile = new File(restoreDirectoryFile, "backup");
+        try(FileOutputStream fos = new FileOutputStream(backupFile)) {
+            SerializationUtils.serialize(snapshotList, fos);
+        }
+
+        DatastoreSnapshotRestore.createInstance(restoreDirectoryPath);
+
+        verifySnapshot(configSnapshot, DatastoreSnapshotRestore.instance().getAndRemove("config"));
+        verifySnapshot(operSnapshot, DatastoreSnapshotRestore.instance().getAndRemove("oper"));
+
+        assertNull("DatastoreSnapshot was not removed", DatastoreSnapshotRestore.instance().getAndRemove("config"));
+
+        assertFalse(backupFile + " was not deleted", backupFile.exists());
+
+        DatastoreSnapshotRestore.removeInstance();
+        DatastoreSnapshotRestore.createInstance("target/does-not-exist");
+        assertNull("Expected null DatastoreSnapshot", DatastoreSnapshotRestore.instance().getAndRemove("config"));
+        assertNull("Expected null DatastoreSnapshot", DatastoreSnapshotRestore.instance().getAndRemove("oper"));
+    }
+
+    private void verifySnapshot(DatastoreSnapshot expected, DatastoreSnapshot actual) {
+        assertNotNull("DatastoreSnapshot is null", actual);
+        assertEquals("getType", expected.getType(), actual.getType());
+        assertTrue("ShardManager snapshots don't match", Objects.deepEquals(expected.getShardManagerSnapshot(),
+                actual.getShardManagerSnapshot()));
+        assertEquals("ShardSnapshots size", expected.getShardSnapshots().size(), actual.getShardSnapshots().size());
+        for(int i = 0; i < expected.getShardSnapshots().size(); i++) {
+            assertEquals("ShardSnapshot " + (i + 1) + " name", expected.getShardSnapshots().get(i).getName(),
+                    actual.getShardSnapshots().get(i).getName());
+            assertArrayEquals("ShardSnapshot " + (i + 1) + " snapshot", expected.getShardSnapshots().get(i).getSnapshot(),
+                    actual.getShardSnapshots().get(i).getSnapshot());
+        }
+    }
+}
index fa066ee32e5973347c6ff1fc9d22b31add12bd44..7acde4268f5f7f334ad7ebae0bb2435f27085086 100644 (file)
@@ -31,7 +31,9 @@ import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -43,12 +45,17 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -1106,4 +1113,52 @@ public class DistributedDataStoreIntegrationTest {
             cleanup(dataStore);
         }};
     }
+
+    @Test
+    public void testRestoreFromDatastoreSnapshot() throws Exception{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+            String name = "transactionIntegrationTest";
+
+            ContainerNode carsNode = CarsModel.newCarsNode(CarsModel.newCarsMapNode(
+                    CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
+                    CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
+
+            ShardDataTree dataTree = new ShardDataTree(SchemaContextHelper.full());
+            AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
+            NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree.getDataTree(),
+                    YangInstanceIdentifier.builder().build());
+
+            Snapshot carsSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
+                    Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
+
+            NormalizedNode<?, ?> peopleNode = PeopleModel.create();
+            dataTree = new ShardDataTree(SchemaContextHelper.full());
+            AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
+            root = AbstractShardTest.readStore(dataTree.getDataTree(), YangInstanceIdentifier.builder().build());
+
+            Snapshot peopleSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
+                    Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
+
+            restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
+                    new DatastoreSnapshot.ShardSnapshot("cars",
+                            org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)),
+                    new DatastoreSnapshot.ShardSnapshot("people",
+                            org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
+
+            DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
+                    true, "cars", "people");
+
+            DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+
+            Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", carsNode, optional.get());
+
+            optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", peopleNode, optional.get());
+
+            cleanup(dataStore);
+        }};
+    }
 }
index 67a6479e0847c160871913782266808fcf8251a1..40d123e7ed0f8f7aea8ef035084b773392633db6 100644 (file)
@@ -22,6 +22,7 @@ import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
@@ -35,6 +36,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class IntegrationTestKit extends ShardTestKit {
 
     DatastoreContext.Builder datastoreContextBuilder;
+    DatastoreSnapshot restoreFromSnapshot;
 
     public IntegrationTestKit(ActorSystem actorSystem, Builder datastoreContextBuilder) {
         super(actorSystem);
@@ -69,7 +71,8 @@ public class IntegrationTestKit extends ShardTestKit {
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
 
-        DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory);
+        DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory,
+                restoreFromSnapshot);
 
         dataStore.onGlobalContextUpdated(schemaContext);
 
index 0f31c6a3a1fb4a5a9df34fe0abf891ad1868c46a..29cb189fdaf252dd1d533e5a0020eb71784d0a0c 100644 (file)
@@ -163,8 +163,9 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private Props newShardMgrProps(Configuration config) {
-        return ShardManager.props(new MockClusterWrapper(), config,
-                newDatastoreContextFactory(datastoreContextBuilder.build()), ready, primaryShardInfoCache);
+        return ShardManager.builder().cluster(new MockClusterWrapper()).configuration(config).
+                datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())).
+                waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache).props();
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
@@ -178,8 +179,9 @@ public class ShardManagerTest extends AbstractActorTest {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ForwardingShardManager(clusterWrapper, config, newDatastoreContextFactory(
-                        datastoreContextBuilder.build()), ready, name, shardActor, primaryShardInfoCache);
+                return new ForwardingShardManager(ShardManager.builder().cluster(clusterWrapper).configuration(config).
+                        datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())).
+                        waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache), name, shardActor);
             }
         };
 
@@ -224,7 +226,9 @@ public class ShardManagerTest extends AbstractActorTest {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ShardManager(new MockClusterWrapper(), mockConfig, mockFactory, ready, primaryShardInfoCache) {
+                return new ShardManager(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(mockConfig).
+                        datastoreContextFactory(mockFactory).waitTillReadyCountdownLatch(ready).
+                        primaryShardInfoCache(primaryShardInfoCache)) {
                     @Override
                     protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
                         Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
@@ -950,14 +954,10 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
-        final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
-                newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
-                primaryShardInfoCache);
-        final TestActorRef<ShardManager> shardManager =
-                TestActorRef.create(getSystem(), persistentProps);
+        final TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
 
         ShardManager shardManagerActor = shardManager.underlyingActor();
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
@@ -965,44 +965,43 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
-        final Props persistentProps = newShardMgrProps();
-        final TestActorRef<ShardManager> shardManager =
-                TestActorRef.create(getSystem(), persistentProps);
+        final TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
 
         ShardManager shardManagerActor = shardManager.underlyingActor();
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+        String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId,
                 RaftState.Follower.name(), RaftState.Candidate.name()));
 
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
 
         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(
+                true, shardId));
 
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
     }
 
     @Test
     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
-        final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
-                newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
-                primaryShardInfoCache);
+        final Props persistentProps = newShardMgrProps();
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
+        String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
         ShardManager shardManagerActor = shardManager.underlyingActor();
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId,
                 RaftState.Candidate.name(), RaftState.Follower.name()));
 
         // Initially will be false
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
 
         // Send status true will make sync status true
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
 
         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
 
         // Send status false will make sync status false
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
 
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
 
@@ -1010,15 +1009,13 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
-        final Props persistentProps = ShardManager.props(new MockClusterWrapper(),
-                new MockConfiguration() {
-                    @Override
-                    public List<String> getMemberShardNames(String memberName) {
-                        return Arrays.asList("default", "astronauts");
-                    }
-                },
-                newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
-                primaryShardInfoCache);
+        final Props persistentProps = newShardMgrProps(new MockConfiguration() {
+            @Override
+            public List<String> getMemberShardNames(String memberName) {
+                return Arrays.asList("default", "astronauts");
+            }
+        });
+
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -1028,28 +1025,30 @@ public class ShardManagerTest extends AbstractActorTest {
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
 
         // Make default shard leader
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+        String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification(defaultShardId,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         // default = Leader, astronauts is unknown so sync status remains false
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
 
         // Make astronauts shard leader as well
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
+        String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         // Now sync status should be true
         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
 
         // Make astronauts a Follower
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
                 RaftState.Leader.name(), RaftState.Follower.name()));
 
         // Sync status is not true
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
 
         // Make the astronauts follower sync status true
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
 
         // Sync status is now true
         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
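
The three sync-status tests above all pin down the same rule: a shard counts as in-sync when it is a Leader, or a Follower whose initial sync-up has completed, and never while it is a Candidate or in an unknown role; the MBean flag is the conjunction over all local shards. A minimal sketch of that bookkeeping, with illustrative names only (this is not the actual ShardManager/ShardInformation code):

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    final class SyncStatusTracker {
        private final Map<String, String> roleByShard = new HashMap<>();
        private final Map<String, Boolean> followerSyncedByShard = new HashMap<>();

        void onRoleChange(String shardId, String newRole) {
            roleByShard.put(shardId, newRole);
        }

        void onFollowerInitialSyncUp(String shardId, boolean done) {
            followerSyncedByShard.put(shardId, done);
        }

        // A Leader is always considered synced, a Follower only after its initial
        // sync-up completed, and a Candidate (or unknown role) never is.
        boolean isSynced(String shardId) {
            String role = roleByShard.get(shardId);
            if ("Leader".equals(role)) {
                return true;
            }
            if ("Follower".equals(role)) {
                return Boolean.TRUE.equals(followerSyncedByShard.get(shardId));
            }
            return false;
        }

        // The MBean-level SyncStatus is true only if every local shard is synced.
        boolean allSynced(Collection<String> shardIds) {
            for (String id : shardIds) {
                if (!isSynced(id)) {
                    return false;
                }
            }
            return true;
        }
    }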
@@ -1319,9 +1318,9 @@ public class ShardManagerTest extends AbstractActorTest {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
 
         TestShardManager(String shardMrgIDSuffix) {
-            super(new MockClusterWrapper(), new MockConfiguration(),
-                    newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()),
-                    ready, new PrimaryShardInfoFutureCache());
+            super(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
+                    datastoreContextFactory(newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build())).
+                    waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache()));
         }
 
         @Override
@@ -1379,10 +1378,8 @@ public class ShardManagerTest extends AbstractActorTest {
         private final ActorRef shardActor;
         private final String name;
 
-        protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
-                DatastoreContextFactory factory, CountDownLatch waitTillReadyCountdownLatch, String name,
-                ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
-            super(cluster, configuration, factory, waitTillReadyCountdownLatch, primaryShardInfoCache);
+        public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
+            super(builder);
             this.shardActor = shardActor;
             this.name = name;
         }
index 49b093770b47132b53f165af023e02d85baaa22b..bbcda6ddb7bb42e077568724cb35cc87e12536a7 100644 (file)
@@ -51,7 +51,8 @@ public class ShardRecoveryCoordinatorTest {
 
     @Test
     public void testAppendRecoveredLogEntryDataTreeCandidatePayload(){
-        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
         coordinator.startLogRecoveryBatch(10);
         try {
             coordinator.appendRecoveredLogEntry(DataTreeCandidatePayload.create(createCar()));
@@ -64,7 +65,8 @@ public class ShardRecoveryCoordinatorTest {
 
     @Test
     public void testAppendRecoveredLogEntryModificationPayload() throws IOException {
-        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
         coordinator.startLogRecoveryBatch(10);
         try {
             final MutableCompositeModification modification  = new MutableCompositeModification((short) 1);
@@ -77,7 +79,8 @@ public class ShardRecoveryCoordinatorTest {
 
     @Test
     public void testAppendRecoveredLogEntryCompositeModificationPayload() throws IOException {
-        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
         coordinator.startLogRecoveryBatch(10);
         try {
             final MutableCompositeModification modification  = new MutableCompositeModification((short) 1);
@@ -90,7 +93,8 @@ public class ShardRecoveryCoordinatorTest {
 
     @Test
     public void testAppendRecoveredLogEntryCompositeModificationByteStringPayload() throws IOException {
-        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
         coordinator.startLogRecoveryBatch(10);
         try {
             final MutableCompositeModification modification  = new MutableCompositeModification((short) 1);
@@ -105,7 +109,8 @@ public class ShardRecoveryCoordinatorTest {
 
     @Test
     public void testApplyRecoverySnapshot(){
-        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree , peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
         coordinator.startLogRecoveryBatch(10);
 
         coordinator.applyRecoverySnapshot(createSnapshot());
@@ -117,7 +122,8 @@ public class ShardRecoveryCoordinatorTest {
 
     @Test
     public void testApplyCurrentLogRecoveryBatch(){
-        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
         coordinator.startLogRecoveryBatch(10);
 
         try {
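
The only change in these tests is the new third constructor argument, passed as null to opt out of restore. A hedged sketch of the non-null case, assuming the slot carries the serialized snapshot to restore from; the type, file path and java.nio calls are illustrative and the fragment belongs inside a test that declares throws IOException, as the surrounding ones do:

    // Assumed: the third argument is the raw snapshot bytes read from a backup
    // file; the tests above pass null, meaning "no restore".
    byte[] restoreFromSnapshot = java.nio.file.Files.readAllBytes(
            java.nio.file.Paths.get("target/shard-backup.bin"));  // illustrative path
    final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
            peopleSchemaContext, restoreFromSnapshot, "foobar", LoggerFactory.getLogger("foo"));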
index 9608b812e402fc419299e6804890ac0a26f0764b..75fa9875447b131c8a9cb0d1d947e83778bd193a 100644 (file)
@@ -97,7 +97,7 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
 
-        dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, mockContextFactory);
+        dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, mockContextFactory, null);
 
         dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners());
     }
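
Here too the trailing null is the new restore hook: in real wiring the factory would pass the DatastoreSnapshot deserialized from the backup file for this datastore type. A hedged sketch of that case; instance() and getAndRemove(...) are assumed accessor names on the DatastoreSnapshotRestore singleton, not confirmed by this diff:

    // Assumed accessors on the restore singleton; tests that don't exercise
    // restore simply pass null instead.
    DatastoreSnapshot restoreFromSnapshot = DatastoreSnapshotRestore.instance().getAndRemove("operational");
    dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration,
            mockContextFactory, restoreFromSnapshot);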
index 651d55616effaf1fc168e1bd98586f96b8289ee5..6ad88e568ad479de489219cd53f0529ab7f456f5 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.md.cluster.datastore.model;
 import java.math.BigInteger;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -64,18 +65,22 @@ public class CarsModel {
     }
 
     public static NormalizedNode<?, ?> createEmptyCarsList(){
+        return newCarsNode(newCarsMapNode());
+    }
 
-        // Create a list builder
-        CollectionNodeBuilder<MapEntryNode, MapNode> cars =
-                ImmutableMapNodeBuilder.create().withNodeIdentifier(
-                        new YangInstanceIdentifier.NodeIdentifier(
-                                CAR_QNAME));
+    public static ContainerNode newCarsNode(MapNode carsList) {
+        return ImmutableContainerNodeBuilder.create().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(
+                BASE_QNAME)).withChild(carsList).build();
+    }
 
-        return ImmutableContainerNodeBuilder.create()
-                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BASE_QNAME))
-                .withChild(cars.build())
-                .build();
+    public static MapNode newCarsMapNode(MapEntryNode... carEntries) {
+        CollectionNodeBuilder<MapEntryNode, MapNode> builder = ImmutableMapNodeBuilder.create().
+                withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CAR_QNAME));
+        for(MapEntryNode e: carEntries) {
+            builder.withChild(e);
+        }
 
+        return builder.build();
     }
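
Splitting createEmptyCarsList() into newCarsNode(...) and newCarsMapNode(...) lets tests seed a cars container with entries, for example when preparing data for snapshot/restore checks. A short usage sketch using only the helpers visible in this hunk:

    // Equivalent to createEmptyCarsList():
    NormalizedNode<?, ?> emptyCars = CarsModel.newCarsNode(CarsModel.newCarsMapNode());

    // Any MapEntryNode built under CAR_QNAME can be passed as varargs to
    // newCarsMapNode(...) to pre-populate the container.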
 
     public static NormalizedNode<?, ?> emptyContainer(){