From: Tom Pantelis Date: Thu, 6 Aug 2015 20:13:03 +0000 (-0400) Subject: Bug 4105: Add CreateShard message in ShardManager X-Git-Tag: release/beryllium~319 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=635b5b19764c8c99267f35690ca68b02cf1aea3a Bug 4105: Add CreateShard message in ShardManager Added a new CreateShard message that is processed by the ShardManager. A new interface, ShardPropsCreator, was added allowing the caller to instantiate a sub-class of Shard if need be via the CreateShard message. The DefaultShardPropsCreator creates Props for the Shard class. Change-Id: Ieb2c895c85709d963445dc7e15ae9dec9cb3a810 Signed-off-by: Tom Pantelis (cherry picked from commit 76e5d5ef4b8fc83e2c480c8fac81a05b65c14799) --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardPropsCreator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardPropsCreator.java new file mode 100644 index 0000000000..566932b6ba --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardPropsCreator.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Props; +import java.util.Map; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +/** + * Implementation of ShardPropsCreator that creates a Props instance for the Shard class. + * + * @author Thomas Pantelis + */ +public class DefaultShardPropsCreator implements ShardPropsCreator { + + @Override + public Props newProps(ShardIdentifier shardId, Map peerAddresses, + DatastoreContext datastoreContext, SchemaContext schemaContext) { + return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 5f59672ed9..c215321d0e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -29,6 +29,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -45,6 +46,8 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.CreateShard; +import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; @@ -113,6 +116,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final PrimaryShardInfoFutureCache primaryShardInfoCache; + private SchemaContext schemaContext; + /** */ protected ShardManager(ClusterWrapper cluster, Configuration configuration, @@ -188,12 +193,53 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onLeaderStateChanged((ShardLeaderStateChanged) message); } else if(message instanceof SwitchShardBehavior){ onSwitchShardBehavior((SwitchShardBehavior) message); + } else if(message instanceof CreateShard) { + onCreateShard((CreateShard)message); } else { unknownMessage(message); } } + private void onCreateShard(CreateShard createShard) { + Object reply; + try { + if(localShards.containsKey(createShard.getShardName())) { + throw new IllegalStateException(String.format("Shard with name %s already exists", + createShard.getShardName())); + } + + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), createShard.getShardName()); + Map peerAddresses = getPeerAddresses(createShard.getShardName(), createShard.getMemberNames()); + + LOG.debug("onCreateShard: shardId: {}, peerAddresses: {}", shardId, peerAddresses); + + DatastoreContext shardDatastoreContext = createShard.getDatastoreContext(); + if(shardDatastoreContext == null) { + shardDatastoreContext = datastoreContext; + } + + ShardInformation info = new ShardInformation(createShard.getShardName(), shardId, peerAddresses, + shardDatastoreContext, createShard.getShardPropsCreator()); + localShards.put(createShard.getShardName(), info); + + mBean.addLocalShard(shardId.toString()); + + if(schemaContext != null) { + info.setActor(newShardActor(schemaContext, info)); + } + + reply = new CreateShardReply(); + } catch (Exception e) { + LOG.error("onCreateShard failed", e); + reply = new akka.actor.Status.Failure(e); + } + + if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) { + getSender().tell(reply, getSelf()); + } + } + private void checkReady(){ if (isReadyWithLeaderId()) { LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", @@ -502,7 +548,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param message */ private void updateSchemaContext(final Object message) { - final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); + schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size()); @@ -523,8 +569,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @VisibleForTesting protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { - return getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext) + return getContext().actorOf(info.newProps(schemaContext) .withDispatcher(shardDispatcherPath), info.getShardId().toString()); } @@ -614,12 +659,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { List memberShardNames = this.configuration.getMemberShardNames(memberName); + ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator(); List localShardActorNames = new ArrayList<>(); for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); localShardActorNames.add(shardId.toString()); - localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); + localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext, + shardPropsCreator)); } mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, @@ -634,16 +681,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName * @return */ - private Map getPeerAddresses(String shardName){ + private Map getPeerAddresses(String shardName) { + return getPeerAddresses(shardName, configuration.getMembersFromShardName(shardName)); + } - Map peerAddresses = new HashMap<>(); + private Map getPeerAddresses(String shardName, Collection members) { - List members = this.configuration.getMembersFromShardName(shardName); + Map peerAddresses = new HashMap<>(); String currentMemberName = this.cluster.getCurrentMemberName(); - for(String memberName : members){ - if(!currentMemberName.equals(memberName)){ + for(String memberName : members) { + if(!currentMemberName.equals(memberName)) { ShardIdentifier shardId = getShardIdentifier(memberName, shardName); String path = getShardActorPath(shardName, currentMemberName); peerAddresses.put(shardId.toString(), path); @@ -697,11 +746,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private String leaderId; private short leaderVersion; + private final DatastoreContext datastoreContext; + private final ShardPropsCreator shardPropsCreator; + private ShardInformation(String shardName, ShardIdentifier shardId, - Map peerAddresses) { + Map peerAddresses, DatastoreContext datastoreContext, + ShardPropsCreator shardPropsCreator) { this.shardName = shardName; this.shardId = shardId; this.peerAddresses = peerAddresses; + this.datastoreContext = datastoreContext; + this.shardPropsCreator = shardPropsCreator; + } + + Props newProps(SchemaContext schemaContext) { + return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext); } String getShardName() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPropsCreator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPropsCreator.java new file mode 100644 index 0000000000..b57db297fd --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardPropsCreator.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Props; +import java.util.Map; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +/** + * An interface for creating a Shard actor Props instance. + * + * @author Thomas Pantelis + */ +public interface ShardPropsCreator { + + Props newProps(ShardIdentifier shardId, Map peerAddresses, DatastoreContext datastoreContext, + SchemaContext schemaContext); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java index 5de46cb87b..79beae72a3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java @@ -51,6 +51,10 @@ public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfo return shardManagerInfo; } + public void addLocalShard(String shardName) { + localShards.add(shardName); + } + @Override public List getLocalShards() { return localShards; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateShard.java new file mode 100644 index 0000000000..11f3f13ec4 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateShard.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Preconditions; +import java.util.Collection; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.ShardPropsCreator; + +/** + * A message sent to the ShardManager to dynamically create a new shard. + * + * @author Thomas Pantelis + */ +public class CreateShard { + private final String shardName; + private final Collection memberNames; + private final ShardPropsCreator shardPropsCreator; + private final DatastoreContext datastoreContext; + + /** + * Constructor. + * + * @param shardName the name of the new shard. + * @param memberNames the names of all the member replicas. + * @param shardPropsCreator used to obtain the Props for creating the shard actor instance. + * @param datastoreContext the DatastoreContext for the new shard. If null, the default is used. + */ + public CreateShard(@Nonnull String shardName, @Nonnull Collection memberNames, + @Nonnull ShardPropsCreator shardPropsCreator, @Nullable DatastoreContext datastoreContext) { + this.shardName = Preconditions.checkNotNull(shardName); + this.memberNames = Preconditions.checkNotNull(memberNames); + this.shardPropsCreator = Preconditions.checkNotNull(shardPropsCreator); + this.datastoreContext = datastoreContext; + } + + @Nonnull public String getShardName() { + return shardName; + } + + @Nonnull public Collection getMemberNames() { + return memberNames; + } + + @Nonnull public ShardPropsCreator getShardPropsCreator() { + return shardPropsCreator; + } + + @Nullable public DatastoreContext getDatastoreContext() { + return datastoreContext; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("CreateShard [shardName=").append(shardName).append(", memberNames=").append(memberNames) + .append("]"); + return builder.toString(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateShardReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateShardReply.java new file mode 100644 index 0000000000..1882391369 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateShardReply.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +/** + * Reply message for CreateShard. + * + * @author Thomas Pantelis + */ +public class CreateShardReply { +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index b6ba73cd83..94d14dcf97 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -32,9 +33,11 @@ import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -50,6 +53,8 @@ import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundE import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.CreateShard; +import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; @@ -928,6 +933,84 @@ public class ShardManagerTest extends AbstractActorTest { }}; } + public void testOnReceiveCreateShard() { + new JavaTestKit(getSystem()) {{ + datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true); + + ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false)); + + SchemaContext schemaContext = TestModel.createTestContext(); + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + + DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100). + persistent(false).build(); + TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator(); + + shardManager.tell(new CreateShard("foo", Arrays.asList("member-1", "member-5", "member-6"), shardPropsCreator, + datastoreContext), getRef()); + + expectMsgClass(duration("5 seconds"), CreateShardReply.class); + + shardManager.tell(new FindLocalShard("foo", true), getRef()); + + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent()); + assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(), + new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()), + shardPropsCreator.peerAddresses.keySet()); + assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix), + shardPropsCreator.shardId); + assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext); + + // Send CreateShard with same name - should fail. + + shardManager.tell(new CreateShard("foo", Collections.emptyList(), shardPropsCreator, null), getRef()); + + expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); + }}; + } + + @Test + public void testOnReceiveCreateShardWithNoInitialSchemaContext() { + new JavaTestKit(getSystem()) {{ + ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false)); + + TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator(); + + shardManager.tell(new CreateShard("foo", Arrays.asList("member-1"), shardPropsCreator, null), getRef()); + + expectMsgClass(duration("5 seconds"), CreateShardReply.class); + + SchemaContext schemaContext = TestModel.createTestContext(); + shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender()); + + shardManager.tell(new FindLocalShard("foo", true), getRef()); + + expectMsgClass(duration("5 seconds"), LocalShardFound.class); + + assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext); + assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext); + }}; + } + + private static class TestShardPropsCreator implements ShardPropsCreator { + ShardIdentifier shardId; + Map peerAddresses; + SchemaContext schemaContext; + DatastoreContext datastoreContext; + + @Override + public Props newProps(ShardIdentifier shardId, Map peerAddresses, + DatastoreContext datastoreContext, SchemaContext schemaContext) { + this.shardId = shardId; + this.peerAddresses = peerAddresses; + this.schemaContext = schemaContext; + this.datastoreContext = datastoreContext; + return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext); + } + + } private static class TestShardManager extends ShardManager { private final CountDownLatch recoveryComplete = new CountDownLatch(1);