Bug 4564: Implement datastore restore from backup file 43/29243/12
authorTom Pantelis <tpanteli@brocade.com>
Wed, 4 Nov 2015 08:59:39 +0000 (03:59 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 14 Nov 2015 22:41:12 +0000 (22:41 +0000)
Added a singleton DatastoreSnapshotRestore class that looks for and
reads a restore file in a specific directory and deserializes the datastore
snapshots. The restore file is then deleted.

The DatastoreSnapshotRestore instance needs to be injected into both
DistributedDatastore instances which are created via separate config
system Module instances. However the only way to inject the
DatastoreSnapshotRestore instance would be to define a yang module
and service. I didn't want to go thru the overhead of all that and I
didn't want the DatastoreSnapshotRestore advertised as a service. So I made
it a static singleton that is created via a new bundle Activator class.

The DatastoreSnapshot instance is passed to the ShardManager which
passes each ShardSnapshot to the corresponding Shard actor. On
recovery complete, the RaftActor takes care of applying the restored
snapshot.

Change-Id: Ied3db4e49b98320abb34e2acf73b27b29232f8d6
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
20 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestore.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotList.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/osgi/Activator.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestoreTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CarsModel.java

index 4a20e5b..10574e3 100644 (file)
@@ -139,7 +139,7 @@ public class SnapshotManager implements SnapshotState {
             lastLogEntryIndex = lastLogEntry.getIndex();
             lastLogEntryTerm = lastLogEntry.getTerm();
         } else {
-            LOG.warn("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
+            LOG.debug("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
                 lastAppliedIndex, lastAppliedTerm);
         }
 
index 74d4ec7..ea71351 100644 (file)
         <configuration>
           <instructions>
             <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+            <Bundle-Activator>org.opendaylight.controller.cluster.datastore.osgi.Activator</Bundle-Activator>
             <Export-Package></Export-Package>
             <Import-Package>!*snappy;!org.jboss.*;!com.jcraft.*;!*jetty*;!sun.security.*;*</Import-Package>
             <!--
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestore.java
new file mode 100644 (file)
index 0000000..bed2389
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class looks for a previously saved data store backup file in a directory and, if found, de-serializes
+ * the DatastoreSnapshot instances. This class has a static singleton that is created on bundle activation.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreSnapshotRestore {
+    private static final Logger LOG = LoggerFactory.getLogger(DatastoreSnapshotRestore.class);
+
+    private static AtomicReference<DatastoreSnapshotRestore> instance = new AtomicReference<>();
+
+    private final String restoreDirectoryPath;
+    private final Map<String, DatastoreSnapshot> datastoreSnapshots = new ConcurrentHashMap<>();
+    private final AtomicBoolean initialized = new AtomicBoolean();
+
+    public static void createInstance(String restoreDirectoryPath) {
+        instance.compareAndSet(null, new DatastoreSnapshotRestore(restoreDirectoryPath));
+    }
+
+    public static void removeInstance() {
+        instance.set(null);
+    }
+
+    public static DatastoreSnapshotRestore instance() {
+        DatastoreSnapshotRestore localInstance = instance.get();
+        return Preconditions.checkNotNull(localInstance, "DatastoreSnapshotRestore instance was not created");
+    }
+
+    private DatastoreSnapshotRestore(String restoreDirectoryPath) {
+        this.restoreDirectoryPath = Preconditions.checkNotNull(restoreDirectoryPath);
+    }
+
+    private void initialize() {
+        if(!initialized.compareAndSet(false, true)) {
+            return;
+        }
+
+        File restoreDirectoryFile = new File(restoreDirectoryPath);
+
+        String[] files = restoreDirectoryFile.list();
+        if(files == null || files.length == 0) {
+            LOG.debug("Restore directory {} does not exist or is empty", restoreDirectoryFile);
+            return;
+        }
+
+        if(files.length > 1) {
+            LOG.error("Found {} files in clustered datastore restore directory {} - expected 1. No restore will be attempted",
+                    files.length, restoreDirectoryFile);
+            return;
+        }
+
+        File restoreFile = new File(restoreDirectoryFile, files[0]);
+
+        LOG.info("Clustered datastore will be restored from file {}", restoreFile);
+
+        try(FileInputStream fis = new FileInputStream(restoreFile)) {
+            DatastoreSnapshotList snapshots = deserialize(fis);
+            LOG.debug("Deserialized {} snapshots", snapshots.size());
+
+            for(DatastoreSnapshot snapshot: snapshots) {
+                datastoreSnapshots.put(snapshot.getType(), snapshot);
+            }
+        } catch (Exception e) {
+            LOG.error("Error reading clustered datastore restore file {}", restoreFile, e);
+        } finally {
+            if(!restoreFile.delete()) {
+                LOG.error("Could not delete clustered datastore restore file {}", restoreFile);
+            }
+        }
+    }
+
+    private DatastoreSnapshotList deserialize(InputStream inputStream) throws IOException, ClassNotFoundException {
+        try(ObjectInputStream ois = new ObjectInputStream(inputStream)) {
+            return (DatastoreSnapshotList) ois.readObject();
+        }
+    }
+
+    public DatastoreSnapshot getAndRemove(String datastoreType) {
+        initialize();
+        return datastoreSnapshots.remove(datastoreType);
+    }
+}
index 7f2416e..19c93b3 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
@@ -53,7 +54,6 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
     private final ActorContext actorContext;
     private final long waitTillReadyTimeInMillis;
 
-
     private AutoCloseable closeable;
 
     private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
@@ -67,7 +67,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
     private final TransactionContextFactory txContextFactory;
 
     public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
-            Configuration configuration, DatastoreContextFactory datastoreContextFactory) {
+            Configuration configuration, DatastoreContextFactory datastoreContextFactory,
+            DatastoreSnapshot restoreFromSnapshot) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
@@ -83,9 +84,13 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
                 new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
 
         PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
-        actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, cluster, configuration,
-                datastoreContextFactory, shardDispatcher, shardManagerId, primaryShardInfoCache), cluster,
-                configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
+
+        ShardManager.Builder builder = ShardManager.builder().cluster(cluster).configuration(configuration).
+                datastoreContextFactory(datastoreContextFactory).waitTillReadyCountdownLatch(waitTillReadyCountDownLatch).
+                primaryShardInfoCache(primaryShardInfoCache).restoreFromSnapshot(restoreFromSnapshot);
+
+        actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, builder, shardDispatcher,
+                shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
 
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
@@ -225,17 +230,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         }
     }
 
-    private ActorRef createShardManager(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration,
-                                        DatastoreContextFactory datastoreContextFactory, String shardDispatcher,
-                                        String shardManagerId, PrimaryShardInfoFutureCache primaryShardInfoCache){
+    private ActorRef createShardManager(ActorSystem actorSystem, ShardManager.Builder builder, String shardDispatcher,
+                                        String shardManagerId){
         Exception lastException = null;
 
         for(int i=0;i<100;i++) {
             try {
-                return actorSystem.actorOf(
-                        ShardManager.props(cluster, configuration, datastoreContextFactory, waitTillReadyCountDownLatch,
-                                primaryShardInfoCache).withDispatcher(shardDispatcher).withMailbox(
-                                        ActorContext.MAILBOX), shardManagerId);
+                return actorSystem.actorOf(builder.props().withDispatcher(shardDispatcher).withMailbox(
+                        ActorContext.MAILBOX), shardManagerId);
             } catch (Exception e){
                 lastException = e;
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
index e5aa33a..f55fa07 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSystem;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
@@ -19,7 +20,8 @@ public class DistributedDataStoreFactory {
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreFactory.class);
 
     public static DistributedDataStore createInstance(SchemaService schemaService,
-            DatastoreContext datastoreContext, ActorSystem actorSystem, BundleContext bundleContext) {
+            DatastoreContext datastoreContext, DatastoreSnapshot restoreFromSnapshot, ActorSystem actorSystem,
+            BundleContext bundleContext) {
 
         LOG.info("Create data store instance of type : {}", datastoreContext.getDataStoreType());
 
@@ -29,7 +31,7 @@ public class DistributedDataStoreFactory {
 
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
-                new ClusterWrapperImpl(actorSystem), config, introspector.newContextFactory());
+                new ClusterWrapperImpl(actorSystem), config, introspector.newContextFactory(), restoreFromSnapshot);
 
         overlay.setListener(dataStore);
 
index ee83ce2..6df64e1 100644 (file)
@@ -39,6 +39,8 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
@@ -114,12 +116,15 @@ public class Shard extends RaftActor {
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
     private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
 
+    private ShardSnapshot restoreFromSnapshot;
+
     protected Shard(AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
                 Optional.of(builder.getDatastoreContext().getShardRaftConfig()), DataStoreVersions.CURRENT_VERSION);
 
         this.name = builder.getId().toString();
         this.datastoreContext = builder.getDatastoreContext();
+        this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
 
         setPersistence(datastoreContext.isPersistent());
 
@@ -585,11 +590,14 @@ public class Shard extends RaftActor {
     @Override
     @Nonnull
     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
-        return new ShardRecoveryCoordinator(store, store.getSchemaContext(), persistenceId(), LOG);
+        return new ShardRecoveryCoordinator(store, store.getSchemaContext(),
+                restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
     }
 
     @Override
     protected void onRecoveryComplete() {
+        restoreFromSnapshot = null;
+
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
 
@@ -712,6 +720,7 @@ public class Shard extends RaftActor {
         private Map<String, String> peerAddresses = Collections.emptyMap();
         private DatastoreContext datastoreContext;
         private SchemaContext schemaContext;
+        private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
         private volatile boolean sealed;
 
         protected AbstractBuilder(Class<S> shardClass) {
@@ -751,6 +760,12 @@ public class Shard extends RaftActor {
             return self();
         }
 
+        public T restoreFromSnapshot(DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
+            checkSealed();
+            this.restoreFromSnapshot = restoreFromSnapshot;
+            return self();
+        }
+
         public ShardIdentifier getId() {
             return id;
         }
@@ -767,6 +782,10 @@ public class Shard extends RaftActor {
             return schemaContext;
         }
 
+        public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
+            return restoreFromSnapshot;
+        }
+
         protected void verify() {
             Preconditions.checkNotNull(id, "id should not be null");
             Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
index 9bb2ea8..49e1fe3 100644 (file)
@@ -19,7 +19,6 @@ import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.dispatch.OnComplete;
-import akka.japi.Creator;
 import akka.japi.Function;
 import akka.persistence.RecoveryCompleted;
 import akka.serialization.Serialization;
@@ -54,6 +53,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
@@ -128,20 +128,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private SchemaContext schemaContext;
 
+    private DatastoreSnapshot restoreFromSnapshot;
+
     /**
      */
-    protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
-            PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
-        this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
-        this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
-        this.datastoreContextFactory = datastoreContextFactory;
-        this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
+    protected ShardManager(Builder builder) {
+
+        this.cluster = builder.cluster;
+        this.configuration = builder.configuration;
+        this.datastoreContextFactory = builder.datastoreContextFactory;
+        this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-        this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
-        this.primaryShardInfoCache = primaryShardInfoCache;
+        this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
+        this.primaryShardInfoCache = builder.primaryShardInfoCache;
+        this.restoreFromSnapshot = builder.restoreFromSnapshot;
 
         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
 
@@ -151,22 +152,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         createLocalShards();
     }
 
-    public static Props props(
-            final ClusterWrapper cluster,
-            final Configuration configuration,
-            final DatastoreContextFactory datastoreContextFactory,
-            final CountDownLatch waitTillReadyCountdownLatch,
-            final PrimaryShardInfoFutureCache primaryShardInfoCache) {
-
-        Preconditions.checkNotNull(cluster, "cluster should not be null");
-        Preconditions.checkNotNull(configuration, "configuration should not be null");
-        Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
-        Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
-
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContextFactory,
-                waitTillReadyCountdownLatch, primaryShardInfoCache));
-    }
-
     @Override
     public void postStop() {
         LOG.info("Stopping ShardManager");
@@ -731,13 +716,24 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         String memberName = this.cluster.getCurrentMemberName();
         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
 
+        Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
+        if(restoreFromSnapshot != null)
+        {
+            for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+                shardSnapshots.put(snapshot.getName(), snapshot);
+            }
+        }
+
+        restoreFromSnapshot = null; // null out to GC
+
         List<String> localShardActorNames = new ArrayList<>();
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
-                    newShardDatastoreContext(shardName), Shard.builder(), peerAddressResolver));
+                    newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
+                        shardSnapshots.get(shardName)), peerAddressResolver));
         }
 
         mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
@@ -986,7 +982,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private short leaderVersion;
 
         private DatastoreContext datastoreContext;
-        private final Shard.AbstractBuilder<?, ?> builder;
+        private Shard.AbstractBuilder<?, ?> builder;
         private final ShardPeerAddressResolver addressResolver;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
@@ -1001,8 +997,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         Props newProps(SchemaContext schemaContext) {
-            return builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
+            Preconditions.checkNotNull(builder);
+            Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
                     schemaContext(schemaContext).props();
+            builder = null;
+            return props;
         }
 
         String getShardName() {
@@ -1185,32 +1184,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
-    private static class ShardManagerCreator implements Creator<ShardManager> {
-        private static final long serialVersionUID = 1L;
-
-        final ClusterWrapper cluster;
-        final Configuration configuration;
-        final DatastoreContextFactory datastoreContextFactory;
-        private final CountDownLatch waitTillReadyCountdownLatch;
-        private final PrimaryShardInfoFutureCache primaryShardInfoCache;
-
-        ShardManagerCreator(ClusterWrapper cluster, Configuration configuration,
-                DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
-                PrimaryShardInfoFutureCache primaryShardInfoCache) {
-            this.cluster = cluster;
-            this.configuration = configuration;
-            this.datastoreContextFactory = datastoreContextFactory;
-            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
-            this.primaryShardInfoCache = primaryShardInfoCache;
-        }
-
-        @Override
-        public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch,
-                    primaryShardInfoCache);
-        }
-    }
-
     private static class OnShardInitialized {
         private final Runnable replyRunnable;
         private Cancellable timeoutSchedule;
@@ -1280,6 +1253,70 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return modules;
         }
     }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private ClusterWrapper cluster;
+        private Configuration configuration;
+        private DatastoreContextFactory datastoreContextFactory;
+        private CountDownLatch waitTillReadyCountdownLatch;
+        private PrimaryShardInfoFutureCache primaryShardInfoCache;
+        private DatastoreSnapshot restoreFromSnapshot;
+        private volatile boolean sealed;
+
+        protected void checkSealed() {
+            Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
+        }
+
+        public Builder cluster(ClusterWrapper cluster) {
+            checkSealed();
+            this.cluster = cluster;
+            return this;
+        }
+
+        public Builder configuration(Configuration configuration) {
+            checkSealed();
+            this.configuration = configuration;
+            return this;
+        }
+
+        public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
+            checkSealed();
+            this.datastoreContextFactory = datastoreContextFactory;
+            return this;
+        }
+
+        public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
+            checkSealed();
+            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+            return this;
+        }
+
+        public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
+            checkSealed();
+            this.primaryShardInfoCache = primaryShardInfoCache;
+            return this;
+        }
+
+        public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
+            checkSealed();
+            this.restoreFromSnapshot = restoreFromSnapshot;
+            return this;
+        }
+
+        public Props props() {
+            sealed = true;
+            Preconditions.checkNotNull(cluster, "cluster should not be null");
+            Preconditions.checkNotNull(configuration, "configuration should not be null");
+            Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+            Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+            Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+            return Props.create(ShardManager.class, this);
+        }
+    }
 }
 
 
index 87d591d..82a6b72 100644 (file)
@@ -44,9 +44,12 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     private final Set<URI> validNamespaces;
     private PruningDataTreeModification transaction;
     private int size;
+    private final byte[] restoreFromSnapshot;
 
-    ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, String shardName, Logger log) {
+    ShardRecoveryCoordinator(ShardDataTree store, SchemaContext schemaContext, byte[] restoreFromSnapshot,
+            String shardName, Logger log) {
         this.store = Preconditions.checkNotNull(store);
+        this.restoreFromSnapshot = restoreFromSnapshot;
         this.shardName = shardName;
         this.log = log;
         this.validNamespaces = NormalizedNodePruner.namespaces(schemaContext);
@@ -128,7 +131,6 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
 
     @Override
     public byte[] getRestoreFromSnapshot() {
-        // TODO Auto-generated method stub
-        return null;
+        return restoreFromSnapshot;
     }
 }
index a386c35..c6aa0dc 100644 (file)
@@ -17,13 +17,13 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.io.FileOutputStream;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
@@ -131,7 +131,7 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback<List<DatastoreSnapshot>>() {
             @Override
             public void onSuccess(List<DatastoreSnapshot> snapshots) {
-                saveSnapshotsToFile(new ArrayList<>(snapshots), input.getFilePath(), returnFuture);
+                saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
             }
 
             @Override
@@ -143,7 +143,7 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
         return returnFuture;
     }
 
-    private static void saveSnapshotsToFile(ArrayList<DatastoreSnapshot> snapshots, String fileName,
+    private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName,
             SettableFuture<RpcResult<Void>> returnFuture) {
         try(FileOutputStream fos = new FileOutputStream(fileName)) {
             SerializationUtils.serialize(snapshots, fos);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotList.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DatastoreSnapshotList.java
new file mode 100644 (file)
index 0000000..f0f1856
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Stores a list of DatastoreSnapshot instances.
+ */
+public class DatastoreSnapshotList extends ArrayList<DatastoreSnapshot> {
+    private static final long serialVersionUID = 1L;
+
+    public DatastoreSnapshotList() {
+    }
+
+    public DatastoreSnapshotList(List<DatastoreSnapshot> snapshots) {
+        super(snapshots);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/osgi/Activator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/osgi/Activator.java
new file mode 100644 (file)
index 0000000..afbf34e
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.osgi;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreSnapshotRestore;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+
+/**
+ * Activator for the bundle.
+ *
+ * @author Thomas Pantelis
+ */
+public class Activator implements BundleActivator {
+    private static final String RESTORE_DIRECTORY_PATH = "./clustered-datastore-restore";
+
+    @Override
+    public void start(BundleContext context) {
+        DatastoreSnapshotRestore.createInstance(RESTORE_DIRECTORY_PATH);
+    }
+
+    @Override
+    public void stop(BundleContext context) {
+        DatastoreSnapshotRestore.removeInstance();
+    }
+}
index 4ad6ca7..e028887 100644 (file)
@@ -1,6 +1,7 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreSnapshotRestore;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
 import org.osgi.framework.BundleContext;
 
@@ -72,7 +73,8 @@ public class DistributedConfigDataStoreProviderModule extends
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
-                datastoreContext, getConfigActorSystemProviderDependency().getActorSystem(), bundleContext);
+                datastoreContext, DatastoreSnapshotRestore.instance().getAndRemove(datastoreContext.getDataStoreType()),
+                getConfigActorSystemProviderDependency().getActorSystem(), bundleContext);
     }
 
     public void setBundleContext(BundleContext bundleContext) {
index ef3fa45..e89708f 100644 (file)
@@ -1,6 +1,7 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreSnapshotRestore;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
 import org.osgi.framework.BundleContext;
 
@@ -73,7 +74,8 @@ public class DistributedOperationalDataStoreProviderModule extends
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
-                datastoreContext, getOperationalActorSystemProviderDependency().getActorSystem(), bundleContext);
+                datastoreContext, DatastoreSnapshotRestore.instance().getAndRemove(datastoreContext.getDataStoreType()),
+                getOperationalActorSystemProviderDependency().getActorSystem(), bundleContext);
     }
 
     public void setBundleContext(BundleContext bundleContext) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DatastoreSnapshotRestoreTest.java
new file mode 100644 (file)
index 0000000..aa67e53
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
+
+/**
+ * Unit tests for DatastoreSnapshotRestore.
+ *
+ * @author Thomas Pantelis
+ */
+public class DatastoreSnapshotRestoreTest {
+    String restoreDirectoryPath = "target/DatastoreSnapshotRestoreTest-" + System.nanoTime();
+    File restoreDirectoryFile = new File(restoreDirectoryPath);
+    File backupFile = new File(restoreDirectoryFile, "backup");
+
+    @After
+    public void tearDown() {
+        backupFile.delete();
+        restoreDirectoryFile.delete();
+    }
+
+    @Test
+    public void test() throws Exception {
+        assertTrue("Failed to mkdir " + restoreDirectoryPath, restoreDirectoryFile.mkdirs());
+
+        List<ShardSnapshot> shardSnapshots = new ArrayList<>();
+        shardSnapshots.add(new ShardSnapshot("cars", new byte[]{1,2}));
+        shardSnapshots.add(new ShardSnapshot("people", new byte[]{3,4}));
+        DatastoreSnapshot configSnapshot = new DatastoreSnapshot("config", null, shardSnapshots );
+
+        shardSnapshots = new ArrayList<>();
+        shardSnapshots.add(new ShardSnapshot("cars", new byte[]{5,6}));
+        shardSnapshots.add(new ShardSnapshot("people", new byte[]{7,8}));
+        shardSnapshots.add(new ShardSnapshot("bikes", new byte[]{9,0}));
+        DatastoreSnapshot operSnapshot = new DatastoreSnapshot("oper", null, shardSnapshots );
+
+        DatastoreSnapshotList snapshotList = new DatastoreSnapshotList();
+        snapshotList.add(configSnapshot);
+        snapshotList.add(operSnapshot);
+
+        File backupFile = new File(restoreDirectoryFile, "backup");
+        try(FileOutputStream fos = new FileOutputStream(backupFile)) {
+            SerializationUtils.serialize(snapshotList, fos);
+        }
+
+        DatastoreSnapshotRestore.createInstance(restoreDirectoryPath);
+
+        verifySnapshot(configSnapshot, DatastoreSnapshotRestore.instance().getAndRemove("config"));
+        verifySnapshot(operSnapshot, DatastoreSnapshotRestore.instance().getAndRemove("oper"));
+
+        assertNull("DatastoreSnapshot was not removed", DatastoreSnapshotRestore.instance().getAndRemove("config"));
+
+        assertFalse(backupFile + " was not deleted", backupFile.exists());
+
+        DatastoreSnapshotRestore.removeInstance();
+        DatastoreSnapshotRestore.createInstance("target/does-not-exist");
+        assertNull("Expected null DatastoreSnapshot", DatastoreSnapshotRestore.instance().getAndRemove("config"));
+        assertNull("Expected null DatastoreSnapshot", DatastoreSnapshotRestore.instance().getAndRemove("oper"));
+    }
+
+    private void verifySnapshot(DatastoreSnapshot expected, DatastoreSnapshot actual) {
+        assertNotNull("DatastoreSnapshot is null", actual);
+        assertEquals("getType", expected.getType(), actual.getType());
+        assertTrue("ShardManager snapshots don't match", Objects.deepEquals(expected.getShardManagerSnapshot(),
+                actual.getShardManagerSnapshot()));
+        assertEquals("ShardSnapshots size", expected.getShardSnapshots().size(), actual.getShardSnapshots().size());
+        for(int i = 0; i < expected.getShardSnapshots().size(); i++) {
+            assertEquals("ShardSnapshot " + (i + 1) + " name", expected.getShardSnapshots().get(i).getName(),
+                    actual.getShardSnapshots().get(i).getName());
+            assertArrayEquals("ShardSnapshot " + (i + 1) + " snapshot", expected.getShardSnapshots().get(i).getSnapshot(),
+                    actual.getShardSnapshots().get(i).getSnapshot());
+        }
+    }
+}
index fa066ee..7acde42 100644 (file)
@@ -31,7 +31,9 @@ import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -43,12 +45,17 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -1106,4 +1113,52 @@ public class DistributedDataStoreIntegrationTest {
             cleanup(dataStore);
         }};
     }
+
+    @Test
+    public void testRestoreFromDatastoreSnapshot() throws Exception{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+            String name = "transactionIntegrationTest";
+
+            ContainerNode carsNode = CarsModel.newCarsNode(CarsModel.newCarsMapNode(
+                    CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
+                    CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
+
+            ShardDataTree dataTree = new ShardDataTree(SchemaContextHelper.full());
+            AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
+            NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree.getDataTree(),
+                    YangInstanceIdentifier.builder().build());
+
+            Snapshot carsSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
+                    Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
+
+            NormalizedNode<?, ?> peopleNode = PeopleModel.create();
+            dataTree = new ShardDataTree(SchemaContextHelper.full());
+            AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
+            root = AbstractShardTest.readStore(dataTree.getDataTree(), YangInstanceIdentifier.builder().build());
+
+            Snapshot peopleSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root),
+                    Collections.<ReplicatedLogEntry>emptyList(), 2, 1, 2, 1, 1, "member-1");
+
+            restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
+                    new DatastoreSnapshot.ShardSnapshot("cars",
+                            org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)),
+                    new DatastoreSnapshot.ShardSnapshot("people",
+                            org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
+
+            DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
+                    true, "cars", "people");
+
+            DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+
+            Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", carsNode, optional.get());
+
+            optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", peopleNode, optional.get());
+
+            cleanup(dataStore);
+        }};
+    }
 }
index 67a6479..40d123e 100644 (file)
@@ -22,6 +22,7 @@ import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
@@ -35,6 +36,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class IntegrationTestKit extends ShardTestKit {
 
     DatastoreContext.Builder datastoreContextBuilder;
+    DatastoreSnapshot restoreFromSnapshot;
 
     public IntegrationTestKit(ActorSystem actorSystem, Builder datastoreContextBuilder) {
         super(actorSystem);
@@ -69,7 +71,8 @@ public class IntegrationTestKit extends ShardTestKit {
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
 
-        DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory);
+        DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory,
+                restoreFromSnapshot);
 
         dataStore.onGlobalContextUpdated(schemaContext);
 
index 0f31c6a..29cb189 100644 (file)
@@ -163,8 +163,9 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private Props newShardMgrProps(Configuration config) {
-        return ShardManager.props(new MockClusterWrapper(), config,
-                newDatastoreContextFactory(datastoreContextBuilder.build()), ready, primaryShardInfoCache);
+        return ShardManager.builder().cluster(new MockClusterWrapper()).configuration(config).
+                datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())).
+                waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache).props();
     }
 
     private Props newPropsShardMgrWithMockShardActor() {
@@ -178,8 +179,9 @@ public class ShardManagerTest extends AbstractActorTest {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ForwardingShardManager(clusterWrapper, config, newDatastoreContextFactory(
-                        datastoreContextBuilder.build()), ready, name, shardActor, primaryShardInfoCache);
+                return new ForwardingShardManager(ShardManager.builder().cluster(clusterWrapper).configuration(config).
+                        datastoreContextFactory(newDatastoreContextFactory(datastoreContextBuilder.build())).
+                        waitTillReadyCountdownLatch(ready).primaryShardInfoCache(primaryShardInfoCache), name, shardActor);
             }
         };
 
@@ -224,7 +226,9 @@ public class ShardManagerTest extends AbstractActorTest {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ShardManager(new MockClusterWrapper(), mockConfig, mockFactory, ready, primaryShardInfoCache) {
+                return new ShardManager(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(mockConfig).
+                        datastoreContextFactory(mockFactory).waitTillReadyCountdownLatch(ready).
+                        primaryShardInfoCache(primaryShardInfoCache)) {
                     @Override
                     protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
                         Entry<ActorRef, DatastoreContext> entry = shardInfoMap.get(info.getShardName());
@@ -950,14 +954,10 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testWhenShardIsLeaderSyncStatusIsTrue() throws Exception{
-        final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
-                newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
-                primaryShardInfoCache);
-        final TestActorRef<ShardManager> shardManager =
-                TestActorRef.create(getSystem(), persistentProps);
+        final TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
 
         ShardManager shardManagerActor = shardManager.underlyingActor();
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
@@ -965,44 +965,43 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
-        final Props persistentProps = newShardMgrProps();
-        final TestActorRef<ShardManager> shardManager =
-                TestActorRef.create(getSystem(), persistentProps);
+        final TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
 
         ShardManager shardManagerActor = shardManager.underlyingActor();
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+        String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId,
                 RaftState.Follower.name(), RaftState.Candidate.name()));
 
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
 
         // Send a FollowerInitialSyncStatus with status = true for the replica whose current state is candidate
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(
+                true, shardId));
 
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
     }
 
     @Test
     public void testWhenShardIsFollowerSyncStatusDependsOnFollowerInitialSyncStatus() throws Exception{
-        final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
-                newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
-                primaryShardInfoCache);
+        final Props persistentProps = newShardMgrProps();
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
+        String shardId = "member-1-shard-default-" + shardMrgIDSuffix;
         ShardManager shardManagerActor = shardManager.underlyingActor();
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification(shardId,
                 RaftState.Candidate.name(), RaftState.Follower.name()));
 
         // Initially will be false
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
 
         // Send status true will make sync status true
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-default-unknown"));
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, shardId));
 
         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
 
         // Send status false will make sync status false
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-default-unknown"));
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(false, shardId));
 
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
 
@@ -1010,15 +1009,13 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testWhenMultipleShardsPresentSyncStatusMustBeTrueForAllShards() throws Exception{
-        final Props persistentProps = ShardManager.props(new MockClusterWrapper(),
-                new MockConfiguration() {
-                    @Override
-                    public List<String> getMemberShardNames(String memberName) {
-                        return Arrays.asList("default", "astronauts");
-                    }
-                },
-                newDatastoreContextFactory(DatastoreContext.newBuilder().persistent(true).build()), ready,
-                primaryShardInfoCache);
+        final Props persistentProps = newShardMgrProps(new MockConfiguration() {
+            @Override
+            public List<String> getMemberShardNames(String memberName) {
+                return Arrays.asList("default", "astronauts");
+            }
+        });
+
         final TestActorRef<ShardManager> shardManager =
                 TestActorRef.create(getSystem(), persistentProps);
 
@@ -1028,28 +1025,30 @@ public class ShardManagerTest extends AbstractActorTest {
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
 
         // Make default shard leader
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown",
+        String defaultShardId = "member-1-shard-default-" + shardMrgIDSuffix;
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification(defaultShardId,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         // default = Leader, astronauts is unknown so sync status remains false
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
 
         // Make astronauts shard leader as well
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
+        String astronautsShardId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
                 RaftState.Follower.name(), RaftState.Leader.name()));
 
         // Now sync status should be true
         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
 
         // Make astronauts a Follower
-        shardManagerActor.onReceiveCommand(new RoleChangeNotification("member-1-shard-astronauts-unknown",
+        shardManagerActor.onReceiveCommand(new RoleChangeNotification(astronautsShardId,
                 RaftState.Leader.name(), RaftState.Follower.name()));
 
         // Sync status is not true
         assertEquals(false, shardManagerActor.getMBean().getSyncStatus());
 
         // Make the astronauts follower sync status true
-        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-astronauts-unknown"));
+        shardManagerActor.onReceiveCommand(new FollowerInitialSyncUpStatus(true, astronautsShardId));
 
         // Sync status is now true
         assertEquals(true, shardManagerActor.getMBean().getSyncStatus());
@@ -1319,9 +1318,9 @@ public class ShardManagerTest extends AbstractActorTest {
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
 
         TestShardManager(String shardMrgIDSuffix) {
-            super(new MockClusterWrapper(), new MockConfiguration(),
-                    newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()),
-                    ready, new PrimaryShardInfoFutureCache());
+            super(ShardManager.builder().cluster(new MockClusterWrapper()).configuration(new MockConfiguration()).
+                    datastoreContextFactory(newDatastoreContextFactory(DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build())).
+                    waitTillReadyCountdownLatch(ready).primaryShardInfoCache(new PrimaryShardInfoFutureCache()));
         }
 
         @Override
@@ -1379,10 +1378,8 @@ public class ShardManagerTest extends AbstractActorTest {
         private final ActorRef shardActor;
         private final String name;
 
-        protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
-                DatastoreContextFactory factory, CountDownLatch waitTillReadyCountdownLatch, String name,
-                ActorRef shardActor, PrimaryShardInfoFutureCache primaryShardInfoCache) {
-            super(cluster, configuration, factory, waitTillReadyCountdownLatch, primaryShardInfoCache);
+        public ForwardingShardManager(Builder builder, String name, ActorRef shardActor) {
+            super(builder);
             this.shardActor = shardActor;
             this.name = name;
         }
index 49b0937..bbcda6d 100644 (file)
@@ -51,7 +51,8 @@ public class ShardRecoveryCoordinatorTest {
 
     @Test
     public void testAppendRecoveredLogEntryDataTreeCandidatePayload(){
-        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
         coordinator.startLogRecoveryBatch(10);
         try {
             coordinator.appendRecoveredLogEntry(DataTreeCandidatePayload.create(createCar()));
@@ -64,7 +65,8 @@ public class ShardRecoveryCoordinatorTest {
 
     @Test
     public void testAppendRecoveredLogEntryModificationPayload() throws IOException {
-        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
         coordinator.startLogRecoveryBatch(10);
         try {
             final MutableCompositeModification modification  = new MutableCompositeModification((short) 1);
@@ -77,7 +79,8 @@ public class ShardRecoveryCoordinatorTest {
 
     @Test
     public void testAppendRecoveredLogEntryCompositeModificationPayload() throws IOException {
-        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
         coordinator.startLogRecoveryBatch(10);
         try {
             final MutableCompositeModification modification  = new MutableCompositeModification((short) 1);
@@ -90,7 +93,8 @@ public class ShardRecoveryCoordinatorTest {
 
     @Test
     public void testAppendRecoveredLogEntryCompositeModificationByteStringPayload() throws IOException {
-        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
         coordinator.startLogRecoveryBatch(10);
         try {
             final MutableCompositeModification modification  = new MutableCompositeModification((short) 1);
@@ -105,7 +109,8 @@ public class ShardRecoveryCoordinatorTest {
 
     @Test
     public void testApplyRecoverySnapshot(){
-        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree , peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
         coordinator.startLogRecoveryBatch(10);
 
         coordinator.applyRecoverySnapshot(createSnapshot());
@@ -117,7 +122,8 @@ public class ShardRecoveryCoordinatorTest {
 
     @Test
     public void testApplyCurrentLogRecoveryBatch(){
-        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, peopleSchemaContext, "foobar", LoggerFactory.getLogger("foo"));
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
         coordinator.startLogRecoveryBatch(10);
 
         try {
index 9608b81..75fa987 100644 (file)
@@ -97,7 +97,7 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
 
-        dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, mockContextFactory);
+        dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, mockContextFactory, null);
 
         dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners());
     }
index 651d556..6ad88e5 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.md.cluster.datastore.model;
 import java.math.BigInteger;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -64,18 +65,22 @@ public class CarsModel {
     }
 
     public static NormalizedNode<?, ?> createEmptyCarsList(){
+        return newCarsNode(newCarsMapNode());
+    }
 
-        // Create a list builder
-        CollectionNodeBuilder<MapEntryNode, MapNode> cars =
-                ImmutableMapNodeBuilder.create().withNodeIdentifier(
-                        new YangInstanceIdentifier.NodeIdentifier(
-                                CAR_QNAME));
+    public static ContainerNode newCarsNode(MapNode carsList) {
+        return ImmutableContainerNodeBuilder.create().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(
+                BASE_QNAME)).withChild(carsList).build();
+    }
 
-        return ImmutableContainerNodeBuilder.create()
-                .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BASE_QNAME))
-                .withChild(cars.build())
-                .build();
+    public static MapNode newCarsMapNode(MapEntryNode... carEntries) {
+        CollectionNodeBuilder<MapEntryNode, MapNode> builder = ImmutableMapNodeBuilder.create().
+                withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CAR_QNAME));
+        for(MapEntryNode e: carEntries) {
+            builder.withChild(e);
+        }
 
+        return builder.build();
     }
 
     public static NormalizedNode<?, ?> emptyContainer(){

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.