--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Implementation of ShardPropsCreator that creates a Props instance for the Shard class.
+ *
+ * @author Thomas Pantelis
+ */
+public class DefaultShardPropsCreator implements ShardPropsCreator {
+
+ @Override
+ public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext);
+ }
+}
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
+ private SchemaContext schemaContext;
+
/**
*/
protected ShardManager(ClusterWrapper cluster, Configuration configuration,
onLeaderStateChanged((ShardLeaderStateChanged) message);
} else if(message instanceof SwitchShardBehavior){
onSwitchShardBehavior((SwitchShardBehavior) message);
+ } else if(message instanceof CreateShard) {
+ onCreateShard((CreateShard)message);
} else {
unknownMessage(message);
}
}
+ private void onCreateShard(CreateShard createShard) {
+ Object reply;
+ try {
+ if(localShards.containsKey(createShard.getShardName())) {
+ throw new IllegalStateException(String.format("Shard with name %s already exists",
+ createShard.getShardName()));
+ }
+
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), createShard.getShardName());
+ Map<String, String> peerAddresses = getPeerAddresses(createShard.getShardName(), createShard.getMemberNames());
+
+ LOG.debug("onCreateShard: shardId: {}, peerAddresses: {}", shardId, peerAddresses);
+
+ DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
+ if(shardDatastoreContext == null) {
+ shardDatastoreContext = datastoreContext;
+ }
+
+ ShardInformation info = new ShardInformation(createShard.getShardName(), shardId, peerAddresses,
+ shardDatastoreContext, createShard.getShardPropsCreator());
+ localShards.put(createShard.getShardName(), info);
+
+ mBean.addLocalShard(shardId.toString());
+
+ if(schemaContext != null) {
+ info.setActor(newShardActor(schemaContext, info));
+ }
+
+ reply = new CreateShardReply();
+ } catch (Exception e) {
+ LOG.error("onCreateShard failed", e);
+ reply = new akka.actor.Status.Failure(e);
+ }
+
+ if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ getSender().tell(reply, getSelf());
+ }
+ }
+
private void checkReady(){
if (isReadyWithLeaderId()) {
LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
* @param message
*/
private void updateSchemaContext(final Object message) {
- final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+ schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
@VisibleForTesting
protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
- return getContext().actorOf(Shard.props(info.getShardId(),
- info.getPeerAddresses(), datastoreContext, schemaContext)
+ return getContext().actorOf(info.newProps(schemaContext)
.withDispatcher(shardDispatcherPath), info.getShardId().toString());
}
List<String> memberShardNames =
this.configuration.getMemberShardNames(memberName);
+ ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator();
List<String> localShardActorNames = new ArrayList<>();
for(String shardName : memberShardNames){
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShardActorNames.add(shardId.toString());
- localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
+ localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
+ shardPropsCreator));
}
mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
* @param shardName
* @return
*/
- private Map<String, String> getPeerAddresses(String shardName){
+ private Map<String, String> getPeerAddresses(String shardName) {
+ return getPeerAddresses(shardName, configuration.getMembersFromShardName(shardName));
+ }
- Map<String, String> peerAddresses = new HashMap<>();
+ private Map<String, String> getPeerAddresses(String shardName, Collection<String> members) {
- List<String> members = this.configuration.getMembersFromShardName(shardName);
+ Map<String, String> peerAddresses = new HashMap<>();
String currentMemberName = this.cluster.getCurrentMemberName();
- for(String memberName : members){
- if(!currentMemberName.equals(memberName)){
+ for(String memberName : members) {
+ if(!currentMemberName.equals(memberName)) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
String path = getShardActorPath(shardName, currentMemberName);
peerAddresses.put(shardId.toString(), path);
private String leaderId;
private short leaderVersion;
+ private final DatastoreContext datastoreContext;
+ private final ShardPropsCreator shardPropsCreator;
+
private ShardInformation(String shardName, ShardIdentifier shardId,
- Map<String, String> peerAddresses) {
+ Map<String, String> peerAddresses, DatastoreContext datastoreContext,
+ ShardPropsCreator shardPropsCreator) {
this.shardName = shardName;
this.shardId = shardId;
this.peerAddresses = peerAddresses;
+ this.datastoreContext = datastoreContext;
+ this.shardPropsCreator = shardPropsCreator;
+ }
+
+ Props newProps(SchemaContext schemaContext) {
+ return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext);
}
String getShardName() {
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * An interface for creating a Shard actor Props instance.
+ *
+ * @author Thomas Pantelis
+ */
+public interface ShardPropsCreator {
+
+ Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses, DatastoreContext datastoreContext,
+ SchemaContext schemaContext);
+}
return shardManagerInfo;
}
+ public void addLocalShard(String shardName) {
+ localShards.add(shardName);
+ }
+
@Override
public List<String> getLocalShards() {
return localShards;
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.ShardPropsCreator;
+
+/**
+ * A message sent to the ShardManager to dynamically create a new shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class CreateShard {
+ private final String shardName;
+ private final Collection<String> memberNames;
+ private final ShardPropsCreator shardPropsCreator;
+ private final DatastoreContext datastoreContext;
+
+ /**
+ * Constructor.
+ *
+ * @param shardName the name of the new shard.
+ * @param memberNames the names of all the member replicas.
+ * @param shardPropsCreator used to obtain the Props for creating the shard actor instance.
+ * @param datastoreContext the DatastoreContext for the new shard. If null, the default is used.
+ */
+ public CreateShard(@Nonnull String shardName, @Nonnull Collection<String> memberNames,
+ @Nonnull ShardPropsCreator shardPropsCreator, @Nullable DatastoreContext datastoreContext) {
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.memberNames = Preconditions.checkNotNull(memberNames);
+ this.shardPropsCreator = Preconditions.checkNotNull(shardPropsCreator);
+ this.datastoreContext = datastoreContext;
+ }
+
+ @Nonnull public String getShardName() {
+ return shardName;
+ }
+
+ @Nonnull public Collection<String> getMemberNames() {
+ return memberNames;
+ }
+
+ @Nonnull public ShardPropsCreator getShardPropsCreator() {
+ return shardPropsCreator;
+ }
+
+ @Nullable public DatastoreContext getDatastoreContext() {
+ return datastoreContext;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("CreateShard [shardName=").append(shardName).append(", memberNames=").append(memberNames)
+ .append("]");
+ return builder.toString();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Reply message for CreateShard.
+ *
+ * @author Thomas Pantelis
+ */
+public class CreateShardReply {
+}
package org.opendaylight.controller.cluster.datastore;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
+import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
}};
}
+ public void testOnReceiveCreateShard() {
+ new JavaTestKit(getSystem()) {{
+ datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
+
+ ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
+
+ SchemaContext schemaContext = TestModel.createTestContext();
+ shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+
+ DatastoreContext datastoreContext = DatastoreContext.newBuilder().shardElectionTimeoutFactor(100).
+ persistent(false).build();
+ TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
+
+ shardManager.tell(new CreateShard("foo", Arrays.asList("member-1", "member-5", "member-6"), shardPropsCreator,
+ datastoreContext), getRef());
+
+ expectMsgClass(duration("5 seconds"), CreateShardReply.class);
+
+ shardManager.tell(new FindLocalShard("foo", true), getRef());
+
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ assertEquals("isRecoveryApplicable", false, shardPropsCreator.datastoreContext.isPersistent());
+ assertEquals("peerMembers", Sets.newHashSet(new ShardIdentifier("foo", "member-5", shardMrgIDSuffix).toString(),
+ new ShardIdentifier("foo", "member-6", shardMrgIDSuffix).toString()),
+ shardPropsCreator.peerAddresses.keySet());
+ assertEquals("ShardIdentifier", new ShardIdentifier("foo", "member-1", shardMrgIDSuffix),
+ shardPropsCreator.shardId);
+ assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
+
+ // Send CreateShard with same name - should fail.
+
+ shardManager.tell(new CreateShard("foo", Collections.<String>emptyList(), shardPropsCreator, null), getRef());
+
+ expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+ }};
+ }
+
+ @Test
+ public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
+ new JavaTestKit(getSystem()) {{
+ ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
+
+ TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
+
+ shardManager.tell(new CreateShard("foo", Arrays.asList("member-1"), shardPropsCreator, null), getRef());
+
+ expectMsgClass(duration("5 seconds"), CreateShardReply.class);
+
+ SchemaContext schemaContext = TestModel.createTestContext();
+ shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
+
+ shardManager.tell(new FindLocalShard("foo", true), getRef());
+
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ assertSame("schemaContext", schemaContext, shardPropsCreator.schemaContext);
+ assertNotNull("schemaContext is null", shardPropsCreator.datastoreContext);
+ }};
+ }
+
+ private static class TestShardPropsCreator implements ShardPropsCreator {
+ ShardIdentifier shardId;
+ Map<String, String> peerAddresses;
+ SchemaContext schemaContext;
+ DatastoreContext datastoreContext;
+
+ @Override
+ public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ this.shardId = shardId;
+ this.peerAddresses = peerAddresses;
+ this.schemaContext = schemaContext;
+ this.datastoreContext = datastoreContext;
+ return Shard.props(shardId, peerAddresses, datastoreContext, schemaContext);
+ }
+
+ }
private static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);