Bug 4564: Implement clustering backup-datastore RPC 15/29215/3
authorTom Pantelis <tpanteli@brocade.com>
Tue, 3 Nov 2015 13:45:10 +0000 (08:45 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 9 Nov 2015 03:48:01 +0000 (22:48 -0500)
Added a new RPC backup-datastore to send the GetSnapshot message to the
ShardManager's and persist the list of DatastoreSnapshots to a file.

I also renamed the cluster-config yang module to cluster-admin to make
it more general as the backup RPC isn't related to configuration.

Change-Id: I18e5d47f7052b890c3547066145e4d5d0fbe1277
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
14 files changed:
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ClusterConfigRpcService.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_admin_provider/ClusterAdminProviderModule.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_config_provider/ClusterConfigProviderModule.java with 61% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_admin_provider/ClusterAdminProviderModuleFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_config_provider/ClusterConfigProviderModuleFactory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin-service.yang [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-config-service.yang with 85% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-config.yang with 86% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1.conf [new file with mode: 0644]

index 3cb1dfe..6a66efb 100644 (file)
@@ -85,8 +85,8 @@
                 </module>
 
                 <module>
-                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:cluster-config-provider">prefix:cluster-config-provider</type>
-                    <name>cluster-config-provider</name>
+                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:cluster-admin-provider">prefix:cluster-admin-provider</type>
+                    <name>cluster-admin-provider</name>
 
                     <oper-data-store>
                         <type xmlns:operational-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store">operational-dom-store-spi:operational-dom-datastore</type>
         <capability>urn:opendaylight:params:xml:ns:yang:controller:config:concurrent-data-broker?module=odl-concurrent-data-broker-cfg&amp;revision=2014-11-24</capability>
         <capability>urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:service?module=actor-system-provider-service&amp;revision=2015-10-05</capability>
         <capability>urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:impl?module=actor-system-provider-impl&amp;revision=2015-10-05</capability>
-        <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:cluster:config?module=cluster-config&amp;revision=2015-10-13</capability>
-        <capability>urn:opendaylight:params:xml:ns:yang:controller:config:cluster-config-provider?module=cluster-config-provider&amp;revision=2015-10-13</capability>
+        <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:cluster:admin?module=cluster-admin&amp;revision=2015-10-13</capability>
+        <capability>urn:opendaylight:params:xml:ns:yang:controller:config:cluster-admin-provider?module=cluster-admin-provider&amp;revision=2015-10-13</capability>
         <capability>urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider?module=distributed-datastore-provider&amp;revision=2014-06-12</capability>
         <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store?module=opendaylight-config-dom-datastore&amp;revision=2014-06-17</capability>
         <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store?module=opendaylight-operational-dom-datastore&amp;revision=2014-06-17</capability>
index 0804a50..2f6bb46 100644 (file)
@@ -241,8 +241,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         byte[] shardManagerSnapshot = null;
-        ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(localShards.size(),
-                type, shardManagerSnapshot , getSender(), persistenceId(),
+        ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
+                new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
 
         for(ShardInformation shardInfo: localShards.values()) {
index f77b3cb..d0b2dab 100644 (file)
@@ -14,7 +14,10 @@ import akka.actor.ReceiveTimeout;
 import akka.actor.Status.Failure;
 import akka.actor.UntypedActor;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
@@ -33,14 +36,15 @@ import scala.concurrent.duration.Duration;
 class ShardManagerGetSnapshotReplyActor extends UntypedActor {
     private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class);
 
-    private int repliesReceived;
+    private final Set<String> remainingShardNames;
     private final Params params;
     private final List<ShardSnapshot> shardSnapshots = new ArrayList<>();
 
     private ShardManagerGetSnapshotReplyActor(Params params) {
         this.params = params;
+        remainingShardNames = new HashSet<>(params.shardNames);
 
-        LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.totalShardCount);
+        LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.shardNames.size());
 
         getContext().setReceiveTimeout(params.receiveTimeout);
     }
@@ -55,12 +59,12 @@ class ShardManagerGetSnapshotReplyActor extends UntypedActor {
             params.replyToActor.tell(message, getSelf());
             getSelf().tell(PoisonPill.getInstance(), getSelf());
         } else if (message instanceof ReceiveTimeout) {
-            LOG.warn("{}: Got ReceiveTimeout for inactivity - expected {} GetSnapshotReply messages within {} ms, received {}",
-                    params.id, params.totalShardCount, params.receiveTimeout.toMillis(), repliesReceived);
-
-            params.replyToActor.tell(new Failure(new TimeoutException(String.format(
-                    "Timed out after %s ms while waiting for snapshot replies from %d shards. Actual replies received was %s",
-                        params.receiveTimeout.toMillis(), params.totalShardCount, repliesReceived))), getSelf());
+            String msg = String.format(
+                    "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s did not respond.",
+                        params.receiveTimeout.toMillis(), params.shardNames.size(), remainingShardNames.size(),
+                        remainingShardNames);
+            LOG.warn("{}: {}", params.id, msg);
+            params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf());
             getSelf().tell(PoisonPill.getInstance(), getSelf());
         }
     }
@@ -71,7 +75,8 @@ class ShardManagerGetSnapshotReplyActor extends UntypedActor {
         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(getSnapshotReply.getId()).build();
         shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
 
-        if(++repliesReceived == params.totalShardCount) {
+        remainingShardNames.remove(shardId.getShardName());
+        if(remainingShardNames.isEmpty()) {
             LOG.debug("{}: All shard snapshots received", params.id);
 
             DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot,
@@ -81,23 +86,23 @@ class ShardManagerGetSnapshotReplyActor extends UntypedActor {
         }
     }
 
-    public static Props props(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot,
+    public static Props props(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot,
             ActorRef replyToActor, String id, Duration receiveTimeout) {
-        return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(totalShardCount, datastoreType,
+        return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(shardNames, datastoreType,
                 shardManagerSnapshot, replyToActor, id, receiveTimeout));
     }
 
     private static final class Params {
-        final int totalShardCount;
+        final Collection<String> shardNames;
         final String datastoreType;
         final byte[] shardManagerSnapshot;
         final ActorRef replyToActor;
         final String id;
         final Duration receiveTimeout;
 
-        Params(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
+        Params(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
                 String id, Duration receiveTimeout) {
-            this.totalShardCount = totalShardCount;
+            this.shardNames = shardNames;
             this.datastoreType = datastoreType;
             this.shardManagerSnapshot = shardManagerSnapshot;
             this.replyToActor = replyToActor;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
new file mode 100644 (file)
index 0000000..a386c35
--- /dev/null
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.admin;
+
+import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the yang RPCs defined in the generated ClusterAdminService interface.
+ *
+ * @author Thomas Pantelis
+ */
+public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
+
+    private final DistributedDataStore configDataStore;
+    private final DistributedDataStore operDataStore;
+    private RpcRegistration<ClusterAdminService> rpcRegistration;
+
+    public ClusterAdminRpcService(DistributedDataStore configDataStore, DistributedDataStore operDataStore) {
+        this.configDataStore = configDataStore;
+        this.operDataStore = operDataStore;
+    }
+
+    public void start(RpcProviderRegistry rpcProviderRegistry) {
+        LOG.debug("ClusterAdminRpcService starting");
+
+        rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterAdminService.class, this);
+    }
+
+    @Override
+    public void close() {
+        if(rpcRegistration != null) {
+            rpcRegistration.close();
+        }
+    }
+
+    @Override
+    public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
+        // TODO implement
+        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
+                "Not implemented yet").buildFuture();
+    }
+
+    @Override
+    public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
+        // TODO implement
+        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
+                "Not implemented yet").buildFuture();
+    }
+
+    @Override
+    public Future<RpcResult<Void>> addReplicasForAllShards() {
+        // TODO implement
+        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
+                "Not implemented yet").buildFuture();
+    }
+
+    @Override
+    public Future<RpcResult<Void>> removeAllShardReplicas() {
+        // TODO implement
+        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
+                "Not implemented yet").buildFuture();
+    }
+
+    @Override
+    public Future<RpcResult<Void>> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) {
+        // TODO implement
+        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
+                "Not implemented yet").buildFuture();
+    }
+
+    @Override
+    public Future<RpcResult<Void>> convertMembersToNonvotingForAllShards(
+            ConvertMembersToNonvotingForAllShardsInput input) {
+        // TODO implement
+        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
+                "Not implemented yet").buildFuture();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Future<RpcResult<Void>> backupDatastore(final BackupDatastoreInput input) {
+        LOG.debug("backupDatastore: {}", input);
+
+        if(Strings.isNullOrEmpty(input.getFilePath())) {
+            return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture();
+        }
+
+        Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+        ListenableFuture<DatastoreSnapshot> configFuture = ask(configDataStore.getActorContext().getShardManager(),
+                GetSnapshot.INSTANCE, timeout);
+        ListenableFuture<DatastoreSnapshot> operFuture = ask(operDataStore.getActorContext().getShardManager(),
+                GetSnapshot.INSTANCE, timeout);
+
+        final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+        Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback<List<DatastoreSnapshot>>() {
+            @Override
+            public void onSuccess(List<DatastoreSnapshot> snapshots) {
+                saveSnapshotsToFile(new ArrayList<>(snapshots), input.getFilePath(), returnFuture);
+            }
+
+            @Override
+            public void onFailure(Throwable failure) {
+                onDatastoreBackupFilure(input.getFilePath(), returnFuture, failure);
+            }
+        });
+
+        return returnFuture;
+    }
+
+    private static void saveSnapshotsToFile(ArrayList<DatastoreSnapshot> snapshots, String fileName,
+            SettableFuture<RpcResult<Void>> returnFuture) {
+        try(FileOutputStream fos = new FileOutputStream(fileName)) {
+            SerializationUtils.serialize(snapshots, fos);
+
+            returnFuture.set(newSuccessfulResult());
+            LOG.info("Successfully backed up datastore to file {}", fileName);
+        } catch(Exception e) {
+            onDatastoreBackupFilure(fileName, returnFuture, e);
+        }
+    }
+
+    private static void onDatastoreBackupFilure(String fileName, final SettableFuture<RpcResult<Void>> returnFuture,
+            Throwable failure) {
+        String msg = String.format("Failed to back up datastore to file %s", fileName);
+        LOG.error(msg, failure);
+        returnFuture.set(newFailedRpcResultBuilder(msg, failure).build());
+    }
+
+    private <T> ListenableFuture<T> ask(ActorRef actor, Object message, Timeout timeout) {
+        final SettableFuture<T> returnFuture = SettableFuture.create();
+
+        @SuppressWarnings("unchecked")
+        scala.concurrent.Future<T> askFuture = (scala.concurrent.Future<T>) Patterns.ask(actor, message, timeout);
+        askFuture.onComplete(new OnComplete<T>() {
+            @Override
+            public void onComplete(Throwable failure, T resp) {
+                if(failure != null) {
+                    returnFuture.setException(failure);
+                } else {
+                    returnFuture.set(resp);
+                }
+            }
+        }, configDataStore.getActorContext().getClientDispatcher());
+
+        return returnFuture;
+    }
+
+    private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message) {
+        return newFailedRpcResultBuilder(message, null);
+    }
+
+    private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message, Throwable cause) {
+        return RpcResultBuilder.<Void>failed().withError(ErrorType.RPC, message, cause);
+    }
+
+    private static RpcResult<Void> newSuccessfulResult() {
+        return RpcResultBuilder.<Void>success().build();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ClusterConfigRpcService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ClusterConfigRpcService.java
deleted file mode 100644 (file)
index 0e0c48c..0000000
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.config;
-
-import java.util.concurrent.Future;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.AddShardReplicaInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.ClusterConfigService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.ConvertMembersToNonvotingForAllShardsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.ConvertMembersToVotingForAllShardsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.RemoveShardReplicaInput;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements the yang RPCs defined in the generated ClusterConfigService interface.
- *
- * @author Thomas Pantelis
- */
-public class ClusterConfigRpcService implements ClusterConfigService, AutoCloseable {
-    private static final Logger LOG = LoggerFactory.getLogger(ClusterConfigRpcService.class);
-
-    private final DistributedDataStore configDataStore;
-    private final DistributedDataStore operDataStore;
-    private RpcRegistration<ClusterConfigService> rpcRegistration;
-
-    public ClusterConfigRpcService(DistributedDataStore configDataStore, DistributedDataStore operDataStore) {
-        this.configDataStore = configDataStore;
-        this.operDataStore = operDataStore;
-    }
-
-    public void start(RpcProviderRegistry rpcProviderRegistry) {
-        LOG.debug("ClusterConfigRpcService starting");
-
-        rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterConfigService.class, this);
-    }
-
-    @Override
-    public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
-    }
-
-    @Override
-    public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
-    }
-
-    @Override
-    public Future<RpcResult<Void>> addReplicasForAllShards() {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
-    }
-
-    @Override
-    public Future<RpcResult<Void>> removeAllShardReplicas() {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
-    }
-
-    @Override
-    public Future<RpcResult<Void>> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
-    }
-
-    @Override
-    public Future<RpcResult<Void>> convertMembersToNonvotingForAllShards(
-            ConvertMembersToNonvotingForAllShardsInput input) {
-        // TODO implement
-        return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
-    }
-
-    @Override
-    public void close() {
-        if(rpcRegistration != null) {
-            rpcRegistration.close();
-        }
-    }
-}
@@ -1,18 +1,18 @@
-package org.opendaylight.controller.config.yang.config.cluster_config_provider;
+package org.opendaylight.controller.config.yang.config.cluster_admin_provider;
 
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.config.ClusterConfigRpcService;
+import org.opendaylight.controller.cluster.datastore.admin.ClusterAdminRpcService;
 import org.opendaylight.controller.config.api.DependencyResolver;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
 
-public class ClusterConfigProviderModule extends AbstractClusterConfigProviderModule {
-    public ClusterConfigProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
+public class ClusterAdminProviderModule extends AbstractClusterAdminProviderModule {
+    public ClusterAdminProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
 
-    public ClusterConfigProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver,
-            ClusterConfigProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
+    public ClusterAdminProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver,
+            ClusterAdminProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
         super(identifier, dependencyResolver, oldModule, oldInstance);
     }
 
@@ -27,7 +27,7 @@ public class ClusterConfigProviderModule extends AbstractClusterConfigProviderMo
                 "Injected config DOMStore must be an instance of DistributedDataStore");
         Preconditions.checkArgument(getOperDataStoreDependency() instanceof DistributedDataStore,
                 "Injected operational DOMStore must be an instance of DistributedDataStore");
-        ClusterConfigRpcService service = new ClusterConfigRpcService((DistributedDataStore)getConfigDataStoreDependency(),
+        ClusterAdminRpcService service = new ClusterAdminRpcService((DistributedDataStore)getConfigDataStoreDependency(),
                 (DistributedDataStore)getOperDataStoreDependency());
         service.start(getRpcRegistryDependency());
         return service;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_admin_provider/ClusterAdminProviderModuleFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_admin_provider/ClusterAdminProviderModuleFactory.java
new file mode 100644 (file)
index 0000000..ef90217
--- /dev/null
@@ -0,0 +1,14 @@
+/*
+* Generated file
+*
+* Generated from: yang module name: cluster-admin-provider yang module local name: cluster-admin-provider
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Tue Nov 03 02:34:10 EST 2015
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.config.cluster_admin_provider;
+
+public class ClusterAdminProviderModuleFactory extends AbstractClusterAdminProviderModuleFactory {
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_config_provider/ClusterConfigProviderModuleFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_config_provider/ClusterConfigProviderModuleFactory.java
deleted file mode 100644 (file)
index bf29af8..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-/*
-* Generated file
-*
-* Generated from: yang module name: cluster-config-provider yang module local name: cluster-config-provider
-* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
-* Generated at: Tue Oct 13 22:12:46 EDT 2015
-*
-* Do not modify this file unless it is present under src/main directory
-*/
-package org.opendaylight.controller.config.yang.config.cluster_config_provider;
-
-public class ClusterConfigProviderModuleFactory extends AbstractClusterConfigProviderModuleFactory {
-}
@@ -1,7 +1,7 @@
-module cluster-config-provider {
+module cluster-admin-provider {
     yang-version 1;
-    namespace "urn:opendaylight:params:xml:ns:yang:controller:config:cluster-config-provider";
-    prefix "cluster-config-provider";
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:config:cluster-admin-provider";
+    prefix "cluster-admin-provider";
 
     import config { prefix config; revision-date 2013-04-05; }
     import opendaylight-operational-dom-datastore {prefix operational-dom-store-spi;}
@@ -9,22 +9,22 @@ module cluster-config-provider {
     import opendaylight-md-sal-binding { prefix mdsal; revision-date 2013-10-28; }
 
     description
-        "This module contains the configuration YANG definitions for the ClusterConfigRpcService implementation";
+        "This module contains the configuration YANG definitions for the ClusterAdminRpcService implementation";
 
     revision "2015-10-13" {
         description "Initial revision.";
     }
 
     // This is the definition of the provider implementation as a module identity.
-    identity cluster-config-provider {
+    identity cluster-admin-provider {
         base config:module-type;
-        config:java-name-prefix ClusterConfigProvider;
+        config:java-name-prefix ClusterAdminProvider;
     }
 
     //  Augments the 'configuration' choice node under modules/module.
     augment "/config:modules/config:module/config:configuration" {
-        case cluster-config-provider {
-            when "/config:modules/config:module/config:type = 'cluster-config-provider'";
+        case cluster-admin-provider {
+            when "/config:modules/config:module/config:type = 'cluster-admin-provider'";
                 container config-data-store {
                     uses config:service-ref {
                         refine type {
@@ -1,10 +1,10 @@
-module cluster-config {
+module cluster-admin {
     yang-version 1;
-    namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:cluster:config";
-    prefix "clustering-config";
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:cluster:admin";
+    prefix "cluster-admin";
 
     description
-        "This module contains YANG RPC definitions for configuring a cluster.";
+        "This module contains YANG RPC definitions for administering a cluster.";
 
     revision "2015-10-13" {
         description "Initial revision.";
@@ -69,8 +69,19 @@ module cluster-config {
                 description "The names of the cluster members to convert.";
             }
         }
-        
+
         description "Converts the given cluster members to voting for all shards. The members will 
             participate in leader elections and consensus.";
     }
+
+    rpc backup-datastore {
+        input {
+            leaf file-path {
+              type string;
+              description "The path and name of the file in which to store the backup.";
+            }
+        }
+
+        description "Creates a backup file of the datastore state";
+    }
 }
\ No newline at end of file
index 9c07a61..67a6479 100644 (file)
@@ -154,7 +154,7 @@ public class IntegrationTestKit extends ShardTestKit {
         cohort.commit().get(5, TimeUnit.SECONDS);
     }
 
-    void cleanup(DistributedDataStore dataStore) {
+    public void cleanup(DistributedDataStore dataStore) {
         if(dataStore != null) {
             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
         }
index 3052397..4bb00d2 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.ActorRef;
 import akka.actor.Status.Failure;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -44,7 +45,8 @@ public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest {
         JavaTestKit kit = new JavaTestKit(getSystem());
 
         byte[] shardManagerSnapshot = new byte[]{0,5,9};
-        ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(3, "config",
+        ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(
+                Arrays.asList("shard1", "shard2", "shard3"), "config",
                 shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)),
                     actorFactory.generateActorId("actor"));
 
@@ -85,7 +87,8 @@ public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest {
         JavaTestKit kit = new JavaTestKit(getSystem());
 
         byte[] shardManagerSnapshot = new byte[]{0,5,9};
-        ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(2, "config",
+        ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(
+                Arrays.asList("shard1", "shard2"), "config",
                 shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)),
                     actorFactory.generateActorId("actor"));
 
@@ -105,7 +108,8 @@ public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest {
         JavaTestKit kit = new JavaTestKit(getSystem());
 
         byte[] shardManagerSnapshot = new byte[]{0,5,9};
-        ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(1, "config",
+        ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(
+                Arrays.asList("shard1"), "config",
                 shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.MILLISECONDS)),
                     actorFactory.generateActorId("actor"));
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java
new file mode 100644 (file)
index 0000000..1f23f68
--- /dev/null
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.admin;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.actor.PoisonPill;
+import akka.cluster.Cluster;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+/**
+ * Unit tests for ClusterAdminRpcService.
+ *
+ * @author Thomas Pantelis
+ */
+public class ClusterAdminRpcServiceTest {
+    private static ActorSystem system;
+
+    private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder();
+
+    private IntegrationTestKit kit;
+    private DistributedDataStore configDataStore;
+    private DistributedDataStore operDataStore;
+    private ClusterAdminRpcService service;
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+        Cluster.get(system).join(member1Address);
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws IOException {
+        JavaTestKit.shutdownActorSystem(system);
+        system = null;
+    }
+
+    @After
+    public void tearDown() {
+        if(kit != null) {
+            kit.cleanup(configDataStore);
+            kit.cleanup(operDataStore);
+        }
+    }
+
+    private void setup(String testName, String... shardNames) {
+        kit = new IntegrationTestKit(system, datastoreContextBuilder);
+
+        configDataStore = kit.setupDistributedDataStore(testName + "Config", "module-shards-member1.conf",
+                true, shardNames);
+        operDataStore = kit.setupDistributedDataStore(testName + "Oper", "module-shards-member1.conf",
+                true, shardNames);
+
+        service = new ClusterAdminRpcService(configDataStore, operDataStore);
+    }
+
+    @Test
+    public void testBackupDatastore() throws Exception {
+        setup("testBackupDatastore", "cars", "people");
+
+        String fileName = "target/testBackupDatastore";
+        new File(fileName).delete();
+
+        RpcResult<Void> rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().
+                setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
+        assertEquals("isSuccessful", true, rpcResult.isSuccessful());
+
+        try(FileInputStream fis = new FileInputStream(fileName)) {
+            List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
+            assertEquals("DatastoreSnapshot size", 2, snapshots.size());
+
+            ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
+                    snapshots.get(1).getType(), snapshots.get(1));
+            verifyDatastoreSnapshot(configDataStore.getActorContext().getDataStoreType(),
+                    map.get(configDataStore.getActorContext().getDataStoreType()), "cars", "people");
+        } finally {
+            new File(fileName).delete();
+        }
+
+        // Test failure by killing a shard.
+
+        configDataStore.getActorContext().getShardManager().tell(datastoreContextBuilder.
+                shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
+
+        ActorRef carsShardActor = configDataStore.getActorContext().findLocalShard("cars").get();
+        kit.watch(carsShardActor);
+        carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        kit.expectTerminated(carsShardActor);
+
+        rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()).
+                get(5, TimeUnit.SECONDS);
+        assertEquals("isSuccessful", false, rpcResult.isSuccessful());
+        assertEquals("getErrors", 1, rpcResult.getErrors().size());
+        assertTrue("Expected error cause TimeoutException",
+                rpcResult.getErrors().iterator().next().getCause() instanceof TimeoutException);
+    }
+
+    private void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) {
+        assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
+        Set<String> shardNames = new HashSet<>();
+        for(DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
+            shardNames.add(s.getName());
+        }
+
+        assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
+    }
+
+    @Test
+    public void testAddShardReplica() {
+        // TODO implement
+    }
+
+    @Test
+    public void testRemoveShardReplica() {
+        // TODO implement
+    }
+
+    @Test
+    public void testAddReplicasForAllShards() {
+        // TODO implement
+    }
+
+    @Test
+    public void testRemoveAllShardReplicas() {
+        // TODO implement
+    }
+
+    @Test
+    public void testConvertMembersToVotingForAllShards() {
+        // TODO implement
+    }
+
+    @Test
+    public void testConvertMembersToNonvotingForAllShards() {
+        // TODO implement
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1.conf
new file mode 100644 (file)
index 0000000..7dfbce5
--- /dev/null
@@ -0,0 +1,24 @@
+module-shards = [
+    {
+        name = "people"
+        shards = [
+            {
+                name="people"
+                replicas = [
+                    "member-1"
+                ]
+            }
+        ]
+    },
+    {
+        name = "cars"
+        shards = [
+            {
+                name="cars"
+                replicas = [
+                    "member-1"
+                ]
+            }
+        ]
+    }
+]
\ No newline at end of file

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.