import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
private void onCreateShard(CreateShard createShard) {
Object reply;
try {
- if(localShards.containsKey(createShard.getShardName())) {
+ ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ if(localShards.containsKey(moduleShardConfig.getShardName())) {
throw new IllegalStateException(String.format("Shard with name %s already exists",
- createShard.getShardName()));
+ moduleShardConfig.getShardName()));
}
- ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), createShard.getShardName());
- Map<String, String> peerAddresses = getPeerAddresses(createShard.getShardName(), createShard.getMemberNames());
+ configuration.addModuleShardConfiguration(moduleShardConfig);
+
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
+ Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
+ moduleShardConfig.getShardMemberNames()*/);
LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
- createShard.getMemberNames(), peerAddresses);
+ moduleShardConfig.getShardMemberNames(), peerAddresses);
DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
if(shardDatastoreContext == null) {
shardDatastoreContext = datastoreContext;
}
- ShardInformation info = new ShardInformation(createShard.getShardName(), shardId, peerAddresses,
+ ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
shardDatastoreContext, createShard.getShardPropsCreator());
- localShards.put(createShard.getShardName(), info);
+ localShards.put(info.getShardName(), info);
mBean.addLocalShard(shardId.toString());
* Given the name of the shard find the addresses of all it's peers
*
* @param shardName
- * @return
*/
private Map<String, String> getPeerAddresses(String shardName) {
- return getPeerAddresses(shardName, configuration.getMembersFromShardName(shardName));
- }
-
- private Map<String, String> getPeerAddresses(String shardName, Collection<String> members) {
-
+ Collection<String> members = configuration.getMembersFromShardName(shardName);
Map<String, String> peerAddresses = new HashMap<>();
String currentMemberName = this.cluster.getCurrentMemberName();
public Collection<String> getShardMemberNames() {
return shardMemberNames;
}
+
+ @Override
+ public String toString() {
+ return "ModuleShardConfiguration [namespace=" + namespace + ", moduleName=" + moduleName + ", shardName="
+ + shardName + ", shardMemberNames=" + shardMemberNames + ", shardStrategyName=" + shardStrategyName
+ + "]";
+ }
}
Configuration configuration = datastore.getActorContext().getConfiguration();
Collection<String> entityOwnersMemberNames = configuration.getUniqueMemberNamesForAllShards();
- configuration.addModuleShardConfiguration(new ModuleShardConfiguration(EntityOwners.QNAME.getNamespace(),
- "entity-owners", ENTITY_OWNERSHIP_SHARD_NAME, ModuleShardStrategy.NAME, entityOwnersMemberNames));
-
- CreateShard createShard = new CreateShard(ENTITY_OWNERSHIP_SHARD_NAME,
- entityOwnersMemberNames, newShardPropsCreator(), null);
+ CreateShard createShard = new CreateShard(new ModuleShardConfiguration(EntityOwners.QNAME.getNamespace(),
+ "entity-owners", ENTITY_OWNERSHIP_SHARD_NAME, ModuleShardStrategy.NAME, entityOwnersMemberNames),
+ newShardPropsCreator(), null);
Future<Object> createFuture = datastore.getActorContext().executeOperationAsync(shardManagerActor,
createShard, MESSAGE_TIMEOUT);
package org.opendaylight.controller.cluster.datastore.messages;
import com.google.common.base.Preconditions;
-import java.util.Collection;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.ShardPropsCreator;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
/**
* A message sent to the ShardManager to dynamically create a new shard.
* @author Thomas Pantelis
*/
public class CreateShard {
- private final String shardName;
- private final Collection<String> memberNames;
+ private final ModuleShardConfiguration moduleShardConfig;
private final ShardPropsCreator shardPropsCreator;
private final DatastoreContext datastoreContext;
/**
* Constructor.
*
- * @param shardName the name of the new shard.
- * @param memberNames the names of all the member replicas.
+ * @param moduleShardConfig the configuration of the new shard.
* @param shardPropsCreator used to obtain the Props for creating the shard actor instance.
* @param datastoreContext the DatastoreContext for the new shard. If null, the default is used.
*/
- public CreateShard(@Nonnull String shardName, @Nonnull Collection<String> memberNames,
+ public CreateShard(@Nonnull ModuleShardConfiguration moduleShardConfig,
@Nonnull ShardPropsCreator shardPropsCreator, @Nullable DatastoreContext datastoreContext) {
- this.shardName = Preconditions.checkNotNull(shardName);
- this.memberNames = Preconditions.checkNotNull(memberNames);
+ this.moduleShardConfig = Preconditions.checkNotNull(moduleShardConfig);
this.shardPropsCreator = Preconditions.checkNotNull(shardPropsCreator);
this.datastoreContext = datastoreContext;
}
- @Nonnull public String getShardName() {
- return shardName;
- }
-
- @Nonnull public Collection<String> getMemberNames() {
- return memberNames;
+ @Nonnull public ModuleShardConfiguration getModuleShardConfig() {
+ return moduleShardConfig;
}
@Nonnull public ShardPropsCreator getShardPropsCreator() {
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("CreateShard [shardName=").append(shardName).append(", memberNames=").append(memberNames)
- .append("]");
- return builder.toString();
+ return "CreateShard [moduleShardConfig=" + moduleShardConfig + ", shardPropsCreator=" + shardPropsCreator + "]";
}
}
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
+import java.net.URI;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.config.EmptyModuleShardConfigProvider;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
InMemoryJournal.clear();
}
- private Props newShardMgrProps(boolean persistent) {
- return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
- datastoreContextBuilder.persistent(persistent).build(), ready, primaryShardInfoCache);
+ private Props newShardMgrProps() {
+ return newShardMgrProps(new MockConfiguration());
+ }
+
+ private Props newShardMgrProps(Configuration config) {
+ return ShardManager.props(new MockClusterWrapper(), config, datastoreContextBuilder.build(), ready,
+ primaryShardInfoCache);
}
private Props newPropsShardMgrWithMockShardActor() {
public void testRoleChangeNotificationAndShardLeaderStateChangedReleaseReady() throws Exception {
new JavaTestKit(getSystem()) {
{
- TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
public void testRoleChangeNotificationToFollowerWithShardLeaderStateChangedReleaseReady() throws Exception {
new JavaTestKit(getSystem()) {
{
- TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
public void testReadyCountDownForMemberUpAfterLeaderStateChanged() throws Exception {
new JavaTestKit(getSystem()) {
{
- TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
new JavaTestKit(getSystem()) {
{
- TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps(true));
+ TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
"unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
@Test
public void testByDefaultSyncStatusIsFalse() throws Exception{
- final Props persistentProps = newShardMgrProps(true);
+ final Props persistentProps = newShardMgrProps();
final TestActorRef<ShardManager> shardManager =
TestActorRef.create(getSystem(), persistentProps);
@Test
public void testWhenShardIsCandidateSyncStatusIsFalse() throws Exception{
- final Props persistentProps = newShardMgrProps(true);
+ final Props persistentProps = newShardMgrProps();
final TestActorRef<ShardManager> shardManager =
TestActorRef.create(getSystem(), persistentProps);
new JavaTestKit(getSystem()) {{
datastoreContextBuilder.shardInitializationTimeout(1, TimeUnit.MINUTES).persistent(true);
- ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
+ ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
+ new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
SchemaContext schemaContext = TestModel.createTestContext();
shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
persistent(false).build();
TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
- shardManager.tell(new CreateShard("foo", Arrays.asList("member-1", "member-5", "member-6"), shardPropsCreator,
- datastoreContext), getRef());
+ ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+ "foo", null, Arrays.asList("member-1", "member-5", "member-6"));
+ shardManager.tell(new CreateShard(config, shardPropsCreator, datastoreContext), getRef());
expectMsgClass(duration("5 seconds"), CreateShardReply.class);
// Send CreateShard with same name - should fail.
- shardManager.tell(new CreateShard("foo", Collections.<String>emptyList(), shardPropsCreator, null), getRef());
+ shardManager.tell(new CreateShard(config, shardPropsCreator, null), getRef());
expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
}};
@Test
public void testOnReceiveCreateShardWithNoInitialSchemaContext() {
new JavaTestKit(getSystem()) {{
- ActorRef shardManager = getSystem().actorOf(newShardMgrProps(false));
+ ActorRef shardManager = getSystem().actorOf(newShardMgrProps(
+ new ConfigurationImpl(new EmptyModuleShardConfigProvider())));
TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
- shardManager.tell(new CreateShard("foo", Arrays.asList("member-1"), shardPropsCreator, null), getRef());
+ ModuleShardConfiguration config = new ModuleShardConfiguration(URI.create("foo-ns"), "foo-module",
+ "foo", null, Arrays.asList("member-1"));
+ shardManager.tell(new CreateShard(config, shardPropsCreator, null), getRef());
expectMsgClass(duration("5 seconds"), CreateShardReply.class);
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.config;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * ModuleShardConfigProvider implementation that returns an empty map.
+ *
+ * @author Thomas Pantelis
+ */
+public class EmptyModuleShardConfigProvider implements ModuleShardConfigProvider {
+
+ @Override
+ public Map<String, ModuleConfig> retrieveModuleConfigs(Configuration configuration) {
+ return Collections.emptyMap();
+ }
+}
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
-import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
+import com.google.common.base.Function;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
import java.util.Map;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
verifyEntityOwnershipCandidateRegistration(entity, reg);
verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
- verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE, entityId,
+ verifyEntityCandidate(service.getLocalEntityOwnershipShard(), ENTITY_TYPE, entityId,
dataStore.getActorContext().getCurrentMemberName());
// Register the same entity - should throw exception
verifyEntityOwnershipCandidateRegistration(entity2, reg2);
verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate);
- verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE2, entityId,
+ verifyEntityCandidate(service.getLocalEntityOwnershipShard(), ENTITY_TYPE2, entityId,
dataStore.getActorContext().getCurrentMemberName());
service.close();
@Test
public void testRegisterListener() {
+ // TODO
}
- private NormalizedNode<?, ?> readEntityOwners(ActorRef shard) throws Exception {
- Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
- DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
- Optional<NormalizedNode<?, ?>> optional = readTx.read(ENTITY_OWNERS_PATH).
- checkedGet(5, TimeUnit.SECONDS);
- if(optional.isPresent()) {
- return optional.get();
- }
-
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- }
-
- return null;
+ private void verifyEntityCandidate(ActorRef entityOwnershipShard, String entityType,
+ YangInstanceIdentifier entityId, String candidateName) {
+ verifyEntityCandidate(entityType, entityId, candidateName,
+ new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
+ @Override
+ public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
+ try {
+ return dataStore.newReadOnlyTransaction().read(path).get(5, TimeUnit.SECONDS).get();
+ } catch (Exception e) {
+ return null;
+ }
+ }
+ });
}
private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,