Optional<String> getModuleNameFromNameSpace(String nameSpace);
Map<String, ShardStrategy> getModuleNameToShardStrategyMap();
List<String> getShardNamesFromModuleName(String moduleName);
+ List<String> getMembersFromShardName(String shardName);
}
return Collections.EMPTY_LIST;
}
+ @Override public List<String> getMembersFromShardName(String shardName) {
+ List<String> shards = new ArrayList();
+ for(ModuleShard ms : moduleShards){
+ for(Shard s : ms.getShards()) {
+ if(s.getName().equals(shardName)){
+ return s.getReplicas();
+ }
+ }
+ }
+ return Collections.EMPTY_LIST;
+ }
+
private void readModules(Config modulesConfig) {
private final Map<String, ActorPath> localShards = new HashMap<>();
+ private final String type;
+
private final ClusterWrapper cluster;
+ private final Configuration configuration;
+
/**
* @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
* configuration or operational
*/
private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+ this.type = type;
this.cluster = cluster;
+ this.configuration = configuration;
String memberName = cluster.getCurrentMemberName();
List<String> memberShardNames =
configuration.getMemberShardNames(memberName);
for(String shardName : memberShardNames){
+ String shardActorName = getShardActorName(memberName, shardName);
ActorRef actor = getContext()
- .actorOf(Shard.props(memberName + "-shard-" + shardName + "-" + type),
- memberName + "-shard-" + shardName + "-" + type);
+ .actorOf(Shard.props(shardActorName), shardActorName);
ActorPath path = actor.path();
localShards.put(shardName, path);
}
FindPrimary msg = ((FindPrimary) message);
String shardName = msg.getShardName();
- if (Shard.DEFAULT_NAME.equals(shardName)) {
- ActorPath defaultShardPath = localShards.get(shardName);
- if(defaultShardPath == null){
- throw new IllegalStateException("local default shard not found");
+ List<String> members =
+ configuration.getMembersFromShardName(shardName);
+
+ for(String memberName : members) {
+ if (memberName.equals(cluster.getCurrentMemberName())) {
+ // This is a local shard
+ ActorPath shardPath = localShards.get(shardName);
+ // FIXME: This check may be redundant
+ if (shardPath == null) {
+ getSender()
+ .tell(new PrimaryNotFound(shardName), getSelf());
+ return;
+ }
+ getSender().tell(new PrimaryFound(shardPath.toString()),
+ getSelf());
+ return;
+ } else {
+ Address address = memberNameToAddress.get(shardName);
+ if(address != null){
+ String path =
+ address.toString() + "/user/" + getShardActorName(
+ memberName, shardName);
+ getSender().tell(new PrimaryFound(path), getSelf());
+ }
+
+
}
- getSender().tell(new PrimaryFound(defaultShardPath.toString()),
- getSelf());
- } else {
- getSender().tell(new PrimaryNotFound(shardName), getSelf());
}
+
+ getSender().tell(new PrimaryNotFound(shardName), getSelf());
+
} else if (message instanceof UpdateSchemaContext) {
for(ActorPath path : localShards.values()){
getContext().system().actorSelection(path)
}
}
+ private String getShardActorName(String memberName, String shardName){
+ return memberName + "-shard-" + shardName + "-" + this.type;
+ }
+
}
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import java.util.concurrent.ExecutionException;
+
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
-public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
+public class DistributedDataStoreIntegrationTest{
+
+ private static ActorSystem system;
+
+ @Before
+ public void setUp() {
+ System.setProperty("shard.persistent", "false");
+ system = ActorSystem.create("test");
+ }
+
+ @After
+ public void tearDown() {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ protected ActorSystem getSystem() {
+ return system;
+ }
@Test
public void integrationTest() throws Exception {
}
+
+ @Test
+ public void integrationTestWithMultiShardConfiguration()
+ throws ExecutionException, InterruptedException {
+ Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
+
+ ShardStrategyFactory.setConfiguration(configuration);
+ DistributedDataStore distributedDataStore =
+ new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration);
+
+
+ distributedDataStore.onGlobalContextUpdated(SchemaContextHelper.full());
+
+ DOMStoreReadWriteTransaction transaction =
+ distributedDataStore.newReadWriteTransaction();
+
+ transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
+ DOMStoreThreePhaseCommitCohort ready = transaction.ready();
+
+ ListenableFuture<Boolean> canCommit = ready.canCommit();
+
+ assertTrue(canCommit.get());
+
+ ListenableFuture<Void> preCommit = ready.preCommit();
+
+ preCommit.get();
+
+ ListenableFuture<Void> commit = ready.commit();
+
+ commit.get();
+
+ }
+
}
String moduleName) {
return Collections.EMPTY_LIST;
}
+
+ @Override public List<String> getMembersFromShardName(String shardName) {
+ List<String> shardNames = new ArrayList<>();
+ shardNames.add("member-1");
+ return shardNames;
+ }
}
public static final QName BASE_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:people", "2014-03-13",
"people");
+ public static final InstanceIdentifier BASE_PATH = InstanceIdentifier.of(BASE_QNAME);
public static final QName PEOPLE_QNAME = QName.create(BASE_QNAME, "people");
public static final QName PERSON_QNAME = QName.create(PEOPLE_QNAME, "person");
public static final QName PERSON_NAME_QNAME = QName.create(PERSON_QNAME, "name");
public static final QName PERSON_AGE_QNAME = QName.create(PERSON_QNAME, "age");
+
public static NormalizedNode create(){
// Create a list builder
public static NormalizedNode emptyContainer(){
return ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new InstanceIdentifier.NodeIdentifier(BASE_QNAME))
+ .withNodeIdentifier(
+ new InstanceIdentifier.NodeIdentifier(BASE_QNAME))
.build();
}