import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
onSwitchShardBehavior((SwitchShardBehavior) message);
} else if(message instanceof CreateShard) {
onCreateShard((CreateShard)message);
+ } else if(message instanceof AddShardReplica){
+ onAddShardReplica((AddShardReplica)message);
+ } else if(message instanceof RemoveShardReplica){
+ onRemoveShardReplica((RemoveShardReplica)message);
} else {
unknownMessage(message);
}
return mBean;
}
+ private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
+ String shardName = shardReplicaMsg.getShardName();
+
+ // verify the local shard replica is already available in the controller node
+ if (localShards.containsKey(shardName)) {
+ LOG.debug ("Local shard {} already available in the controller node", shardName);
+ getSender().tell(new akka.actor.Status.Failure(
+ new IllegalArgumentException(String.format("Local shard %s already exists",
+ shardName))), getSelf());
+ return;
+ }
+ // verify the shard with the specified name is present in the cluster configuration
+ if (!(this.configuration.isShardConfigured(shardName))) {
+ LOG.debug ("No module configuration exists for shard {}", shardName);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(
+ String.format("No module configuration exists for shard %s",
+ shardName))), getSelf());
+ return;
+ }
+
+ // Create the localShard
+ getSender().tell(new akka.actor.Status.Success(true), getSelf());
+ return;
+ }
+
+ private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
+ String shardName = shardReplicaMsg.getShardName();
+ boolean deleteStatus = false;
+
+ // verify the local shard replica is available in the controller node
+ if (!localShards.containsKey(shardName)) {
+ LOG.debug ("Local shard replica {} is not available in the controller node", shardName);
+ getSender().tell(new akka.actor.Status.Failure(
+ new IllegalArgumentException(String.format("Local shard %s not available",
+ shardName))), getSelf());
+ return;
+ }
+ // call RemoveShard for the shardName
+ getSender().tell(new akka.actor.Status.Success(true), getSelf());
+ return;
+ }
+
@VisibleForTesting
protected static class ShardInformation {
private final ShardIdentifier shardId;
@Nonnull Collection<String> getMemberShardNames(@Nonnull String memberName);
/**
- * Returns the namespace for the given module name or null if not found.
+ * Returns the module name for the given namespace name or null if not found.
*/
@Nullable String getModuleNameFromNameSpace(@Nonnull String nameSpace);
* Returns a unique set of all member names configured for all shards.
*/
Collection<String> getUniqueMemberNamesForAllShards();
+
+ /*
+ * Verifies if the given module shard in available in the cluster
+ */
+ boolean isShardConfigured(String shardName);
}
private ShardStrategy createShardStrategy(String moduleName, String shardStrategyName) {
return ShardStrategyFactory.newShardStrategyInstance(moduleName, shardStrategyName, this);
}
+
+ @Override
+ public boolean isShardConfigured(String shardName) {
+ Preconditions.checkNotNull(shardName, "shardName should not be null");
+ return allShardNames.contains(shardName);
+ }
}
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import java.util.concurrent.TimeUnit;
+import scala.concurrent.Await;
public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoMBean {
public void setShardManager(ShardManager shardManager){
this.shardManager = shardManager;
}
+
+ @Override
+ public void setAddShardReplica (String shardName) {
+ LOG.info ("addShardReplica initiated for shard {}", shardName);
+
+ // TODO addTimeout to be made configurable
+ Timeout addTimeOut = new Timeout(1, TimeUnit.MINUTES);
+ try {
+ Await.result(Patterns.ask(shardManager.getSelf(),
+ new AddShardReplica(shardName), addTimeOut),
+ addTimeOut.duration());
+ } catch (Exception ex) {
+ LOG.debug ("Obtained an exception during addShardReplica", ex);
+ throw (new RuntimeException(ex.getMessage()));
+ }
+ return;
+ }
+
+ @Override
+ public void setRemoveShardReplica (String shardName) {
+ LOG.info ("removeShardReplica initiated for shard {}", shardName);
+
+ // TODO remTimeOut to be made configurable
+ Timeout remTimeOut = new Timeout(30, TimeUnit.SECONDS);
+ try {
+ Await.result(Patterns.ask(shardManager.getSelf(),
+ new RemoveShardReplica(shardName), remTimeOut),
+ remTimeOut.duration());
+ } catch (Exception ex) {
+ LOG.debug ("Obtained an exception during removeShardReplica", ex);
+ throw (new RuntimeException(ex.getMessage()));
+ }
+ return;
+ }
}
* the term provided when switching to a new Leader should always be higher than the previous term.
*/
void switchShardState(String shardName, String newBehavior, long term);
+
+ /**
+ * Add a new Shard replica for an existing Shard in this controller node
+ *
+ * @param shardName the shard that is to be created and replicated in this controller instance
+ */
+ void setAddShardReplica (String shardName);
+
+ /**
+ * Remove a Shard replica available in this controller node
+ *
+ * @param shardName the shard that is to be removed from this controller instance
+ */
+ void setRemoveShardReplica (String shardName);
+
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Dell Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import javax.annotation.Nonnull;
+import com.google.common.base.Preconditions;
+
+/**
+ * A message sent to the ShardManager to dynamically add a new local shard
+ * that is a replica for an existing shard that is already available in the
+ * cluster.
+ */
+
+public class AddShardReplica {
+
+ private final String shardName;
+
+ /**
+ * Constructor.
+ *
+ * @param shardName name of the shard that is to be locally replicated.
+ */
+
+ public AddShardReplica (@Nonnull String shardName){
+ this.shardName = Preconditions.checkNotNull(shardName, "ShardName should not be null");
+ }
+
+ public String getShardName(){
+ return this.shardName;
+ }
+
+ @Override
+ public String toString(){
+ return "AddShardReplica[ShardName=" + shardName + "]";
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Dell Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import javax.annotation.Nonnull;
+import com.google.common.base.Preconditions;
+
+/**
+ * A message sent to the ShardManager to dynamically remove a local shard
+ * replica available in this node.
+ */
+
+public class RemoveShardReplica {
+
+ private final String shardName;
+
+ /**
+ * Constructor.
+ *
+ * @param shardName name of the local shard that is to be dynamically removed.
+ */
+
+ public RemoveShardReplica (@Nonnull String shardName){
+ this.shardName = Preconditions.checkNotNull(shardName, "ShardName should not be null");
+ }
+
+ public String getShardName(){
+ return this.shardName;
+ }
+
+ @Override
+ public String toString(){
+ return "RemoveShardReplica[ShardName=" + shardName + "]";
+ }
+}
import akka.actor.ActorSystem;
import akka.actor.AddressFromURIString;
import akka.actor.Props;
+import akka.actor.Status;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.dispatch.Dispatchers;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
}};
}
+ @Test
+ public void testAddShardReplicaForNonExistentShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
+ new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+
+ shardManager.tell(new AddShardReplica("model-inventory"), getRef());
+ Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
+
+ assertEquals("Failure obtained", true,
+ (resp.cause() instanceof IllegalArgumentException));
+ }};
+ }
+
+ @Test
+ public void testAddShardReplicaForAlreadyCreatedShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
+ shardManager.tell(new AddShardReplica("default"), getRef());
+ Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
+ assertEquals("Failure obtained", true,
+ (resp.cause() instanceof IllegalArgumentException));
+ }};
+ }
+
+ @Test
+ public void testAddShardReplica() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
+ put("default", Arrays.asList("member-1", "member-2")).
+ put("astronauts", Arrays.asList("member-2")).build());
+
+ ActorRef shardManager = getSystem().actorOf(newShardMgrProps(mockConfig));
+
+ shardManager.tell(new AddShardReplica("astronauts"), getRef());
+ expectMsgClass(duration("2 seconds"), Status.Success.class);
+ }};
+ }
+
+ @Test
+ public void testRemoveShardReplicaForNonExistentShard() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
+ new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
+
+ shardManager.tell(new RemoveShardReplica("model-inventory"), getRef());
+ Status.Failure resp = expectMsgClass(duration("2 seconds"), Status.Failure.class);
+ assertEquals("Failure obtained", true,
+ (resp.cause() instanceof IllegalArgumentException));
+ }};
+
+ }
+
private static class TestShardPropsCreator implements ShardPropsCreator {
ShardIdentifier shardId;
Map<String, String> peerAddresses;
@Override
public Collection<String> getMemberShardNames(final String memberName) {
- return new ArrayList<>(shardMembers.keySet());
+ ArrayList<String> shardNames = new ArrayList<String>();
+ for(Map.Entry<String, List<String>> shard : shardMembers.entrySet()) {
+ if (shard.getValue().contains(memberName)) {
+ shardNames.add(shard.getKey());
+ }
+ }
+ return shardNames;
}
@Override
@Override
public void addModuleShardConfiguration(ModuleShardConfiguration config) {
}
+
+ @Override
+ public boolean isShardConfigured(String shardName) {
+ return (shardMembers.containsKey(shardName));
+ }
}