</module>
<module>
- <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:cluster-config-provider">prefix:cluster-config-provider</type>
- <name>cluster-config-provider</name>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:cluster-admin-provider">prefix:cluster-admin-provider</type>
+ <name>cluster-admin-provider</name>
<oper-data-store>
<type xmlns:operational-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store">operational-dom-store-spi:operational-dom-datastore</type>
<capability>urn:opendaylight:params:xml:ns:yang:controller:config:concurrent-data-broker?module=odl-concurrent-data-broker-cfg&revision=2014-11-24</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:service?module=actor-system-provider-service&revision=2015-10-05</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:config:actor-system-provider:impl?module=actor-system-provider-impl&revision=2015-10-05</capability>
- <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:cluster:config?module=cluster-config&revision=2015-10-13</capability>
- <capability>urn:opendaylight:params:xml:ns:yang:controller:config:cluster-config-provider?module=cluster-config-provider&revision=2015-10-13</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:cluster:admin?module=cluster-admin&revision=2015-10-13</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:config:cluster-admin-provider?module=cluster-admin-provider&revision=2015-10-13</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider?module=distributed-datastore-provider&revision=2014-06-12</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store?module=opendaylight-config-dom-datastore&revision=2014-06-17</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store?module=opendaylight-operational-dom-datastore&revision=2014-06-17</capability>
}
byte[] shardManagerSnapshot = null;
- ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(localShards.size(),
- type, shardManagerSnapshot , getSender(), persistenceId(),
+ ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
+ new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
for(ShardInformation shardInfo: localShards.values()) {
import akka.actor.Status.Failure;
import akka.actor.UntypedActor;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
class ShardManagerGetSnapshotReplyActor extends UntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(ShardManagerGetSnapshotReplyActor.class);
- private int repliesReceived;
+ private final Set<String> remainingShardNames;
private final Params params;
private final List<ShardSnapshot> shardSnapshots = new ArrayList<>();
private ShardManagerGetSnapshotReplyActor(Params params) {
this.params = params;
+ remainingShardNames = new HashSet<>(params.shardNames);
- LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.totalShardCount);
+ LOG.debug("{}: Expecting {} shard snapshot replies", params.id, params.shardNames.size());
getContext().setReceiveTimeout(params.receiveTimeout);
}
params.replyToActor.tell(message, getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
} else if (message instanceof ReceiveTimeout) {
- LOG.warn("{}: Got ReceiveTimeout for inactivity - expected {} GetSnapshotReply messages within {} ms, received {}",
- params.id, params.totalShardCount, params.receiveTimeout.toMillis(), repliesReceived);
-
- params.replyToActor.tell(new Failure(new TimeoutException(String.format(
- "Timed out after %s ms while waiting for snapshot replies from %d shards. Actual replies received was %s",
- params.receiveTimeout.toMillis(), params.totalShardCount, repliesReceived))), getSelf());
+ String msg = String.format(
+ "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s did not respond.",
+ params.receiveTimeout.toMillis(), params.shardNames.size(), remainingShardNames.size(),
+ remainingShardNames);
+ LOG.warn("{}: {}", params.id, msg);
+ params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
}
ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(getSnapshotReply.getId()).build();
shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
- if(++repliesReceived == params.totalShardCount) {
+ remainingShardNames.remove(shardId.getShardName());
+ if(remainingShardNames.isEmpty()) {
LOG.debug("{}: All shard snapshots received", params.id);
DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot,
}
}
- public static Props props(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot,
+ public static Props props(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot,
ActorRef replyToActor, String id, Duration receiveTimeout) {
- return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(totalShardCount, datastoreType,
+ return Props.create(ShardManagerGetSnapshotReplyActor.class, new Params(shardNames, datastoreType,
shardManagerSnapshot, replyToActor, id, receiveTimeout));
}
private static final class Params {
- final int totalShardCount;
+ final Collection<String> shardNames;
final String datastoreType;
final byte[] shardManagerSnapshot;
final ActorRef replyToActor;
final String id;
final Duration receiveTimeout;
- Params(int totalShardCount, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
+ Params(Collection<String> shardNames, String datastoreType, byte[] shardManagerSnapshot, ActorRef replyToActor,
String id, Duration receiveTimeout) {
- this.totalShardCount = totalShardCount;
+ this.shardNames = shardNames;
this.datastoreType = datastoreType;
this.shardManagerSnapshot = shardManagerSnapshot;
this.replyToActor = replyToActor;
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.admin;
+
+import akka.actor.ActorRef;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ClusterAdminService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToNonvotingForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.ConvertMembersToVotingForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the yang RPCs defined in the generated ClusterAdminService interface.
+ *
+ * @author Thomas Pantelis
+ */
+public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
+
+ private final DistributedDataStore configDataStore;
+ private final DistributedDataStore operDataStore;
+ private RpcRegistration<ClusterAdminService> rpcRegistration;
+
+ public ClusterAdminRpcService(DistributedDataStore configDataStore, DistributedDataStore operDataStore) {
+ this.configDataStore = configDataStore;
+ this.operDataStore = operDataStore;
+ }
+
+ public void start(RpcProviderRegistry rpcProviderRegistry) {
+ LOG.debug("ClusterAdminRpcService starting");
+
+ rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterAdminService.class, this);
+ }
+
+ @Override
+ public void close() {
+ if(rpcRegistration != null) {
+ rpcRegistration.close();
+ }
+ }
+
+ @Override
+ public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
+ // TODO implement
+ return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
+ "Not implemented yet").buildFuture();
+ }
+
+ @Override
+ public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
+ // TODO implement
+ return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
+ "Not implemented yet").buildFuture();
+ }
+
+ @Override
+ public Future<RpcResult<Void>> addReplicasForAllShards() {
+ // TODO implement
+ return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
+ "Not implemented yet").buildFuture();
+ }
+
+ @Override
+ public Future<RpcResult<Void>> removeAllShardReplicas() {
+ // TODO implement
+ return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
+ "Not implemented yet").buildFuture();
+ }
+
+ @Override
+ public Future<RpcResult<Void>> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) {
+ // TODO implement
+ return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
+ "Not implemented yet").buildFuture();
+ }
+
+ @Override
+ public Future<RpcResult<Void>> convertMembersToNonvotingForAllShards(
+ ConvertMembersToNonvotingForAllShardsInput input) {
+ // TODO implement
+ return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
+ "Not implemented yet").buildFuture();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Future<RpcResult<Void>> backupDatastore(final BackupDatastoreInput input) {
+ LOG.debug("backupDatastore: {}", input);
+
+ if(Strings.isNullOrEmpty(input.getFilePath())) {
+ return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture();
+ }
+
+ Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+ ListenableFuture<DatastoreSnapshot> configFuture = ask(configDataStore.getActorContext().getShardManager(),
+ GetSnapshot.INSTANCE, timeout);
+ ListenableFuture<DatastoreSnapshot> operFuture = ask(operDataStore.getActorContext().getShardManager(),
+ GetSnapshot.INSTANCE, timeout);
+
+ final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+ Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback<List<DatastoreSnapshot>>() {
+ @Override
+ public void onSuccess(List<DatastoreSnapshot> snapshots) {
+ saveSnapshotsToFile(new ArrayList<>(snapshots), input.getFilePath(), returnFuture);
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ onDatastoreBackupFilure(input.getFilePath(), returnFuture, failure);
+ }
+ });
+
+ return returnFuture;
+ }
+
+ private static void saveSnapshotsToFile(ArrayList<DatastoreSnapshot> snapshots, String fileName,
+ SettableFuture<RpcResult<Void>> returnFuture) {
+ try(FileOutputStream fos = new FileOutputStream(fileName)) {
+ SerializationUtils.serialize(snapshots, fos);
+
+ returnFuture.set(newSuccessfulResult());
+ LOG.info("Successfully backed up datastore to file {}", fileName);
+ } catch(Exception e) {
+ onDatastoreBackupFilure(fileName, returnFuture, e);
+ }
+ }
+
+ private static void onDatastoreBackupFilure(String fileName, final SettableFuture<RpcResult<Void>> returnFuture,
+ Throwable failure) {
+ String msg = String.format("Failed to back up datastore to file %s", fileName);
+ LOG.error(msg, failure);
+ returnFuture.set(newFailedRpcResultBuilder(msg, failure).build());
+ }
+
+ private <T> ListenableFuture<T> ask(ActorRef actor, Object message, Timeout timeout) {
+ final SettableFuture<T> returnFuture = SettableFuture.create();
+
+ @SuppressWarnings("unchecked")
+ scala.concurrent.Future<T> askFuture = (scala.concurrent.Future<T>) Patterns.ask(actor, message, timeout);
+ askFuture.onComplete(new OnComplete<T>() {
+ @Override
+ public void onComplete(Throwable failure, T resp) {
+ if(failure != null) {
+ returnFuture.setException(failure);
+ } else {
+ returnFuture.set(resp);
+ }
+ }
+ }, configDataStore.getActorContext().getClientDispatcher());
+
+ return returnFuture;
+ }
+
+ private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message) {
+ return newFailedRpcResultBuilder(message, null);
+ }
+
+ private static RpcResultBuilder<Void> newFailedRpcResultBuilder(String message, Throwable cause) {
+ return RpcResultBuilder.<Void>failed().withError(ErrorType.RPC, message, cause);
+ }
+
+ private static RpcResult<Void> newSuccessfulResult() {
+ return RpcResultBuilder.<Void>success().build();
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.config;
-
-import java.util.concurrent.Future;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.AddShardReplicaInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.ClusterConfigService;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.ConvertMembersToNonvotingForAllShardsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.ConvertMembersToVotingForAllShardsInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.config.rev151013.RemoveShardReplicaInput;
-import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements the yang RPCs defined in the generated ClusterConfigService interface.
- *
- * @author Thomas Pantelis
- */
-public class ClusterConfigRpcService implements ClusterConfigService, AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(ClusterConfigRpcService.class);
-
- private final DistributedDataStore configDataStore;
- private final DistributedDataStore operDataStore;
- private RpcRegistration<ClusterConfigService> rpcRegistration;
-
- public ClusterConfigRpcService(DistributedDataStore configDataStore, DistributedDataStore operDataStore) {
- this.configDataStore = configDataStore;
- this.operDataStore = operDataStore;
- }
-
- public void start(RpcProviderRegistry rpcProviderRegistry) {
- LOG.debug("ClusterConfigRpcService starting");
-
- rpcRegistration = rpcProviderRegistry.addRpcImplementation(ClusterConfigService.class, this);
- }
-
- @Override
- public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
- // TODO implement
- return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
- "Not implemented yet").buildFuture();
- }
-
- @Override
- public Future<RpcResult<Void>> removeShardReplica(RemoveShardReplicaInput input) {
- // TODO implement
- return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
- "Not implemented yet").buildFuture();
- }
-
- @Override
- public Future<RpcResult<Void>> addReplicasForAllShards() {
- // TODO implement
- return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
- "Not implemented yet").buildFuture();
- }
-
- @Override
- public Future<RpcResult<Void>> removeAllShardReplicas() {
- // TODO implement
- return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
- "Not implemented yet").buildFuture();
- }
-
- @Override
- public Future<RpcResult<Void>> convertMembersToVotingForAllShards(ConvertMembersToVotingForAllShardsInput input) {
- // TODO implement
- return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
- "Not implemented yet").buildFuture();
- }
-
- @Override
- public Future<RpcResult<Void>> convertMembersToNonvotingForAllShards(
- ConvertMembersToNonvotingForAllShardsInput input) {
- // TODO implement
- return RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, "operation-not-supported",
- "Not implemented yet").buildFuture();
- }
-
- @Override
- public void close() {
- if(rpcRegistration != null) {
- rpcRegistration.close();
- }
- }
-}
-package org.opendaylight.controller.config.yang.config.cluster_config_provider;
+package org.opendaylight.controller.config.yang.config.cluster_admin_provider;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.config.ClusterConfigRpcService;
+import org.opendaylight.controller.cluster.datastore.admin.ClusterAdminRpcService;
import org.opendaylight.controller.config.api.DependencyResolver;
import org.opendaylight.controller.config.api.ModuleIdentifier;
-public class ClusterConfigProviderModule extends AbstractClusterConfigProviderModule {
- public ClusterConfigProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
+public class ClusterAdminProviderModule extends AbstractClusterAdminProviderModule {
+ public ClusterAdminProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public ClusterConfigProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver,
- ClusterConfigProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
+ public ClusterAdminProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver,
+ ClusterAdminProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
"Injected config DOMStore must be an instance of DistributedDataStore");
Preconditions.checkArgument(getOperDataStoreDependency() instanceof DistributedDataStore,
"Injected operational DOMStore must be an instance of DistributedDataStore");
- ClusterConfigRpcService service = new ClusterConfigRpcService((DistributedDataStore)getConfigDataStoreDependency(),
+ ClusterAdminRpcService service = new ClusterAdminRpcService((DistributedDataStore)getConfigDataStoreDependency(),
(DistributedDataStore)getOperDataStoreDependency());
service.start(getRpcRegistryDependency());
return service;
--- /dev/null
+/*
+* Generated file
+*
+* Generated from: yang module name: cluster-admin-provider yang module local name: cluster-admin-provider
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+* Generated at: Tue Nov 03 02:34:10 EST 2015
+*
+* Do not modify this file unless it is present under src/main directory
+*/
+package org.opendaylight.controller.config.yang.config.cluster_admin_provider;
+
+public class ClusterAdminProviderModuleFactory extends AbstractClusterAdminProviderModuleFactory {
+
+}
+++ /dev/null
-/*
-* Generated file
-*
-* Generated from: yang module name: cluster-config-provider yang module local name: cluster-config-provider
-* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
-* Generated at: Tue Oct 13 22:12:46 EDT 2015
-*
-* Do not modify this file unless it is present under src/main directory
-*/
-package org.opendaylight.controller.config.yang.config.cluster_config_provider;
-
-public class ClusterConfigProviderModuleFactory extends AbstractClusterConfigProviderModuleFactory {
-}
-module cluster-config-provider {
+module cluster-admin-provider {
yang-version 1;
- namespace "urn:opendaylight:params:xml:ns:yang:controller:config:cluster-config-provider";
- prefix "cluster-config-provider";
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:config:cluster-admin-provider";
+ prefix "cluster-admin-provider";
import config { prefix config; revision-date 2013-04-05; }
import opendaylight-operational-dom-datastore {prefix operational-dom-store-spi;}
import opendaylight-md-sal-binding { prefix mdsal; revision-date 2013-10-28; }
description
- "This module contains the configuration YANG definitions for the ClusterConfigRpcService implementation";
+ "This module contains the configuration YANG definitions for the ClusterAdminRpcService implementation";
revision "2015-10-13" {
description "Initial revision.";
}
// This is the definition of the provider implementation as a module identity.
- identity cluster-config-provider {
+ identity cluster-admin-provider {
base config:module-type;
- config:java-name-prefix ClusterConfigProvider;
+ config:java-name-prefix ClusterAdminProvider;
}
// Augments the 'configuration' choice node under modules/module.
augment "/config:modules/config:module/config:configuration" {
- case cluster-config-provider {
- when "/config:modules/config:module/config:type = 'cluster-config-provider'";
+ case cluster-admin-provider {
+ when "/config:modules/config:module/config:type = 'cluster-admin-provider'";
container config-data-store {
uses config:service-ref {
refine type {
-module cluster-config {
+module cluster-admin {
yang-version 1;
- namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:cluster:config";
- prefix "clustering-config";
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:cluster:admin";
+ prefix "cluster-admin";
description
- "This module contains YANG RPC definitions for configuring a cluster.";
+ "This module contains YANG RPC definitions for administering a cluster.";
revision "2015-10-13" {
description "Initial revision.";
description "The names of the cluster members to convert.";
}
}
-
+
description "Converts the given cluster members to voting for all shards. The members will
participate in leader elections and consensus.";
}
+
+ rpc backup-datastore {
+ input {
+ leaf file-path {
+ type string;
+ description "The path and name of the file in which to store the backup.";
+ }
+ }
+
+ description "Creates a backup file of the datastore state";
+ }
}
\ No newline at end of file
cohort.commit().get(5, TimeUnit.SECONDS);
}
- void cleanup(DistributedDataStore dataStore) {
+ public void cleanup(DistributedDataStore dataStore) {
if(dataStore != null) {
dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
}
import akka.actor.Status.Failure;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
JavaTestKit kit = new JavaTestKit(getSystem());
byte[] shardManagerSnapshot = new byte[]{0,5,9};
- ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(3, "config",
+ ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(
+ Arrays.asList("shard1", "shard2", "shard3"), "config",
shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)),
actorFactory.generateActorId("actor"));
JavaTestKit kit = new JavaTestKit(getSystem());
byte[] shardManagerSnapshot = new byte[]{0,5,9};
- ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(2, "config",
+ ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(
+ Arrays.asList("shard1", "shard2"), "config",
shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.SECONDS)),
actorFactory.generateActorId("actor"));
JavaTestKit kit = new JavaTestKit(getSystem());
byte[] shardManagerSnapshot = new byte[]{0,5,9};
- ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(1, "config",
+ ActorRef replyActor = actorFactory.createActor(ShardManagerGetSnapshotReplyActor.props(
+ Arrays.asList("shard1"), "config",
shardManagerSnapshot, kit.getRef(), "shard-manager", Duration.create(100, TimeUnit.MILLISECONDS)),
actorFactory.generateActorId("actor"));
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.admin;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.actor.PoisonPill;
+import akka.cluster.Cluster;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+/**
+ * Unit tests for ClusterAdminRpcService.
+ *
+ * @author Thomas Pantelis
+ */
+public class ClusterAdminRpcServiceTest {
+ private static ActorSystem system;
+
+ private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder();
+
+ private IntegrationTestKit kit;
+ private DistributedDataStore configDataStore;
+ private DistributedDataStore operDataStore;
+ private ClusterAdminRpcService service;
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+ system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+ Cluster.get(system).join(member1Address);
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws IOException {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ @After
+ public void tearDown() {
+ if(kit != null) {
+ kit.cleanup(configDataStore);
+ kit.cleanup(operDataStore);
+ }
+ }
+
+ private void setup(String testName, String... shardNames) {
+ kit = new IntegrationTestKit(system, datastoreContextBuilder);
+
+ configDataStore = kit.setupDistributedDataStore(testName + "Config", "module-shards-member1.conf",
+ true, shardNames);
+ operDataStore = kit.setupDistributedDataStore(testName + "Oper", "module-shards-member1.conf",
+ true, shardNames);
+
+ service = new ClusterAdminRpcService(configDataStore, operDataStore);
+ }
+
+ @Test
+ public void testBackupDatastore() throws Exception {
+ setup("testBackupDatastore", "cars", "people");
+
+ String fileName = "target/testBackupDatastore";
+ new File(fileName).delete();
+
+ RpcResult<Void> rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().
+ setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
+ assertEquals("isSuccessful", true, rpcResult.isSuccessful());
+
+ try(FileInputStream fis = new FileInputStream(fileName)) {
+ List<DatastoreSnapshot> snapshots = SerializationUtils.deserialize(fis);
+ assertEquals("DatastoreSnapshot size", 2, snapshots.size());
+
+ ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
+ snapshots.get(1).getType(), snapshots.get(1));
+ verifyDatastoreSnapshot(configDataStore.getActorContext().getDataStoreType(),
+ map.get(configDataStore.getActorContext().getDataStoreType()), "cars", "people");
+ } finally {
+ new File(fileName).delete();
+ }
+
+ // Test failure by killing a shard.
+
+ configDataStore.getActorContext().getShardManager().tell(datastoreContextBuilder.
+ shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
+
+ ActorRef carsShardActor = configDataStore.getActorContext().findLocalShard("cars").get();
+ kit.watch(carsShardActor);
+ carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ kit.expectTerminated(carsShardActor);
+
+ rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()).
+ get(5, TimeUnit.SECONDS);
+ assertEquals("isSuccessful", false, rpcResult.isSuccessful());
+ assertEquals("getErrors", 1, rpcResult.getErrors().size());
+ assertTrue("Expected error cause TimeoutException",
+ rpcResult.getErrors().iterator().next().getCause() instanceof TimeoutException);
+ }
+
+ private void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) {
+ assertNotNull("Missing DatastoreSnapshot for type " + type, datastoreSnapshot);
+ Set<String> shardNames = new HashSet<>();
+ for(DatastoreSnapshot.ShardSnapshot s: datastoreSnapshot.getShardSnapshots()) {
+ shardNames.add(s.getName());
+ }
+
+ assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
+ }
+
+ @Test
+ public void testAddShardReplica() {
+ // TODO implement
+ }
+
+ @Test
+ public void testRemoveShardReplica() {
+ // TODO implement
+ }
+
+ @Test
+ public void testAddReplicasForAllShards() {
+ // TODO implement
+ }
+
+ @Test
+ public void testRemoveAllShardReplicas() {
+ // TODO implement
+ }
+
+ @Test
+ public void testConvertMembersToVotingForAllShards() {
+ // TODO implement
+ }
+
+ @Test
+ public void testConvertMembersToNonvotingForAllShards() {
+ // TODO implement
+ }
+}
--- /dev/null
+module-shards = [
+ {
+ name = "people"
+ shards = [
+ {
+ name="people"
+ replicas = [
+ "member-1"
+ ]
+ }
+ ]
+ },
+ {
+ name = "cars"
+ shards = [
+ {
+ name="cars"
+ replicas = [
+ "member-1"
+ ]
+ }
+ ]
+ }
+]
\ No newline at end of file