From 92edfd0e7e15de0f3b8ad089c45ea91812fae867 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 3 Nov 2015 08:45:10 -0500 Subject: [PATCH] Bug 4564: Implement clustering backup-datastore RPC Added a new RPC backup-datastore to send the GetSnapshot message to the ShardManager's and persist the list of DatastoreSnapshots to a file. I also renamed the cluster-config yang module to cluster-admin to make it more general as the backup RPC isn't related to configuration. Change-Id: I18e5d47f7052b890c3547066145e4d5d0fbe1277 Signed-off-by: Tom Pantelis --- .../resources/initial/05-clustering.xml.conf | 8 +- .../cluster/datastore/ShardManager.java | 4 +- .../ShardManagerGetSnapshotReplyActor.java | 33 +-- .../admin/ClusterAdminRpcService.java | 195 ++++++++++++++++++ .../config/ClusterConfigRpcService.java | 97 --------- .../ClusterAdminProviderModule.java} | 14 +- .../ClusterAdminProviderModuleFactory.java | 14 ++ .../ClusterConfigProviderModuleFactory.java | 13 -- ...ervice.yang => cluster-admin-service.yang} | 16 +- ...cluster-config.yang => cluster-admin.yang} | 21 +- .../cluster/datastore/IntegrationTestKit.java | 2 +- ...ShardManagerGetSnapshotReplyActorTest.java | 10 +- .../admin/ClusterAdminRpcServiceTest.java | 170 +++++++++++++++ .../test/resources/module-shards-member1.conf | 24 +++ 14 files changed, 467 insertions(+), 154 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ClusterConfigRpcService.java rename opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/{cluster_config_provider/ClusterConfigProviderModule.java => cluster_admin_provider/ClusterAdminProviderModule.java} (61%) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_admin_provider/ClusterAdminProviderModuleFactory.java delete mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_config_provider/ClusterConfigProviderModuleFactory.java rename opendaylight/md-sal/sal-distributed-datastore/src/main/yang/{cluster-config-service.yang => cluster-admin-service.yang} (85%) rename opendaylight/md-sal/sal-distributed-datastore/src/main/yang/{cluster-config.yang => cluster-admin.yang} (86%) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1.conf diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf index 3cb1dfe432..6a66efb54d 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/05-clustering.xml.conf @@ -85,8 +85,8 @@ - prefix:cluster-config-provider - cluster-config-provider + prefix:cluster-admin-provider + cluster-admin-provider operational-dom-store-spi:operational-dom-datastore @@ -145,8 +145,8 @@ urn:opendaylight:params:xml:ns:yang:controller:config:concurrent-data-broker?module=odl-concurrent-data-broker-cfg&revision=2014-11-24 urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:service?module=actor-system-provider-service&revision=2015-10-05 urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:impl?module=actor-system-provider-impl&revision=2015-10-05 - urn:opendaylight:params:xml:ns:yang:controller:md:sal:cluster:config?module=cluster-config&revision=2015-10-13 - urn:opendaylight:params:xml:ns:yang:controller:config:cluster-config-provider?module=cluster-config-provider&revision=2015-10-13 + urn:opendaylight:params:xml:ns:yang:controller:md:sal:cluster:admin?module=cluster-admin&revision=2015-10-13 + urn:opendaylight:params:xml:ns:yang:controller:config:cluster-admin-provider?module=cluster-admin-provider&revision=2015-10-13 urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider?module=distributed-datastore-provider&revision=2014-06-12 urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store?module=opendaylight-config-dom-datastore&revision=2014-06-17 urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store?module=opendaylight-operational-dom-datastore&revision=2014-06-17 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 0804a50e9b..2f6bb464c6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -241,8 +241,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } byte[] shardManagerSnapshot = null; - ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(localShards.size(), - type, shardManagerSnapshot , getSender(), persistenceId(), + ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props( + new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(), datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration())); for(ShardInformation shardInfo: localShards.values()) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActor.java index f77b3cbd7b..d0b2dab91b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActor.java @@ -14,7 +14,10 @@ import akka.actor.ReceiveTimeout; import akka.actor.Status.Failure; import akka.actor.UntypedActor; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; @@ -33,14 +36,15 @@ import scala.concurrent.duration.Duration; class ShardManagerGetSnapshotReplyActor extends UntypedActor { private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class); - private int repliesReceived; + private final Set remainingShardNames; private final Params params; private final List shardSnapshots = new ArrayList<>(); private ShardManagerGetSnapshotReplyActor(Params params) { this.params = params; + remainingShardNames = new HashSet<>(params.shardNames); - LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.totalShardCount); + LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.shardNames.size()); getContext().setReceiveTimeout(params.receiveTimeout); } @@ -55,12 +59,12 @@ class ShardManagerGetSnapshotReplyActor extends UntypedActor { params.replyToActor.tell(message, getSelf()); getSelf().tell(PoisonPill.getInstance(), getSelf()); } else if (message instanceof ReceiveTimeout) { - LOG.warn("{}: Got ReceiveTimeout for inactivity - expected {} GetSnapshotReply messages within {} ms, received {}", - params.id, params.totalShardCount, params.receiveTimeout.toMillis(), repliesReceived); - - params.replyToActor.tell(new Failure(new TimeoutException(String.format( - "Timed out after %s ms while waiting for snapshot replies from %d shards. Actual replies received was %s", - params.receiveTimeout.toMillis(), params.totalShardCount, repliesReceived))), getSelf()); + String msg = String.format( + "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s did not respond.", + params.receiveTimeout.toMillis(), params.shardNames.size(), remainingShardNames.size(), + remainingShardNames); + LOG.warn("{}: {}", params.id, msg); + params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf()); getSelf().tell(PoisonPill.getInstance(), getSelf()); } } @@ -71,7 +75,8 @@ class ShardManagerGetSnapshotReplyActor extends UntypedActor { ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(getSnapshotReply.getId()).build(); shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot())); - if(++repliesReceived == params.totalShardCount) { + remainingShardNames.remove(shardId.getShardName()); + if(remainingShardNames.isEmpty()) { LOG.debug("{}: All shard snapshots received", params.id); DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot, @@ -81,23 +86,23 @@ class ShardManagerGetSnapshotReplyActor extends UntypedActor { } } - public static Props props(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot, + public static Props props(Collection shardNames, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor, String id, Duration receiveTimeout) { - return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(totalShardCount, datastoreType, + return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(shardNames, datastoreType, shardManagerSnapshot, replyToActor, id, receiveTimeout)); } private static final class Params { - final int totalShardCount; + final Collection shardNames; final String datastoreType; final byte[] shardManagerSnapshot; final ActorRef replyToActor; final String id; final Duration receiveTimeout; - Params(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor, + Params(Collection shardNames, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor, String id, Duration receiveTimeout) { - this.totalShardCount = totalShardCount; + this.shardNames = shardNames; this.datastoreType = datastoreType; this.shardManagerSnapshot = shardManagerSnapshot; this.replyToActor = replyToActor; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java new file mode 100644 index 0000000000..a386c350bb --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.admin; + +import akka.actor.ActorRef; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.base.Strings; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.io.FileOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.SerializationUtils; +import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; +import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements the yang RPCs defined in the generated ClusterAdminService interface. + * + * @author Thomas Pantelis + */ +public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class); + + private final DistributedDataStore configDataStore; + private final DistributedDataStore operDataStore; + private RpcRegistration rpcRegistration; + + public ClusterAdminRpcService(DistributedDataStore configDataStore, DistributedDataStore operDataStore) { + this.configDataStore = configDataStore; + this.operDataStore = operDataStore; + } + + public void start(RpcProviderRegistry rpcProviderRegistry) { + LOG.debug("ClusterAdminRpcService starting"); + + rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterAdminService.class, this); + } + + @Override + public void close() { + if(rpcRegistration != null) { + rpcRegistration.close(); + } + } + + @Override + public Future> addShardReplica(AddShardReplicaInput input) { + // TODO implement + return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", + "Not implemented yet").buildFuture(); + } + + @Override + public Future> removeShardReplica(RemoveShardReplicaInput input) { + // TODO implement + return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", + "Not implemented yet").buildFuture(); + } + + @Override + public Future> addReplicasForAllShards() { + // TODO implement + return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", + "Not implemented yet").buildFuture(); + } + + @Override + public Future> removeAllShardReplicas() { + // TODO implement + return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", + "Not implemented yet").buildFuture(); + } + + @Override + public Future> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) { + // TODO implement + return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", + "Not implemented yet").buildFuture(); + } + + @Override + public Future> convertMembersToNonvotingForAllShards( + ConvertMembersToNonvotingForAllShardsInput input) { + // TODO implement + return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", + "Not implemented yet").buildFuture(); + } + + @SuppressWarnings("unchecked") + @Override + public Future> backupDatastore(final BackupDatastoreInput input) { + LOG.debug("backupDatastore: {}", input); + + if(Strings.isNullOrEmpty(input.getFilePath())) { + return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture(); + } + + Timeout timeout = new Timeout(1, TimeUnit.MINUTES); + ListenableFuture configFuture = ask(configDataStore.getActorContext().getShardManager(), + GetSnapshot.INSTANCE, timeout); + ListenableFuture operFuture = ask(operDataStore.getActorContext().getShardManager(), + GetSnapshot.INSTANCE, timeout); + + final SettableFuture> returnFuture = SettableFuture.create(); + Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback>() { + @Override + public void onSuccess(List snapshots) { + saveSnapshotsToFile(new ArrayList<>(snapshots), input.getFilePath(), returnFuture); + } + + @Override + public void onFailure(Throwable failure) { + onDatastoreBackupFilure(input.getFilePath(), returnFuture, failure); + } + }); + + return returnFuture; + } + + private static void saveSnapshotsToFile(ArrayList snapshots, String fileName, + SettableFuture> returnFuture) { + try(FileOutputStream fos = new FileOutputStream(fileName)) { + SerializationUtils.serialize(snapshots, fos); + + returnFuture.set(newSuccessfulResult()); + LOG.info("Successfully backed up datastore to file {}", fileName); + } catch(Exception e) { + onDatastoreBackupFilure(fileName, returnFuture, e); + } + } + + private static void onDatastoreBackupFilure(String fileName, final SettableFuture> returnFuture, + Throwable failure) { + String msg = String.format("Failed to back up datastore to file %s", fileName); + LOG.error(msg, failure); + returnFuture.set(newFailedRpcResultBuilder(msg, failure).build()); + } + + private ListenableFuture ask(ActorRef actor, Object message, Timeout timeout) { + final SettableFuture returnFuture = SettableFuture.create(); + + @SuppressWarnings("unchecked") + scala.concurrent.Future askFuture = (scala.concurrent.Future) Patterns.ask(actor, message, timeout); + askFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, T resp) { + if(failure != null) { + returnFuture.setException(failure); + } else { + returnFuture.set(resp); + } + } + }, configDataStore.getActorContext().getClientDispatcher()); + + return returnFuture; + } + + private static RpcResultBuilder newFailedRpcResultBuilder(String message) { + return newFailedRpcResultBuilder(message, null); + } + + private static RpcResultBuilder newFailedRpcResultBuilder(String message, Throwable cause) { + return RpcResultBuilder.failed().withError(ErrorType.RPC, message, cause); + } + + private static RpcResult newSuccessfulResult() { + return RpcResultBuilder.success().build(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ClusterConfigRpcService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ClusterConfigRpcService.java deleted file mode 100644 index 0e0c48c0fa..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/config/ClusterConfigRpcService.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore.config; - -import java.util.concurrent.Future; -import org.opendaylight.controller.cluster.datastore.DistributedDataStore; -import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.AddShardReplicaInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.ClusterConfigService; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.ConvertMembersToNonvotingForAllShardsInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.ConvertMembersToVotingForAllShardsInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.RemoveShardReplicaInput; -import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements the yang RPCs defined in the generated ClusterConfigService interface. - * - * @author Thomas Pantelis - */ -public class ClusterConfigRpcService implements ClusterConfigService, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(ClusterConfigRpcService.class); - - private final DistributedDataStore configDataStore; - private final DistributedDataStore operDataStore; - private RpcRegistration rpcRegistration; - - public ClusterConfigRpcService(DistributedDataStore configDataStore, DistributedDataStore operDataStore) { - this.configDataStore = configDataStore; - this.operDataStore = operDataStore; - } - - public void start(RpcProviderRegistry rpcProviderRegistry) { - LOG.debug("ClusterConfigRpcService starting"); - - rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterConfigService.class, this); - } - - @Override - public Future> addShardReplica(AddShardReplicaInput input) { - // TODO implement - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", - "Not implemented yet").buildFuture(); - } - - @Override - public Future> removeShardReplica(RemoveShardReplicaInput input) { - // TODO implement - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", - "Not implemented yet").buildFuture(); - } - - @Override - public Future> addReplicasForAllShards() { - // TODO implement - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", - "Not implemented yet").buildFuture(); - } - - @Override - public Future> removeAllShardReplicas() { - // TODO implement - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", - "Not implemented yet").buildFuture(); - } - - @Override - public Future> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) { - // TODO implement - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", - "Not implemented yet").buildFuture(); - } - - @Override - public Future> convertMembersToNonvotingForAllShards( - ConvertMembersToNonvotingForAllShardsInput input) { - // TODO implement - return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported", - "Not implemented yet").buildFuture(); - } - - @Override - public void close() { - if(rpcRegistration != null) { - rpcRegistration.close(); - } - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_config_provider/ClusterConfigProviderModule.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_admin_provider/ClusterAdminProviderModule.java similarity index 61% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_config_provider/ClusterConfigProviderModule.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_admin_provider/ClusterAdminProviderModule.java index dea6a23636..b15f45557b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_config_provider/ClusterConfigProviderModule.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_admin_provider/ClusterAdminProviderModule.java @@ -1,18 +1,18 @@ -package org.opendaylight.controller.config.yang.config.cluster_config_provider; +package org.opendaylight.controller.config.yang.config.cluster_admin_provider; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; -import org.opendaylight.controller.cluster.datastore.config.ClusterConfigRpcService; +import org.opendaylight.controller.cluster.datastore.admin.ClusterAdminRpcService; import org.opendaylight.controller.config.api.DependencyResolver; import org.opendaylight.controller.config.api.ModuleIdentifier; -public class ClusterConfigProviderModule extends AbstractClusterConfigProviderModule { - public ClusterConfigProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver) { +public class ClusterAdminProviderModule extends AbstractClusterAdminProviderModule { + public ClusterAdminProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); } - public ClusterConfigProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver, - ClusterConfigProviderModule oldModule, java.lang.AutoCloseable oldInstance) { + public ClusterAdminProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver, + ClusterAdminProviderModule oldModule, java.lang.AutoCloseable oldInstance) { super(identifier, dependencyResolver, oldModule, oldInstance); } @@ -27,7 +27,7 @@ public class ClusterConfigProviderModule extends AbstractClusterConfigProviderMo "Injected config DOMStore must be an instance of DistributedDataStore"); Preconditions.checkArgument(getOperDataStoreDependency() instanceof DistributedDataStore, "Injected operational DOMStore must be an instance of DistributedDataStore"); - ClusterConfigRpcService service = new ClusterConfigRpcService((DistributedDataStore)getConfigDataStoreDependency(), + ClusterAdminRpcService service = new ClusterAdminRpcService((DistributedDataStore)getConfigDataStoreDependency(), (DistributedDataStore)getOperDataStoreDependency()); service.start(getRpcRegistryDependency()); return service; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_admin_provider/ClusterAdminProviderModuleFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_admin_provider/ClusterAdminProviderModuleFactory.java new file mode 100644 index 0000000000..ef90217f89 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_admin_provider/ClusterAdminProviderModuleFactory.java @@ -0,0 +1,14 @@ +/* +* Generated file +* +* Generated from: yang module name: cluster-admin-provider yang module local name: cluster-admin-provider +* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator +* Generated at: Tue Nov 03 02:34:10 EST 2015 +* +* Do not modify this file unless it is present under src/main directory +*/ +package org.opendaylight.controller.config.yang.config.cluster_admin_provider; + +public class ClusterAdminProviderModuleFactory extends AbstractClusterAdminProviderModuleFactory { + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_config_provider/ClusterConfigProviderModuleFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_config_provider/ClusterConfigProviderModuleFactory.java deleted file mode 100644 index bf29af8642..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/cluster_config_provider/ClusterConfigProviderModuleFactory.java +++ /dev/null @@ -1,13 +0,0 @@ -/* -* Generated file -* -* Generated from: yang module name: cluster-config-provider yang module local name: cluster-config-provider -* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator -* Generated at: Tue Oct 13 22:12:46 EDT 2015 -* -* Do not modify this file unless it is present under src/main directory -*/ -package org.opendaylight.controller.config.yang.config.cluster_config_provider; - -public class ClusterConfigProviderModuleFactory extends AbstractClusterConfigProviderModuleFactory { -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-config-service.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin-service.yang similarity index 85% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-config-service.yang rename to opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin-service.yang index 478907a137..15cb54361d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-config-service.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin-service.yang @@ -1,7 +1,7 @@ -module cluster-config-provider { +module cluster-admin-provider { yang-version 1; - namespace "urn:opendaylight:params:xml:ns:yang:controller:config:cluster-config-provider"; - prefix "cluster-config-provider"; + namespace "urn:opendaylight:params:xml:ns:yang:controller:config:cluster-admin-provider"; + prefix "cluster-admin-provider"; import config { prefix config; revision-date 2013-04-05; } import opendaylight-operational-dom-datastore {prefix operational-dom-store-spi;} @@ -9,22 +9,22 @@ module cluster-config-provider { import opendaylight-md-sal-binding { prefix mdsal; revision-date 2013-10-28; } description - "This module contains the configuration YANG definitions for the ClusterConfigRpcService implementation"; + "This module contains the configuration YANG definitions for the ClusterAdminRpcService implementation"; revision "2015-10-13" { description "Initial revision."; } // This is the definition of the provider implementation as a module identity. - identity cluster-config-provider { + identity cluster-admin-provider { base config:module-type; - config:java-name-prefix ClusterConfigProvider; + config:java-name-prefix ClusterAdminProvider; } // Augments the 'configuration' choice node under modules/module. augment "/config:modules/config:module/config:configuration" { - case cluster-config-provider { - when "/config:modules/config:module/config:type = 'cluster-config-provider'"; + case cluster-admin-provider { + when "/config:modules/config:module/config:type = 'cluster-admin-provider'"; container config-data-store { uses config:service-ref { refine type { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-config.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang similarity index 86% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-config.yang rename to opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang index a13c13ac5c..51aeb8cfb2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-config.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/cluster-admin.yang @@ -1,10 +1,10 @@ -module cluster-config { +module cluster-admin { yang-version 1; - namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:cluster:config"; - prefix "clustering-config"; + namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:cluster:admin"; + prefix "cluster-admin"; description - "This module contains YANG RPC definitions for configuring a cluster."; + "This module contains YANG RPC definitions for administering a cluster."; revision "2015-10-13" { description "Initial revision."; @@ -69,8 +69,19 @@ module cluster-config { description "The names of the cluster members to convert."; } } - + description "Converts the given cluster members to voting for all shards. The members will participate in leader elections and consensus."; } + + rpc backup-datastore { + input { + leaf file-path { + type string; + description "The path and name of the file in which to store the backup."; + } + } + + description "Creates a backup file of the datastore state"; + } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java index 9c07a61e60..67a6479e08 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -154,7 +154,7 @@ public class IntegrationTestKit extends ShardTestKit { cohort.commit().get(5, TimeUnit.SECONDS); } - void cleanup(DistributedDataStore dataStore) { + public void cleanup(DistributedDataStore dataStore) { if(dataStore != null) { dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActorTest.java index 3052397c4f..4bb00d2629 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerGetSnapshotReplyActorTest.java @@ -13,6 +13,7 @@ import akka.actor.ActorRef; import akka.actor.Status.Failure; import akka.actor.Terminated; import akka.testkit.JavaTestKit; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -44,7 +45,8 @@ public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest { JavaTestKit kit = new JavaTestKit(getSystem()); byte[] shardManagerSnapshot = new byte[]{0,5,9}; - ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(3, "config", + ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props( + Arrays.asList("shard1", "shard2", "shard3"), "config", shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)), actorFactory.generateActorId("actor")); @@ -85,7 +87,8 @@ public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest { JavaTestKit kit = new JavaTestKit(getSystem()); byte[] shardManagerSnapshot = new byte[]{0,5,9}; - ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(2, "config", + ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props( + Arrays.asList("shard1", "shard2"), "config", shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)), actorFactory.generateActorId("actor")); @@ -105,7 +108,8 @@ public class ShardManagerGetSnapshotReplyActorTest extends AbstractActorTest { JavaTestKit kit = new JavaTestKit(getSystem()); byte[] shardManagerSnapshot = new byte[]{0,5,9}; - ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(1, "config", + ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props( + Arrays.asList("shard1"), "config", shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.MILLISECONDS)), actorFactory.generateActorId("actor")); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java new file mode 100644 index 0000000000..1f23f68a84 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.admin; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Address; +import akka.actor.AddressFromURIString; +import akka.actor.PoisonPill; +import akka.cluster.Cluster; +import akka.testkit.JavaTestKit; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.typesafe.config.ConfigFactory; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.commons.lang3.SerializationUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; +import org.opendaylight.yangtools.yang.common.RpcResult; + +/** + * Unit tests for ClusterAdminRpcService. + * + * @author Thomas Pantelis + */ +public class ClusterAdminRpcServiceTest { + private static ActorSystem system; + + private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(); + + private IntegrationTestKit kit; + private DistributedDataStore configDataStore; + private DistributedDataStore operDataStore; + private ClusterAdminRpcService service; + + @BeforeClass + public static void setUpClass() throws IOException { + system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); + Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); + Cluster.get(system).join(member1Address); + } + + @AfterClass + public static void tearDownClass() throws IOException { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + + @After + public void tearDown() { + if(kit != null) { + kit.cleanup(configDataStore); + kit.cleanup(operDataStore); + } + } + + private void setup(String testName, String... shardNames) { + kit = new IntegrationTestKit(system, datastoreContextBuilder); + + configDataStore = kit.setupDistributedDataStore(testName + "Config", "module-shards-member1.conf", + true, shardNames); + operDataStore = kit.setupDistributedDataStore(testName + "Oper", "module-shards-member1.conf", + true, shardNames); + + service = new ClusterAdminRpcService(configDataStore, operDataStore); + } + + @Test + public void testBackupDatastore() throws Exception { + setup("testBackupDatastore", "cars", "people"); + + String fileName = "target/testBackupDatastore"; + new File(fileName).delete(); + + RpcResult rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder(). + setFilePath(fileName).build()).get(5, TimeUnit.SECONDS); + assertEquals("isSuccessful", true, rpcResult.isSuccessful()); + + try(FileInputStream fis = new FileInputStream(fileName)) { + List snapshots = SerializationUtils.deserialize(fis); + assertEquals("DatastoreSnapshot size", 2, snapshots.size()); + + ImmutableMap map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0), + snapshots.get(1).getType(), snapshots.get(1)); + verifyDatastoreSnapshot(configDataStore.getActorContext().getDataStoreType(), + map.get(configDataStore.getActorContext().getDataStoreType()), "cars", "people"); + } finally { + new File(fileName).delete(); + } + + // Test failure by killing a shard. + + configDataStore.getActorContext().getShardManager().tell(datastoreContextBuilder. + shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender()); + + ActorRef carsShardActor = configDataStore.getActorContext().findLocalShard("cars").get(); + kit.watch(carsShardActor); + carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + kit.expectTerminated(carsShardActor); + + rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()). + get(5, TimeUnit.SECONDS); + assertEquals("isSuccessful", false, rpcResult.isSuccessful()); + assertEquals("getErrors", 1, rpcResult.getErrors().size()); + assertTrue("Expected error cause TimeoutException", + rpcResult.getErrors().iterator().next().getCause() instanceof TimeoutException); + } + + private void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) { + assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot); + Set shardNames = new HashSet<>(); + for(DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) { + shardNames.add(s.getName()); + } + + assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames); + } + + @Test + public void testAddShardReplica() { + // TODO implement + } + + @Test + public void testRemoveShardReplica() { + // TODO implement + } + + @Test + public void testAddReplicasForAllShards() { + // TODO implement + } + + @Test + public void testRemoveAllShardReplicas() { + // TODO implement + } + + @Test + public void testConvertMembersToVotingForAllShards() { + // TODO implement + } + + @Test + public void testConvertMembersToNonvotingForAllShards() { + // TODO implement + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1.conf new file mode 100644 index 0000000000..7dfbce5f1a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1.conf @@ -0,0 +1,24 @@ +module-shards = [ + { + name = "people" + shards = [ + { + name="people" + replicas = [ + "member-1" + ] + } + ] + }, + { + name = "cars" + shards = [ + { + name="cars" + replicas = [ + "member-1" + ] + } + ] + } +] \ No newline at end of file -- 2.36.6