Modify the FindPrimary implementation so that it works correctly with a configuration 87/8887/6
authorMoiz Raja <moraja@cisco.com>
Thu, 10 Jul 2014 03:18:14 +0000 (20:18 -0700)
committerMoiz Raja <moraja@cisco.com>
Mon, 28 Jul 2014 20:56:34 +0000 (13:56 -0700)
Change-Id: Ie41b688adf54de06332bbe5add7aba8107eb4264
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Configuration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/PeopleModel.java

index 1c3a82a7904a962aebfd101c2ea96701a84830d8..a67f58c760bbdaeda30f2f6b9c32512537ee52c0 100644 (file)
@@ -19,4 +19,5 @@ public interface Configuration {
     Optional<String> getModuleNameFromNameSpace(String nameSpace);
     Map<String, ShardStrategy> getModuleNameToShardStrategyMap();
     List<String> getShardNamesFromModuleName(String moduleName);
+    List<String> getMembersFromShardName(String shardName);
 }
index 1b8bf3e160e75c9c47cfd20dfe8e0dc9cdbb8388..9a9ac2c725dafee3426c08235d0a5a45eabc94ac 100644 (file)
@@ -85,6 +85,18 @@ public class ConfigurationImpl implements Configuration {
         return Collections.EMPTY_LIST;
     }
 
+    @Override public List<String> getMembersFromShardName(String shardName) {
+        List<String> shards = new ArrayList();
+        for(ModuleShard ms : moduleShards){
+            for(Shard s : ms.getShards()) {
+                if(s.getName().equals(shardName)){
+                    return s.getReplicas();
+                }
+            }
+        }
+        return Collections.EMPTY_LIST;
+    }
+
 
 
     private void readModules(Config modulesConfig) {
index 8fd13bd45a2b11cdd25dd2e6d40736bd549b0c1d..3e0c97f6afb8e33d1c9b580f4a96111d2a3b6884 100644 (file)
@@ -67,22 +67,28 @@ public class ShardManager extends AbstractUntypedActor {
     private final Map<String, ActorPath> localShards = new HashMap<>();
 
 
+    private final String type;
+
     private final ClusterWrapper cluster;
 
+    private final Configuration configuration;
+
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
      */
     private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+        this.type = type;
         this.cluster = cluster;
+        this.configuration = configuration;
         String memberName = cluster.getCurrentMemberName();
         List<String> memberShardNames =
             configuration.getMemberShardNames(memberName);
 
         for(String shardName : memberShardNames){
+            String shardActorName = getShardActorName(memberName, shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(memberName + "-shard-" + shardName + "-" + type),
-                    memberName + "-shard-" + shardName + "-" + type);
+                .actorOf(Shard.props(shardActorName), shardActorName);
             ActorPath path = actor.path();
             localShards.put(shardName, path);
         }
@@ -106,16 +112,37 @@ public class ShardManager extends AbstractUntypedActor {
             FindPrimary msg = ((FindPrimary) message);
             String shardName = msg.getShardName();
 
-            if (Shard.DEFAULT_NAME.equals(shardName)) {
-                ActorPath defaultShardPath = localShards.get(shardName);
-                if(defaultShardPath == null){
-                    throw new IllegalStateException("local default shard not found");
+            List<String> members =
+                configuration.getMembersFromShardName(shardName);
+
+            for(String memberName : members) {
+                if (memberName.equals(cluster.getCurrentMemberName())) {
+                    // This is a local shard
+                    ActorPath shardPath = localShards.get(shardName);
+                    // FIXME: This check may be redundant
+                    if (shardPath == null) {
+                        getSender()
+                            .tell(new PrimaryNotFound(shardName), getSelf());
+                        return;
+                    }
+                    getSender().tell(new PrimaryFound(shardPath.toString()),
+                        getSelf());
+                    return;
+                } else {
+                    Address address = memberNameToAddress.get(shardName);
+                    if(address != null){
+                        String path =
+                            address.toString() + "/user/" + getShardActorName(
+                                memberName, shardName);
+                        getSender().tell(new PrimaryFound(path), getSelf());
+                    }
+
+
                 }
-                getSender().tell(new PrimaryFound(defaultShardPath.toString()),
-                    getSelf());
-            } else {
-                getSender().tell(new PrimaryNotFound(shardName), getSelf());
             }
+
+            getSender().tell(new PrimaryNotFound(shardName), getSelf());
+
         } else if (message instanceof UpdateSchemaContext) {
             for(ActorPath path : localShards.values()){
                 getContext().system().actorSelection(path)
@@ -125,5 +152,9 @@ public class ShardManager extends AbstractUntypedActor {
         }
     }
 
+    private String getShardActorName(String memberName, String shardName){
+        return memberName + "-shard-" + shardName + "-" + this.type;
+    }
+
 
 }
index 6a25328a4d2559c2ea654284bc48e17191e367e7..116e5e75b50d261e3d9673378ee643cfe2f2e31f 100644 (file)
@@ -1,21 +1,48 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
+import java.util.concurrent.ExecutionException;
+
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
 
-public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
+public class DistributedDataStoreIntegrationTest{
+
+    private static ActorSystem system;
+
+    @Before
+    public void setUp() {
+        System.setProperty("shard.persistent", "false");
+        system = ActorSystem.create("test");
+    }
+
+    @After
+    public void tearDown() {
+        JavaTestKit.shutdownActorSystem(system);
+        system = null;
+    }
+
+    protected ActorSystem getSystem() {
+        return system;
+    }
 
     @Test
     public void integrationTest() throws Exception {
@@ -55,4 +82,39 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     }
 
+
+    @Test
+    public void integrationTestWithMultiShardConfiguration()
+        throws ExecutionException, InterruptedException {
+        Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
+
+        ShardStrategyFactory.setConfiguration(configuration);
+        DistributedDataStore distributedDataStore =
+            new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration);
+
+
+        distributedDataStore.onGlobalContextUpdated(SchemaContextHelper.full());
+
+        DOMStoreReadWriteTransaction transaction =
+            distributedDataStore.newReadWriteTransaction();
+
+        transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
+        DOMStoreThreePhaseCommitCohort ready = transaction.ready();
+
+        ListenableFuture<Boolean> canCommit = ready.canCommit();
+
+        assertTrue(canCommit.get());
+
+        ListenableFuture<Void> preCommit = ready.preCommit();
+
+        preCommit.get();
+
+        ListenableFuture<Void> commit = ready.commit();
+
+        commit.get();
+
+    }
+
 }
index 470726e56abdc723500b486050cf498f69b0a694..2597dda04c0e475a71bc65ba6fc47a05965d5a87 100644 (file)
@@ -38,4 +38,10 @@ public class MockConfiguration implements Configuration{
         String moduleName) {
         return Collections.EMPTY_LIST;
     }
+
+    @Override public List<String> getMembersFromShardName(String shardName) {
+        List<String> shardNames = new ArrayList<>();
+        shardNames.add("member-1");
+        return shardNames;
+    }
 }
index 9ccb054623afb2781ec24ac6b865fa73f6a003d7..14b02a222a5d8b9d785612c57baf2a89e852db49 100644 (file)
@@ -22,12 +22,14 @@ public class PeopleModel {
     public static final QName BASE_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:people", "2014-03-13",
         "people");
 
+    public static final InstanceIdentifier BASE_PATH = InstanceIdentifier.of(BASE_QNAME);
     public static final QName PEOPLE_QNAME = QName.create(BASE_QNAME, "people");
     public static final QName PERSON_QNAME = QName.create(PEOPLE_QNAME, "person");
     public static final QName PERSON_NAME_QNAME = QName.create(PERSON_QNAME, "name");
     public static final QName PERSON_AGE_QNAME = QName.create(PERSON_QNAME, "age");
 
 
+
     public static NormalizedNode create(){
 
         // Create a list builder
@@ -62,7 +64,8 @@ public class PeopleModel {
 
     public static NormalizedNode emptyContainer(){
         return ImmutableContainerNodeBuilder.create()
-            .withNodeIdentifier(new InstanceIdentifier.NodeIdentifier(BASE_QNAME))
+            .withNodeIdentifier(
+                new InstanceIdentifier.NodeIdentifier(BASE_QNAME))
             .build();
     }