Serialization/Deserialization and a host of other fixes 55/9455/2
authorMoiz Raja <moraja@cisco.com>
Thu, 10 Jul 2014 10:39:22 +0000 (03:39 -0700)
committerMoiz Raja <moraja@cisco.com>
Tue, 29 Jul 2014 22:01:06 +0000 (15:01 -0700)
- Hande Cluster MemberUp and MemberRemoved events in ShardManager

- Cohort messages and close listener messages switched
to use protobuff

- Distributed Datastore switch messages to use protobuff
CreateTransaction
CreateTransactionReply
CreateTransactionChain
CreateTransactionChainReply distributed datastore messages
switched to protobuff

- ShardManager messages switch to protobuff

- DataChanged and other messages switch to protobuf in
distributed datastore

- Fixed few things found during testing
1. ShardStrategy - setting of configuration
2. NodeToNormalizedNodeBuilder - leaf node/leafsetentry node checks
3. DataChanged event - passing of scope instanceidentifier used during deserialization

- Introducing JMX MBeans for distributed datastore

-Fixed issues which were preventing remote Shards from talking to each other

- Fixed a number of issues related to deserialization

- Add distributed datastore to the build

- Switch from InstanceIdentifier to YangInstanceIdentifier

Change-Id: I0d15dc482cb2b0fb2170b1344bad9fa3b421e8e0
Signed-off-by: Moiz Raja <moraja@cisco.com>
126 files changed:
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/AbstractBaseMBean.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardMBeanFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMBean.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbortTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CanCommitTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataChangeListenerRegistrationReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionChainReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CommitTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionChainReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChanged.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataChangedReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/FindPrimary.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/MergeDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ModifyData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PreCommitTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryFound.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PrimaryNotFound.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/WriteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/DefaultShardStrategy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ModuleShardStrategy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/InstanceIdentifierUtils.java
opendaylight/md-sal/sal-distributed-datastore/src/main/resources/module-shards.conf [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/resources/modules.conf [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConfigurationImplTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardstrategy/ShardStrategyFactoryTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockClusterWrapper.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockConfiguration.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/TestUtils.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CarsModel.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CompositeModel.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/PeopleModel.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/SampleModelsTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/odl-datastore-augmentation.yang [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/odl-datastore-test-notification.yang [new file with mode: 0644]
opendaylight/md-sal/sal-protocolbuffer-encoding/run.sh
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/cluster/datastore/node/NodeToNormalizedNodeBuilder.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/cluster/datastore/node/NodeValueCodec.java [new file with mode: 0644]
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/cluster/datastore/node/NormalizedNodeToNodeCodec.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/cluster/datastore/node/NormalizedNodeToProtocolBufferNode.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/NodeIdentifierFactory.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/NodeIdentifierWithPredicatesGenerator.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/NodeIdentifierWithValueGenerator.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/cluster/datastore/util/InstanceIdentifierUtils.java [new file with mode: 0644]
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/common/NormalizedNodeMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/datachange/notification/DataChangeListenerMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/persistent/PersistentMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/registration/ListenerRegistrationMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/shard/ShardManagerMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/resources/Common.proto
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/resources/DataChangeListener.proto
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/resources/ListenerRegistration.proto
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/resources/Persistent.proto
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/resources/ShardManager.proto
opendaylight/md-sal/sal-protocolbuffer-encoding/src/main/resources/ShardTransaction.proto
opendaylight/md-sal/sal-protocolbuffer-encoding/src/test/java/org/opendaylight/controller/cluster/datastore/node/NormalizedNodeToNodeCodecTest.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/test/java/org/opendaylight/controller/cluster/datastore/node/NormalizedNodeToProtocolBufferNodeTest.java [deleted file]
opendaylight/md-sal/sal-protocolbuffer-encoding/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/NodeIdentifierFactoryTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-protocolbuffer-encoding/src/test/java/org/opendaylight/controller/cluster/datastore/util/InstanceIdentifierUtilsTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-protocolbuffer-encoding/src/test/java/org/opendaylight/controller/cluster/datastore/util/TestModel.java
opendaylight/md-sal/sal-protocolbuffer-encoding/src/test/resources/odl-datastore-augmentation.yang
opendaylight/md-sal/sal-protocolbuffer-encoding/src/test/resources/odl-datastore-test.yang
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/LeafListEntryReader.java

index 2cddbbf..bfa7e26 100644 (file)
@@ -64,6 +64,9 @@
     <!--sal-protocolbuffer-encoding-->
     <module>sal-protocolbuffer-encoding</module>
 
+    <!--sal-distributed-datastore-->
+    <module>sal-distributed-datastore</module>
+
     <!-- Yang Test Models for MD-SAL -->
     <module>sal-test-model</module>
   </modules>
index 3dd44bb..16b97b7 100644 (file)
             <Private-Package></Private-Package>
             <Import-Package>!*snappy;!org.jboss.*;*</Import-Package>
             <Embed-Dependency>
+                sal-protocolbuffer-encoding;
                 !sal*;
                 !*config-api*;
                 !*testkit*;
index 0f10258..ce05160 100644 (file)
@@ -27,9 +27,9 @@ public abstract class AbstractUntypedActor extends UntypedActor {
     }
 
     @Override public void onReceive(Object message) throws Exception {
-        LOG.debug("Received message {}", message);
+        LOG.debug("Received message {}", message.getClass().getSimpleName());
         handleReceive(message);
-        LOG.debug("Done handling message {}", message);
+        LOG.debug("Done handling message {}", message.getClass().getSimpleName());
     }
 
     protected abstract void handleReceive(Object message) throws Exception;
index 9a9ac2c..3459002 100644 (file)
@@ -15,7 +15,10 @@ import com.typesafe.config.ConfigObject;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,11 +31,34 @@ public class ConfigurationImpl implements Configuration {
 
     private final List<Module> modules = new ArrayList<>();
 
+    private static final Logger
+        LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+
 
     public ConfigurationImpl(String moduleShardsConfigPath,
+
         String modulesConfigPath){
-        Config moduleShardsConfig = ConfigFactory.load(moduleShardsConfigPath);
-        Config modulesConfig = ConfigFactory.load(modulesConfigPath);
+
+        File moduleShardsFile = new File("./configuration/initial/" + moduleShardsConfigPath);
+        File modulesFile = new File("./configuration/initial/" + modulesConfigPath);
+
+        Config moduleShardsConfig = null;
+        if(moduleShardsFile.exists()) {
+            LOG.info("module shards config file exists - reading config from it");
+            moduleShardsConfig = ConfigFactory.parseFile(moduleShardsFile);
+        } else {
+            LOG.warn("module shards configuration read from resource");
+            moduleShardsConfig = ConfigFactory.load(moduleShardsConfigPath);
+        }
+
+        Config modulesConfig = null;
+        if(modulesFile.exists()) {
+            LOG.info("modules config file exists - reading config from it");
+            modulesConfig = ConfigFactory.parseFile(modulesFile);
+        } else {
+            LOG.warn("modules configuration read from resource");
+            modulesConfig = ConfigFactory.load(modulesConfigPath);
+        }
 
         readModuleShards(moduleShardsConfig);
 
index fd4f9f7..3af6f56 100644 (file)
@@ -14,36 +14,41 @@ import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class DataChangeListener extends AbstractUntypedActor {
-    private final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener;
+    private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
+    private final SchemaContext schemaContext;
+    private final YangInstanceIdentifier pathId;
 
-    public DataChangeListener(
-        AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
+    public DataChangeListener(SchemaContext schemaContext,
+                              AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, YangInstanceIdentifier pathId) {
         this.listener = listener;
+        this.schemaContext = schemaContext;
+        this.pathId  = pathId;
     }
 
     @Override public void handleReceive(Object message) throws Exception {
-        if(message instanceof DataChanged){
-            DataChanged reply = (DataChanged) message;
-            AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>>
+        if(message.getClass().equals(DataChanged.SERIALIZABLE_CLASS)){
+            DataChanged reply = DataChanged.fromSerialize(schemaContext,message, pathId);
+            AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
                 change = reply.getChange();
             this.listener.onDataChanged(change);
 
             if(getSender() != null){
-                getSender().tell(new DataChangedReply(), getSelf());
+                getSender().tell(new DataChangedReply().toSerializable(), getSelf());
             }
 
         }
     }
 
-    public static Props props(final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
+    public static Props props(final SchemaContext schemaContext, final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, final YangInstanceIdentifier pathId) {
         return Props.create(new Creator<DataChangeListener>() {
             @Override
             public DataChangeListener create() throws Exception {
-                return new DataChangeListener(listener);
+                return new DataChangeListener(schemaContext,listener,pathId );
             }
 
         });
index 8423b98..cd9c330 100644 (file)
@@ -12,21 +12,24 @@ import akka.actor.ActorSelection;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * DataChangeListenerProxy represents a single remote DataChangeListener
  */
-public class DataChangeListenerProxy implements AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>{
+public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>{
     private final ActorSelection dataChangeListenerActor;
+    private final SchemaContext schemaContext;
 
-    public DataChangeListenerProxy(ActorSelection dataChangeListenerActor) {
+    public DataChangeListenerProxy(SchemaContext schemaContext,ActorSelection dataChangeListenerActor) {
         this.dataChangeListenerActor = dataChangeListenerActor;
+        this.schemaContext = schemaContext;
     }
 
     @Override public void onDataChanged(
-        AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
-        dataChangeListenerActor.tell(new DataChanged(change), null);
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+        dataChangeListenerActor.tell(new DataChanged(schemaContext,change).toSerializable(), null);
     }
 }
index dca9735..9e50b5b 100644 (file)
@@ -14,29 +14,29 @@ import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class DataChangeListenerRegistration extends AbstractUntypedActor {
 
-    private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
+    private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
         registration;
 
     public DataChangeListenerRegistration(
-        org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+        org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
         this.registration = registration;
     }
 
     @Override
     public void handleReceive(Object message) throws Exception {
-        if (message instanceof CloseDataChangeListenerRegistration) {
+        if (message.getClass().equals(CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS)) {
             closeListenerRegistration(
-                (CloseDataChangeListenerRegistration) message);
+                new CloseDataChangeListenerRegistration());
         }
     }
 
     public static Props props(
-        final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+        final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
         return Props.create(new Creator<DataChangeListenerRegistration>() {
 
             @Override
@@ -50,7 +50,7 @@ public class DataChangeListenerRegistration extends AbstractUntypedActor {
         CloseDataChangeListenerRegistration message) {
         registration.close();
         getSender()
-            .tell(new CloseDataChangeListenerRegistrationReply(), getSelf());
+            .tell(new CloseDataChangeListenerRegistrationReply().toSerializable(), getSelf());
         getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
 }
index 83737cf..e3cdbb4 100644 (file)
@@ -14,7 +14,7 @@ import akka.actor.PoisonPill;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 /**
@@ -29,7 +29,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
     private final AsyncDataChangeListener listener;
     private final ActorRef dataChangeListenerActor;
 
-    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
+    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
     DataChangeListenerRegistrationProxy(
         ActorSelection listenerRegistrationActor,
         L listener, ActorRef dataChangeListenerActor) {
@@ -45,7 +45,7 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
 
     @Override
     public void close() {
-        listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration(), null);
+        listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration().toSerializable(), null);
         dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
     }
 }
index 0c4fae0..2ef8e5f 100644 (file)
@@ -23,7 +23,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
@@ -61,7 +61,9 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         Executors.newFixedThreadPool(10);
 
     public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
-        this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type, cluster, configuration), "shardmanager-" + type), configuration), type);
+        this(new ActorContext(actorSystem, actorSystem
+            .actorOf(ShardManager.props(type, cluster, configuration),
+                "shardmanager-" + type), cluster, configuration), type);
     }
 
     public DistributedDataStore(ActorContext actorContext, String type) {
@@ -71,18 +73,18 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
 
     @Override
-    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
-        InstanceIdentifier path, L listener,
+    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
+        YangInstanceIdentifier path, L listener,
         AsyncDataBroker.DataChangeScope scope) {
 
         ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-            DataChangeListener.props(listener));
+            DataChangeListener.props(schemaContext,listener,path ));
 
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
         Object result = actorContext.executeShardOperation(shardName,
             new RegisterChangeListener(path, dataChangeListenerActor.path(),
-                AsyncDataBroker.DataChangeScope.BASE).toSerializable(),
+                scope).toSerializable(),
             ActorContext.ASK_DURATION
         );
 
index 907b856..692d1b4 100644 (file)
@@ -9,13 +9,16 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 
 public class DistributedDataStoreFactory {
     public static DistributedDataStore createInstance(String name, SchemaService schemaService){
         ActorSystem actorSystem = ActorSystemFactory.getInstance();
+        Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore =
-            new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem), new ConfigurationImpl("module-shards.conf", "modules.conf"));
+            new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),config );
+       ShardStrategyFactory.setConfiguration(config);
         schemaService
             .registerSchemaServiceListener(dataStore);
         return dataStore;
index 9e40a69..d6ad553 100644 (file)
@@ -15,10 +15,13 @@ import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.persistence.Persistent;
+import akka.persistence.RecoveryCompleted;
 import akka.persistence.UntypedProcessor;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
@@ -36,7 +39,7 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
@@ -63,7 +66,7 @@ public class Shard extends UntypedProcessor {
     private final Map<Object, DOMStoreThreePhaseCommitCohort>
         modificationToCohort = new HashMap<>();
 
-    private final LoggingAdapter log =
+    private final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
     // By default persistent will be true and can be turned off using the system
@@ -72,14 +75,20 @@ public class Shard extends UntypedProcessor {
 
     private SchemaContext schemaContext;
 
+    private final ShardStats shardMBean;
+
     private Shard(String name) {
 
         String setting = System.getProperty("shard.persistent");
+
         this.persistent = !"false".equals(setting);
 
-        log.info("Creating shard : {} persistent : {}", name , persistent);
+        LOG.info("Creating shard : {} persistent : {}", name, persistent);
 
         store = new InMemoryDOMDataStore(name, storeExecutor);
+
+        shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
+
     }
 
     public static Props props(final String name) {
@@ -96,14 +105,14 @@ public class Shard extends UntypedProcessor {
 
     @Override
     public void onReceive(Object message) throws Exception {
-        log.debug("Received message {}", message);
+        LOG.debug("Received message " + message.getClass().toString());
 
         if(!recoveryFinished()){
             // FIXME : Properly handle recovery
             return;
         }
 
-        if (message instanceof CreateTransactionChain) {
+        if (message.getClass().equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
             createTransactionChain();
         } else if (message.getClass().equals(RegisterChangeListener.SERIALIZABLE_CLASS)) {
             registerChangeListener(RegisterChangeListener.fromSerializable(getContext().system(), message));
@@ -113,12 +122,15 @@ public class Shard extends UntypedProcessor {
             handleForwardedCommit((ForwardedCommitTransaction) message);
         } else if (message instanceof Persistent) {
             commit(((Persistent)message).payload());
-        } else if (message instanceof CreateTransaction) {
-            createTransaction((CreateTransaction) message);
+        } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+            createTransaction(CreateTransaction.fromSerializable(message));
         } else if(message instanceof NonPersistent){
             commit(((NonPersistent)message).payload());
-        } else {
-          throw new Exception("Not recognized message in Shard::OnReceive"+message);
+        }else if (message instanceof RecoveryCompleted) {
+            //FIXME: PROPERLY HANDLE RECOVERY COMPLETED
+
+        }else {
+          throw new Exception("Not recognized message found message=" + message);
         }
     }
 
@@ -137,11 +149,12 @@ public class Shard extends UntypedProcessor {
         DOMStoreThreePhaseCommitCohort cohort =
             modificationToCohort.remove(serialized);
         if (cohort == null) {
-            log.error(
+            LOG.error(
                 "Could not find cohort for modification : " + modification);
             return;
         }
         final ListenableFuture<Void> future = cohort.commit();
+        shardMBean.incrementCommittedTransactionCount();
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
         future.addListener(new Runnable() {
@@ -149,10 +162,10 @@ public class Shard extends UntypedProcessor {
             public void run() {
                 try {
                     future.get();
-                    sender.tell(new CommitTransactionReply(), self);
+                    sender.tell(new CommitTransactionReply().toSerializable(), self);
                 } catch (InterruptedException | ExecutionException e) {
                     // FIXME : Handle this properly
-                    log.error(e, "An exception happened when committing");
+                    LOG.error(e, "An exception happened when committing");
                 }
             }
         }, getContext().dispatcher());
@@ -180,19 +193,25 @@ public class Shard extends UntypedProcessor {
     private void registerChangeListener(
         RegisterChangeListener registerChangeListener) {
 
+        LOG.debug("registerDataChangeListener for " + registerChangeListener.getPath());
+
+
         ActorSelection dataChangeListenerPath = getContext()
             .system().actorSelection(registerChangeListener.getDataChangeListenerPath());
 
-        AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>
-            listener = new DataChangeListenerProxy(dataChangeListenerPath);
+        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
+            listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
 
-        org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
+        org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
             registration =
             store.registerChangeListener(registerChangeListener.getPath(),
                 listener, registerChangeListener.getScope());
         ActorRef listenerRegistration =
             getContext().actorOf(
                 DataChangeListenerRegistration.props(registration));
+
+        LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString());
+
         getSender()
             .tell(new RegisterChangeListenerReply(listenerRegistration.path()).toSerializable(),
                 getSelf());
@@ -203,7 +222,7 @@ public class Shard extends UntypedProcessor {
         ActorRef transactionChain =
             getContext().actorOf(ShardTransactionChain.props(chain, schemaContext));
         getSender()
-            .tell(new CreateTransactionChainReply(transactionChain.path()),
+            .tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
                 getSelf());
     }
 }
index 3e0c97f..0363b3c 100644 (file)
@@ -11,14 +11,18 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
+import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import akka.actor.SupervisorStrategy;
+import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
+import akka.japi.Function;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import scala.concurrent.duration.Duration;
 
 import java.util.HashMap;
 import java.util.List;
@@ -49,21 +53,9 @@ import java.util.Map;
  */
 public class ShardManager extends AbstractUntypedActor {
 
-    // Stores a mapping between a shard name and the address of the current primary
-    private final Map<String, Address> shardNameToPrimaryAddress =
-        new HashMap<>();
-
     // Stores a mapping between a member name and the address of the member
     private final Map<String, Address> memberNameToAddress = new HashMap<>();
 
-    // Stores a mapping between the shard name and all the members on which a replica of that shard are available
-    private final Map<String, List<String>> shardNameToMembers =
-        new HashMap<>();
-
-    private final LoggingAdapter log =
-        Logging.getLogger(getContext().system(), this);
-
-
     private final Map<String, ActorPath> localShards = new HashMap<>();
 
 
@@ -78,20 +70,17 @@ public class ShardManager extends AbstractUntypedActor {
      *             configuration or operational
      */
     private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
-        this.type = type;
-        this.cluster = cluster;
-        this.configuration = configuration;
-        String memberName = cluster.getCurrentMemberName();
-        List<String> memberShardNames =
-            configuration.getMemberShardNames(memberName);
 
-        for(String shardName : memberShardNames){
-            String shardActorName = getShardActorName(memberName, shardName);
-            ActorRef actor = getContext()
-                .actorOf(Shard.props(shardActorName), shardActorName);
-            ActorPath path = actor.path();
-            localShards.put(shardName, path);
-        }
+        this.type = Preconditions.checkNotNull(type, "type should not be null");
+        this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
+        this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+
+        // Subscribe this actor to cluster member events
+        cluster.subscribeToMemberEvents(getSelf());
+
+        // Create all the local Shards and make them a child of the ShardManager
+        // TODO: This may need to be initiated when we first get the schema context
+        createLocalShards();
     }
 
     public static Props props(final String type,
@@ -106,55 +95,113 @@ public class ShardManager extends AbstractUntypedActor {
         });
     }
 
+
     @Override
     public void handleReceive(Object message) throws Exception {
-        if (message instanceof FindPrimary) {
-            FindPrimary msg = ((FindPrimary) message);
-            String shardName = msg.getShardName();
-
-            List<String> members =
-                configuration.getMembersFromShardName(shardName);
-
-            for(String memberName : members) {
-                if (memberName.equals(cluster.getCurrentMemberName())) {
-                    // This is a local shard
-                    ActorPath shardPath = localShards.get(shardName);
-                    // FIXME: This check may be redundant
-                    if (shardPath == null) {
-                        getSender()
-                            .tell(new PrimaryNotFound(shardName), getSelf());
-                        return;
-                    }
-                    getSender().tell(new PrimaryFound(shardPath.toString()),
-                        getSelf());
-                    return;
-                } else {
-                    Address address = memberNameToAddress.get(shardName);
-                    if(address != null){
-                        String path =
-                            address.toString() + "/user/" + getShardActorName(
-                                memberName, shardName);
-                        getSender().tell(new PrimaryFound(path), getSelf());
-                    }
+        if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+            findPrimary(
+                FindPrimary.fromSerializable(message));
+
+        } else if (message instanceof UpdateSchemaContext) {
+            updateSchemaContext(message);
+        } else if (message instanceof ClusterEvent.MemberUp){
+            memberUp((ClusterEvent.MemberUp) message);
+        } else if(message instanceof ClusterEvent.MemberRemoved) {
+            memberRemoved((ClusterEvent.MemberRemoved) message);
+        } else if(message instanceof ClusterEvent.UnreachableMember) {
+            ignoreMessage(message);
+        } else{
+          throw new Exception ("Not recognized message received, message="+message);
+        }
+
+    }
+
+    private void ignoreMessage(Object message){
+        LOG.debug("Unhandled message : " + message);
+    }
+
+    private void memberRemoved(ClusterEvent.MemberRemoved message) {
+        memberNameToAddress.remove(message.member().roles().head());
+    }
+
+    private void memberUp(ClusterEvent.MemberUp message) {
+        memberNameToAddress.put(message.member().roles().head(), message.member().address());
+    }
 
+    private void updateSchemaContext(Object message) {
+        for(ActorPath path : localShards.values()){
+            getContext().system().actorSelection(path)
+                .forward(message,
+                    getContext());
+        }
+    }
+
+    private void findPrimary(FindPrimary message) {
+        String shardName = message.getShardName();
+
+        List<String> members =
+            configuration.getMembersFromShardName(shardName);
 
+        for(String memberName : members) {
+            if (memberName.equals(cluster.getCurrentMemberName())) {
+                // This is a local shard
+                ActorPath shardPath = localShards.get(shardName);
+                if (shardPath == null) {
+                    getSender()
+                        .tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+                    return;
+                }
+                getSender().tell(new PrimaryFound(shardPath.toString()).toSerializable(),
+                    getSelf());
+                return;
+            } else {
+                Address address = memberNameToAddress.get(memberName);
+                if(address != null){
+                    String path =
+                        address.toString() + "/user/shardmanager-" + this.type + "/" + getShardActorName(
+                            memberName, shardName);
+                    getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+                    return;
                 }
-            }
 
-            getSender().tell(new PrimaryNotFound(shardName), getSelf());
 
-        } else if (message instanceof UpdateSchemaContext) {
-            for(ActorPath path : localShards.values()){
-                getContext().system().actorSelection(path)
-                    .forward(message,
-                        getContext());
             }
         }
+
+        getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
     }
 
     private String getShardActorName(String memberName, String shardName){
         return memberName + "-shard-" + shardName + "-" + this.type;
     }
 
+    // Create the shards that are local to this member
+    private void createLocalShards() {
+        String memberName = this.cluster.getCurrentMemberName();
+        List<String> memberShardNames =
+            this.configuration.getMemberShardNames(memberName);
 
+        for(String shardName : memberShardNames){
+            String shardActorName = getShardActorName(memberName, shardName);
+            ActorRef actor = getContext()
+                .actorOf(Shard.props(shardActorName), shardActorName);
+            ActorPath path = actor.path();
+            localShards.put(shardName, path);
+        }
+
+    }
+
+
+    @Override
+    public SupervisorStrategy supervisorStrategy() {
+        return new OneForOneStrategy(10, Duration.create("1 minute"),
+            new Function<Throwable, SupervisorStrategy.Directive>() {
+                @Override
+                public SupervisorStrategy.Directive apply(Throwable t) {
+                    return SupervisorStrategy.resume();
+                }
+            }
+        );
+
+    }
 }
index 7a0b197..737f57b 100644 (file)
@@ -37,7 +37,7 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
@@ -131,23 +131,23 @@ public class ShardTransaction extends AbstractUntypedActor {
             mergeData(MergeData.fromSerializable(message, schemaContext));
         } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
             deleteData(DeleteData.fromSerizalizable(message));
-        } else if (message instanceof ReadyTransaction) {
-            readyTransaction((ReadyTransaction) message);
-        } else if (message instanceof CloseTransaction) {
-            closeTransaction((CloseTransaction) message);
+        } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readyTransaction(new ReadyTransaction());
+        } else if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
+            closeTransaction(new CloseTransaction());
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
             getSender().tell(new GetCompositeModificationReply(
                 new ImmutableCompositeModification(modification)), getSelf());
         }else{
-          throw new Exception ("handleRecieve received an unknown mesages"+message);
+          throw new Exception ("Shard:handleRecieve received an unknown message"+message);
         }
     }
 
     private void readData(ReadData message) {
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
-        final InstanceIdentifier path = message.getPath();
+        final YangInstanceIdentifier path = message.getPath();
         final ListenableFuture<Optional<NormalizedNode<?, ?>>> future =
             transaction.read(path);
 
@@ -175,21 +175,23 @@ public class ShardTransaction extends AbstractUntypedActor {
     private void writeData(WriteData message) {
         modification.addModification(
             new WriteModification(message.getPath(), message.getData(),schemaContext));
+        LOG.debug("writeData at path : " + message.getPath().toString());
         transaction.write(message.getPath(), message.getData());
-        getSender().tell(new WriteDataReply(), getSelf());
+        getSender().tell(new WriteDataReply().toSerializable(), getSelf());
     }
 
     private void mergeData(MergeData message) {
         modification.addModification(
             new MergeModification(message.getPath(), message.getData(), schemaContext));
+        LOG.debug("mergeData at path : " + message.getPath().toString());
         transaction.merge(message.getPath(), message.getData());
-        getSender().tell(new MergeDataReply(), getSelf());
+        getSender().tell(new MergeDataReply().toSerializable(), getSelf());
     }
 
     private void deleteData(DeleteData message) {
         modification.addModification(new DeleteModification(message.getPath()));
         transaction.delete(message.getPath());
-        getSender().tell(new DeleteDataReply(), getSelf());
+        getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
     }
 
     private void readyTransaction(ReadyTransaction message) {
@@ -197,13 +199,13 @@ public class ShardTransaction extends AbstractUntypedActor {
         ActorRef cohortActor = getContext().actorOf(
             ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort");
         getSender()
-            .tell(new ReadyTransactionReply(cohortActor.path()), getSelf());
+            .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
 
     }
 
     private void closeTransaction(CloseTransaction message) {
         transaction.close();
-        getSender().tell(new CloseTransactionReply(), getSelf());
+        getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
         getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
 
index 2991a77..5004241 100644 (file)
@@ -14,7 +14,7 @@ import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -34,12 +34,14 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
     @Override
     public void handleReceive(Object message) throws Exception {
-        if (message instanceof CreateTransaction) {
-            CreateTransaction createTransaction = (CreateTransaction) message;
+        if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+            CreateTransaction createTransaction = CreateTransaction.fromSerializable( message);
             createTransaction(createTransaction);
-        } else if (message instanceof CloseTransactionChain) {
+        } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
             chain.close();
-            getSender().tell(new CloseTransactionChainReply(), getSelf());
+            getSender().tell(new CloseTransactionChainReply().toSerializable(), getSelf());
+        }else{
+          throw new Exception("Not recognized message recieved="+message);
         }
     }
 
@@ -49,10 +51,7 @@ public class ShardTransactionChain extends AbstractUntypedActor {
         ActorRef transactionActor = getContext().actorOf(ShardTransaction
             .props(chain, transaction, getContext().parent(), schemaContext), "shard-" + createTransaction.getTransactionId());
         getSender()
-            .tell(ShardTransactionMessages.CreateTransactionReply.newBuilder()
-                .setTransactionActorPath(transactionActor.path().toString())
-                .setTransactionId(createTransaction.getTransactionId())
-                .build(),
+            .tell(new CreateTransactionReply(transactionActor.path().toString(),createTransaction.getTransactionId()).toSerializable(),
                 getSelf());
     }
 
index 060c9d6..a8deb01 100644 (file)
@@ -58,14 +58,16 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
 
     @Override
     public void handleReceive(Object message) throws Exception {
-        if (message instanceof CanCommitTransaction) {
-            canCommit((CanCommitTransaction) message);
-        } else if (message instanceof PreCommitTransaction) {
-            preCommit((PreCommitTransaction) message);
-        } else if (message instanceof CommitTransaction) {
-            commit((CommitTransaction) message);
-        } else if (message instanceof AbortTransaction) {
-            abort((AbortTransaction) message);
+        if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+            canCommit(new CanCommitTransaction());
+        } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
+            preCommit(new PreCommitTransaction());
+        } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+            commit(new CommitTransaction());
+        } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+            abort(new AbortTransaction());
+        } else {
+          throw new Exception ("Not recognized message received,message="+message);
         }
     }
 
@@ -79,7 +81,7 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
             public void run() {
                 try {
                     future.get();
-                    sender.tell(new AbortTransactionReply(), self);
+                    sender.tell(new AbortTransactionReply().toSerializable(), self);
                 } catch (InterruptedException | ExecutionException e) {
                     log.error(e, "An exception happened when aborting");
                 }
@@ -107,7 +109,7 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
             public void run() {
                 try {
                     future.get();
-                    sender.tell(new PreCommitTransactionReply(), self);
+                    sender.tell(new PreCommitTransactionReply().toSerializable(), self);
                 } catch (InterruptedException | ExecutionException e) {
                     log.error(e, "An exception happened when preCommitting");
                 }
@@ -126,7 +128,7 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
             public void run() {
                 try {
                     Boolean canCommit = future.get();
-                    sender.tell(new CanCommitTransactionReply(canCommit), self);
+                    sender.tell(new CanCommitTransactionReply(canCommit).toSerializable(), self);
                 } catch (InterruptedException | ExecutionException e) {
                     log.error(e, "An exception happened when aborting");
                 }
index 279ecba..b56dc94 100644 (file)
@@ -67,12 +67,12 @@ public class ThreePhaseCommitCohortProxy implements
                 try {
                     Object response =
                         actorContext.executeRemoteOperation(cohort,
-                            new CanCommitTransaction(),
+                            new CanCommitTransaction().toSerializable(),
                             ActorContext.ASK_DURATION);
 
-                    if (response instanceof CanCommitTransactionReply) {
+                    if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
                         CanCommitTransactionReply reply =
-                            (CanCommitTransactionReply) response;
+                            CanCommitTransactionReply.fromSerializable(response);
                         if (!reply.getCanCommit()) {
                             return false;
                         }
@@ -97,15 +97,15 @@ public class ThreePhaseCommitCohortProxy implements
     }
 
     @Override public ListenableFuture<Void> preCommit() {
-        return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class);
+        return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
     }
 
     @Override public ListenableFuture<Void> abort() {
-        return voidOperation(new AbortTransaction(), AbortTransactionReply.class);
+        return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
     }
 
     @Override public ListenableFuture<Void> commit() {
-        return voidOperation(new CommitTransaction(), CommitTransactionReply.class);
+        return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
     }
 
     private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
index 2f784dc..5a0049a 100644 (file)
@@ -15,6 +15,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
@@ -24,10 +25,9 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
@@ -62,7 +62,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
     private final TransactionType transactionType;
     private final ActorContext actorContext;
-    private final Map<String, ActorSelection> remoteTransactionPaths = new HashMap<>();
+    private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
     private final String identifier;
     private final ExecutorService executor;
     private final SchemaContext schemaContext;
@@ -74,7 +74,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         SchemaContext schemaContext
         ) {
 
-        this.identifier = "txn-" + counter.getAndIncrement();
+        this.identifier = actorContext.getCurrentMemberName() + "-txn-" + counter.getAndIncrement();
         this.transactionType = transactionType;
         this.actorContext = actorContext;
         this.executor = executor;
@@ -84,7 +84,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     @Override
-    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
+    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final YangInstanceIdentifier path) {
 
         createTransactionIfMissing(actorContext, path);
 
@@ -118,7 +118,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     @Override
-    public void write(InstanceIdentifier path, NormalizedNode<?, ?> data) {
+    public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
 
         createTransactionIfMissing(actorContext, path);
 
@@ -127,7 +127,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     @Override
-    public void merge(InstanceIdentifier path, NormalizedNode<?, ?> data) {
+    public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
 
         createTransactionIfMissing(actorContext, path);
 
@@ -136,7 +136,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     }
 
     @Override
-    public void delete(InstanceIdentifier path) {
+    public void delete(YangInstanceIdentifier path) {
 
         createTransactionIfMissing(actorContext, path);
 
@@ -148,15 +148,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     public DOMStoreThreePhaseCommitCohort ready() {
         List<ActorPath> cohortPaths = new ArrayList<>();
 
-        for(ActorSelection remoteTransaction : remoteTransactionPaths.values()) {
-            Object result = actorContext.executeRemoteOperation(remoteTransaction,
-                new ReadyTransaction(),
+        for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+            Object result = actorContext.executeRemoteOperation(transactionContext.getActor(),
+                new ReadyTransaction().toSerializable(),
                 ActorContext.ASK_DURATION
             );
 
-            if(result instanceof ReadyTransactionReply){
-                ReadyTransactionReply reply = (ReadyTransactionReply) result;
-                cohortPaths.add(reply.getCohortPath());
+            if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
+                ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
+                String resolvedCohortPath = transactionContext
+                    .getResolvedCohortPath(reply.getCohortPath().toString());
+                cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
             }
         }
 
@@ -170,37 +172,74 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
     @Override
     public void close() {
-        for(ActorSelection remoteTransaction : remoteTransactionPaths.values()) {
-            remoteTransaction.tell(new CloseTransaction(), null);
+        for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+            transactionContext.getActor().tell(
+                new CloseTransaction().toSerializable(), null);
         }
     }
 
-    private ActorSelection remoteTransactionFromIdentifier(InstanceIdentifier path){
+    private ActorSelection remoteTransactionFromIdentifier(YangInstanceIdentifier path){
         String shardName = shardNameFromIdentifier(path);
-        return remoteTransactionPaths.get(shardName);
+        return remoteTransactionPaths.get(shardName).getActor();
     }
 
-    private String shardNameFromIdentifier(InstanceIdentifier path){
+    private String shardNameFromIdentifier(YangInstanceIdentifier path){
         return ShardStrategyFactory.getStrategy(path).findShard(path);
     }
 
-    private void createTransactionIfMissing(ActorContext actorContext, InstanceIdentifier path) {
+    private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
-        ActorSelection actorSelection =
+        TransactionContext transactionContext =
             remoteTransactionPaths.get(shardName);
 
-        if(actorSelection != null){
+        if(transactionContext != null){
             // A transaction already exists with that shard
             return;
         }
 
-        Object response = actorContext.executeShardOperation(shardName, new CreateTransaction(identifier), ActorContext.ASK_DURATION);
-        if(response instanceof CreateTransactionReply){
-            CreateTransactionReply reply = (CreateTransactionReply) response;
-            remoteTransactionPaths.put(shardName, actorContext.actorSelection(reply.getTransactionActorPath()));
+        Object response = actorContext.executeShardOperation(shardName, new CreateTransaction(identifier).toSerializable(), ActorContext.ASK_DURATION);
+        if(response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)){
+            CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response);
+            String transactionPath = actorContext.getRemoteActorPath(shardName, reply.getTransactionPath());
+
+            ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+            transactionContext = new TransactionContext(shardName, transactionPath, transactionActor);
+
+            remoteTransactionPaths.put(shardName, transactionContext);
         }
     }
 
 
+    private class TransactionContext {
+        private final String shardName;
+        private final String actorPath;
+        private final ActorSelection  actor;
+
+
+        private TransactionContext(String shardName, String actorPath,
+            ActorSelection actor) {
+            this.shardName = shardName;
+            this.actorPath = actorPath;
+            this.actor = actor;
+        }
+
+
+        public String getShardName() {
+            return shardName;
+        }
+
+        public String getActorPath() {
+            return actorPath;
+        }
+
+        public ActorSelection getActor() {
+            return actor;
+        }
+
+        public String getResolvedCohortPath(String cohortPath){
+            return actorContext.resolvePath(actorPath, cohortPath);
+        }
+    }
+
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/AbstractBaseMBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/AbstractBaseMBean.java
new file mode 100644 (file)
index 0000000..de1ac18
--- /dev/null
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
+
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+
+/**
+ * All MBeans should extend this class that help in registering and
+ * unregistering the MBeans.
+ *
+ */
+
+
+public abstract class AbstractBaseMBean {
+
+
+  public static String BASE_JMX_PREFIX = "org.opendaylight.controller:";
+  public static String JMX_TYPE_DISTRIBUTED_DATASTORE = "DistributedDatastore";
+  public static String JMX_CATEGORY_SHARD = "Shard";
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AbstractBaseMBean.class);
+
+  MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+  /**
+   * gets the MBean ObjectName
+   *
+   * @return Object name of the MBean
+   * @throws MalformedObjectNameException - The bean name does not have the right format.
+   * @throws NullPointerException - The bean name is null
+   */
+  protected ObjectName getMBeanObjectName()
+      throws MalformedObjectNameException, NullPointerException {
+    String name = BASE_JMX_PREFIX + "type="+getMBeanType()+",Category="+
+                                   getMBeanCategory() + ",name="+
+                                   getMBeanName();
+
+
+    return new ObjectName(name);
+  }
+
+  public boolean registerMBean() {
+    boolean registered = false;
+    try {
+      // Object to identify MBean
+      final ObjectName mbeanName = this.getMBeanObjectName();
+
+      Preconditions.checkArgument(mbeanName != null,
+          "Object name of the MBean cannot be null");
+
+      LOG.debug("Register MBean {}", mbeanName);
+
+      // unregistered if already registered
+      if (server.isRegistered(mbeanName)) {
+
+        LOG.debug("MBean {} found to be already registered", mbeanName);
+
+        try {
+          unregisterMBean(mbeanName);
+        } catch (Exception e) {
+
+          LOG.warn("unregister mbean {} resulted in exception {} ", mbeanName,
+              e);
+        }
+      }
+      server.registerMBean(this, mbeanName);
+
+      LOG.debug("MBean {} registered successfully",
+          mbeanName.getCanonicalName());
+      registered = true;
+    } catch (Exception e) {
+
+      LOG.error("registration failed {}", e);
+
+    }
+    return registered;
+  }
+
+
+  public boolean unregisterMBean() {
+    boolean unregister = false;
+    try {
+      ObjectName mbeanName = this.getMBeanObjectName();
+      unregister = true;
+      unregisterMBean(mbeanName);
+    } catch (Exception e) {
+
+      LOG.error("Failed when unregistering MBean {}", e);
+    }
+    return unregister;
+  }
+
+  private void unregisterMBean(ObjectName mbeanName)
+      throws MBeanRegistrationException, InstanceNotFoundException {
+
+    server.unregisterMBean(mbeanName);
+
+  }
+
+
+  /**
+   * @return name of bean
+   */
+  protected abstract String getMBeanName();
+
+  /**
+   * @return type of the MBean
+   */
+  protected abstract String getMBeanType();
+
+
+  /**
+   * @return Category name of teh bean
+   */
+  protected abstract String getMBeanCategory();
+
+  //require for test cases
+  public MBeanServer getMBeanServer() {
+    return server;
+  }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardMBeanFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardMBeanFactory.java
new file mode 100644 (file)
index 0000000..a335908
--- /dev/null
@@ -0,0 +1,26 @@
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author: syedbahm
+ * Date: 7/16/14
+ */
+public class ShardMBeanFactory {
+  private static Map<String,ShardStats> shardMBeans= new HashMap<String,ShardStats>();
+
+  public static ShardStats getShardStatsMBean(String shardName){
+       if(shardMBeans.containsKey(shardName)){
+            return shardMBeans.get(shardName);
+       }else {
+         ShardStats shardStatsMBeanImpl = new ShardStats(shardName);
+
+         if(shardStatsMBeanImpl.registerMBean()) {
+           shardMBeans.put(shardName, shardStatsMBeanImpl);
+         }
+         return shardStatsMBeanImpl;
+       }
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
new file mode 100644 (file)
index 0000000..2da6aae
--- /dev/null
@@ -0,0 +1,70 @@
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+
+/**
+ * @author: syedbahm
+ */
+public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
+  private  Long committedTransactionsCount;
+  private Long journalMessagesCount;
+  final private String shardName;
+
+  ShardStats(String shardName){
+    this.shardName = shardName;
+    committedTransactionsCount =0L;
+    journalMessagesCount = 0L;
+  };
+
+
+  @Override
+  public String getShardName() {
+    return shardName;
+  }
+
+  @Override
+  public Long getCommittedTransactionsCount() {
+    return committedTransactionsCount;
+  }
+
+  @Override
+  public Long getJournalMessagesCount() {
+    //FIXME: this will be populated once after integration with Raft stuff
+    return journalMessagesCount;
+  }
+
+
+  public Long incrementCommittedTransactionCount() {
+    return committedTransactionsCount++;
+  }
+
+
+  public void updateCommittedTransactionsCount(long currentCount){
+     committedTransactionsCount = currentCount;
+
+  }
+
+  public void updateJournalMessagesCount(long currentCount){
+    journalMessagesCount  = currentCount;
+
+  }
+
+
+
+  @Override
+  protected String getMBeanName() {
+    return  shardName;
+  }
+
+  @Override
+  protected String getMBeanType() {
+    return JMX_TYPE_DISTRIBUTED_DATASTORE;
+  }
+
+  @Override
+  protected String getMBeanCategory() {
+    return JMX_CATEGORY_SHARD;
+  }
+
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMBean.java
new file mode 100644 (file)
index 0000000..c107e49
--- /dev/null
@@ -0,0 +1,11 @@
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+
+/**
+ * @author: syedbahm
+ */
+public interface ShardStatsMBean {
+   String getShardName();
+   Long getCommittedTransactionsCount();
+   Long getJournalMessagesCount();
+
+}
index 4cf713a..4515bd7 100644 (file)
@@ -8,5 +8,13 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class AbortTransaction {
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+
+public class AbortTransaction implements SerializableMessage {
+  public static Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.AbortTransaction.class;
+
+  @Override
+  public Object toSerializable() {
+    return ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build();
+  }
 }
index 84234e5..31a06fe 100644 (file)
@@ -8,5 +8,14 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class AbortTransactionReply {
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+
+public class AbortTransactionReply implements SerializableMessage {
+  public static Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.AbortTransactionReply.class;
+
+
+  @Override
+  public Object toSerializable() {
+    return ThreePhaseCommitCohortMessages.AbortTransactionReply.newBuilder().build();
+  }
 }
index 526d60f..2c032af 100644 (file)
@@ -8,5 +8,13 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class CanCommitTransaction {
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+
+public class CanCommitTransaction implements SerializableMessage {
+  public static Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.CanCommitTransaction.class;
+
+  @Override
+  public Object toSerializable() {
+    return  ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build();
+  }
 }
index d143c14..bbcd4de 100644 (file)
@@ -8,7 +8,10 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class CanCommitTransactionReply {
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+
+public class CanCommitTransactionReply implements SerializableMessage {
+  public static Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class;
   private final Boolean canCommit;
 
   public CanCommitTransactionReply(Boolean canCommit) {
@@ -18,4 +21,14 @@ public class CanCommitTransactionReply {
   public Boolean getCanCommit() {
     return canCommit;
   }
+
+  @Override
+  public Object toSerializable() {
+    return  ThreePhaseCommitCohortMessages.CanCommitTransactionReply.newBuilder().setCanCommit(canCommit).build();
+  }
+
+
+  public static CanCommitTransactionReply fromSerializable(Object message) {
+    return  new CanCommitTransactionReply(((ThreePhaseCommitCohortMessages.CanCommitTransactionReply)message).getCanCommit());
+  }
 }
index c3cb00c..57237bc 100644 (file)
@@ -8,5 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class CloseDataChangeListenerRegistration {
+import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
+
+public class CloseDataChangeListenerRegistration implements SerializableMessage {
+  public static Class SERIALIZABLE_CLASS = ListenerRegistrationMessages.CloseDataChangeListenerRegistration.class;
+  @Override
+  public Object toSerializable() {
+    return ListenerRegistrationMessages.CloseDataChangeListenerRegistration.newBuilder().build();
+  }
 }
index d5c75eb..faf73c8 100644 (file)
@@ -8,5 +8,14 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class CloseDataChangeListenerRegistrationReply {
+import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
+
+public class CloseDataChangeListenerRegistrationReply implements SerializableMessage{
+  public static Class SERIALIZABLE_CLASS = ListenerRegistrationMessages.CloseDataChangeListenerRegistrationReply.class;
+
+  @Override
+  public Object toSerializable() {
+    return ListenerRegistrationMessages.CloseDataChangeListenerRegistrationReply.newBuilder().build();
+  }
+
 }
index 6809f4b..451e39c 100644 (file)
@@ -8,5 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class CloseTransaction {
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+public class CloseTransaction implements SerializableMessage{
+  public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CloseTransaction.class;
+  @Override
+  public Object toSerializable() {
+    return ShardTransactionMessages.CloseTransaction.newBuilder().build();
+  }
 }
index 04c422b..efa51fd 100644 (file)
@@ -8,5 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class CloseTransactionChain {
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
+
+public class CloseTransactionChain implements SerializableMessage{
+  public static final Class SERIALIZABLE_CLASS = ShardTransactionChainMessages.CloseTransactionChain.class;
+  @Override
+  public Object toSerializable() {
+    return ShardTransactionChainMessages.CloseTransactionChain.newBuilder().build();
+  }
 }
index 89fa93b..23699b7 100644 (file)
@@ -8,5 +8,13 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class CloseTransactionChainReply {
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
+
+public class CloseTransactionChainReply implements SerializableMessage {
+  public static final Class SERIALIZABLE_CLASS = ShardTransactionChainMessages.CloseTransactionChainReply.class;
+  @Override
+  public Object toSerializable() {
+    return ShardTransactionChainMessages.CloseTransactionChainReply.newBuilder().build();
+  }
+
 }
index 4910a3e..666d182 100644 (file)
@@ -8,5 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class CloseTransactionReply {
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+public class CloseTransactionReply implements SerializableMessage {
+  public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CloseTransactionReply.class;
+  @Override
+  public Object toSerializable() {
+    return ShardTransactionMessages.CloseTransactionReply.newBuilder().build();
+  }
 }
index d7b210f..1418713 100644 (file)
@@ -8,5 +8,13 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class CommitTransaction {
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+
+public class CommitTransaction implements SerializableMessage {
+  public static Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.CommitTransaction.class;
+
+  @Override
+  public Object toSerializable() {
+    return  ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build();
+  }
 }
index a0e5e89..afeba29 100644 (file)
@@ -8,5 +8,14 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class CommitTransactionReply {
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+
+public class CommitTransactionReply implements SerializableMessage {
+
+  public static Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.CommitTransactionReply.class;
+
+  @Override
+  public Object toSerializable() {
+    return  ThreePhaseCommitCohortMessages.CommitTransactionReply.newBuilder().build();
+  }
 }
index 6110641..795131f 100644 (file)
@@ -8,15 +8,30 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class CreateTransaction {
-    private final String transactionId;
 
-    public CreateTransaction(String transactionId){
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-        this.transactionId = transactionId;
-    }
 
-    public String getTransactionId() {
-        return transactionId;
-    }
+public class CreateTransaction implements SerializableMessage {
+  public static Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class;
+  private final String transactionId;
+
+  public CreateTransaction(String transactionId){
+
+    this.transactionId = transactionId;
+  }
+
+  public String getTransactionId() {
+    return transactionId;
+  }
+
+  @Override
+  public Object toSerializable() {
+    return  ShardTransactionMessages.CreateTransaction.newBuilder().setTransactionId(transactionId).build();
+  }
+
+  public static CreateTransaction fromSerializable(Object message){
+    return new CreateTransaction(((ShardTransactionMessages.CreateTransaction)message).getTransactionId());
+  }
+
 }
index 9473885..6339749 100644 (file)
@@ -8,7 +8,13 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class CreateTransactionChain {
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
 
+public class CreateTransactionChain implements SerializableMessage{
+  public static Class SERIALIZABLE_CLASS = ShardTransactionChainMessages.CreateTransactionChain.class;
 
+  @Override
+  public Object toSerializable() {
+    return  ShardTransactionChainMessages.CreateTransactionChain.newBuilder().build();
+  }
 }
index 49dd9b6..4a49762 100644 (file)
@@ -9,8 +9,11 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import akka.actor.ActorPath;
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionChainMessages;
 
-public class CreateTransactionChainReply {
+public class CreateTransactionChainReply implements SerializableMessage {
+  public static final Class SERIALIZABLE_CLASS = ShardTransactionChainMessages.CreateTransactionChainReply.class;
   private final ActorPath transactionChainPath;
 
   public CreateTransactionChainReply(ActorPath transactionChainPath) {
@@ -20,4 +23,17 @@ public class CreateTransactionChainReply {
   public ActorPath getTransactionChainPath() {
     return transactionChainPath;
   }
+
+  @Override
+  public ShardTransactionChainMessages.CreateTransactionChainReply toSerializable() {
+    return ShardTransactionChainMessages.CreateTransactionChainReply.newBuilder()
+        .setTransactionChainPath(transactionChainPath.toString()).build();
+  }
+
+  public static CreateTransactionChainReply fromSerializable(ActorSystem actorSystem,Object serializable){
+    ShardTransactionChainMessages.CreateTransactionChainReply o = (ShardTransactionChainMessages.CreateTransactionChainReply) serializable;
+    return new CreateTransactionChainReply(
+        actorSystem.actorFor(o.getTransactionChainPath()).path());
+  }
+
 }
index 18b29c6..096d131 100644 (file)
@@ -32,7 +32,7 @@ public class CreateTransactionReply implements SerializableMessage {
 
     public Object toSerializable(){
         return ShardTransactionMessages.CreateTransactionReply.newBuilder()
-            .setTransactionActorPath(transactionPath.toString())
+            .setTransactionActorPath(transactionPath)
             .setTransactionId(transactionId)
             .build();
     }
index c55dae5..a8827be 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.utils.InstanceIdentifierUtils;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class DataChanged {
-    private final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>>
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class DataChanged implements SerializableMessage {
+    public static final Class SERIALIZABLE_CLASS =
+        DataChangeListenerMessages.DataChanged.class;
+    final private SchemaContext schemaContext;
+    private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>
         change;
 
-    public DataChanged(
-        AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+
+
+    public DataChanged(SchemaContext schemaContext,
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
         this.change = change;
+        this.schemaContext = schemaContext;
     }
 
-    public AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> getChange() {
+
+    public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChange() {
         return change;
     }
+
+
+    private NormalizedNodeMessages.Node convertToNodeTree(
+        NormalizedNode<?, ?> normalizedNode) {
+
+        return new NormalizedNodeToNodeCodec(schemaContext)
+            .encode(YangInstanceIdentifier.builder().build(), normalizedNode)
+            .getNormalizedNode();
+
+    }
+
+    private Iterable<NormalizedNodeMessages.InstanceIdentifier> convertToRemovePaths(
+        Set<YangInstanceIdentifier> removedPaths) {
+        final Set<NormalizedNodeMessages.InstanceIdentifier> removedPathInstanceIds = new HashSet<>();
+        for (YangInstanceIdentifier id : removedPaths) {
+            removedPathInstanceIds.add(InstanceIdentifierUtils.toSerializable(id));
+        }
+        return new Iterable<NormalizedNodeMessages.InstanceIdentifier>() {
+            public Iterator<NormalizedNodeMessages.InstanceIdentifier> iterator() {
+                return removedPathInstanceIds.iterator();
+            }
+        };
+
+    }
+
+    private NormalizedNodeMessages.NodeMap convertToNodeMap(
+        Map<YangInstanceIdentifier, NormalizedNode<?, ?>> data) {
+        NormalizedNodeToNodeCodec normalizedNodeToNodeCodec =
+            new NormalizedNodeToNodeCodec(schemaContext);
+        NormalizedNodeMessages.NodeMap.Builder nodeMapBuilder =
+            NormalizedNodeMessages.NodeMap.newBuilder();
+        NormalizedNodeMessages.NodeMapEntry.Builder builder =
+            NormalizedNodeMessages.NodeMapEntry.newBuilder();
+        for (Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data
+            .entrySet()) {
+
+
+            NormalizedNodeMessages.InstanceIdentifier instanceIdentifier =
+                InstanceIdentifierUtils.toSerializable(entry.getKey());
+
+            builder.setInstanceIdentifierPath(instanceIdentifier)
+                .setNormalizedNode(normalizedNodeToNodeCodec
+                    .encode(entry.getKey(), entry.getValue())
+                    .getNormalizedNode());
+            nodeMapBuilder.addMapEntries(builder.build());
+        }
+        return nodeMapBuilder.build();
+    }
+
+
+    @Override
+    public Object toSerializable() {
+        return DataChangeListenerMessages.DataChanged.newBuilder()
+            .addAllRemovedPaths(convertToRemovePaths(change.getRemovedPaths()))
+            .setCreatedData(convertToNodeMap(change.getCreatedData()))
+            .setOriginalData(convertToNodeMap(change.getOriginalData()))
+            .setUpdatedData(convertToNodeMap(change.getUpdatedData()))
+            .setOriginalSubTree(convertToNodeTree(change.getOriginalSubtree()))
+            .setUpdatedSubTree(convertToNodeTree(change.getUpdatedSubtree()))
+            .build();
+    }
+
+    public static DataChanged fromSerialize(SchemaContext sc, Object message,
+        YangInstanceIdentifier pathId) {
+        DataChangeListenerMessages.DataChanged dataChanged =
+            (DataChangeListenerMessages.DataChanged) message;
+        DataChangedEvent event = new DataChangedEvent(sc);
+        if (dataChanged.getCreatedData() != null && dataChanged.getCreatedData()
+            .isInitialized()) {
+            event.setCreatedData(dataChanged.getCreatedData());
+        }
+        if (dataChanged.getOriginalData() != null && dataChanged
+            .getOriginalData().isInitialized()) {
+            event.setOriginalData(dataChanged.getOriginalData());
+        }
+
+        if (dataChanged.getUpdatedData() != null && dataChanged.getUpdatedData()
+            .isInitialized()) {
+            event.setUpdateData(dataChanged.getUpdatedData());
+        }
+
+        if (dataChanged.getOriginalSubTree() != null && dataChanged
+            .getOriginalSubTree().isInitialized()) {
+            event.setOriginalSubtree(dataChanged.getOriginalSubTree(), pathId);
+        }
+
+        if (dataChanged.getUpdatedSubTree() != null && dataChanged
+            .getUpdatedSubTree().isInitialized()) {
+            event.setUpdatedSubtree(dataChanged.getOriginalSubTree(), pathId);
+        }
+
+        if (dataChanged.getRemovedPathsList() != null && !dataChanged
+            .getRemovedPathsList().isEmpty()) {
+            event.setRemovedPaths(dataChanged.getRemovedPathsList());
+        }
+
+        return new DataChanged(sc, event);
+
+    }
+
+    static class DataChangedEvent implements
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+        private final SchemaContext schemaContext;
+        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData;
+        private final NormalizedNodeToNodeCodec nodeCodec;
+        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData;
+        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData;
+        private NormalizedNode<?, ?> originalSubTree;
+        private NormalizedNode<?, ?> updatedSubTree;
+        private Set<YangInstanceIdentifier> removedPathIds;
+
+        DataChangedEvent(SchemaContext schemaContext) {
+            this.schemaContext = schemaContext;
+            nodeCodec = new NormalizedNodeToNodeCodec(schemaContext);
+        }
+
+        @Override
+        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
+            if(createdData == null){
+                return Collections.emptyMap();
+            }
+            return createdData;
+        }
+
+        DataChangedEvent setCreatedData(
+            NormalizedNodeMessages.NodeMap nodeMap) {
+            this.createdData = convertNodeMapToMap(nodeMap);
+            return this;
+        }
+
+        private Map<YangInstanceIdentifier, NormalizedNode<?, ?>> convertNodeMapToMap(
+            NormalizedNodeMessages.NodeMap nodeMap) {
+            Map<YangInstanceIdentifier, NormalizedNode<?, ?>> mapEntries =
+                new HashMap<YangInstanceIdentifier, NormalizedNode<?, ?>>();
+            for (NormalizedNodeMessages.NodeMapEntry nodeMapEntry : nodeMap
+                .getMapEntriesList()) {
+                YangInstanceIdentifier id = InstanceIdentifierUtils
+                    .fromSerializable(nodeMapEntry.getInstanceIdentifierPath());
+                mapEntries.put(id,
+                    nodeCodec.decode(id, nodeMapEntry.getNormalizedNode()));
+            }
+            return mapEntries;
+        }
+
+
+        @Override
+        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
+            if(updatedData == null){
+                return Collections.emptyMap();
+            }
+            return updatedData;
+        }
+
+        DataChangedEvent setUpdateData(NormalizedNodeMessages.NodeMap nodeMap) {
+            this.updatedData = convertNodeMapToMap(nodeMap);
+            return this;
+        }
+
+        @Override
+        public Set<YangInstanceIdentifier> getRemovedPaths() {
+            if (removedPathIds == null) {
+                return Collections.emptySet();
+            }
+            return removedPathIds;
+        }
+
+        public DataChangedEvent setRemovedPaths(List<NormalizedNodeMessages.InstanceIdentifier> removedPaths) {
+            Set<YangInstanceIdentifier> removedIds = new HashSet<>();
+            for (NormalizedNodeMessages.InstanceIdentifier path : removedPaths) {
+                removedIds.add(InstanceIdentifierUtils.fromSerializable(path));
+            }
+            this.removedPathIds = removedIds;
+            return this;
+        }
+
+        @Override
+        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
+            if (originalData == null) {
+                Collections.emptyMap();
+            }
+            return originalData;
+        }
+
+        DataChangedEvent setOriginalData(
+            NormalizedNodeMessages.NodeMap nodeMap) {
+            this.originalData = convertNodeMapToMap(nodeMap);
+            return this;
+        }
+
+        @Override
+        public NormalizedNode<?, ?> getOriginalSubtree() {
+            return originalSubTree;
+        }
+
+        DataChangedEvent setOriginalSubtree(NormalizedNodeMessages.Node node,
+            YangInstanceIdentifier instanceIdentifierPath) {
+            originalSubTree = nodeCodec.decode(instanceIdentifierPath, node);
+            return this;
+        }
+
+        @Override
+        public NormalizedNode<?, ?> getUpdatedSubtree() {
+            return updatedSubTree;
+        }
+
+        DataChangedEvent setUpdatedSubtree(NormalizedNodeMessages.Node node,
+            YangInstanceIdentifier instanceIdentifierPath) {
+            updatedSubTree = nodeCodec.decode(instanceIdentifierPath, node);
+            return this;
+        }
+
+
+    }
+
+
+
 }
index 3531021..cffe985 100644 (file)
@@ -8,5 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class DataChangedReply {
+import org.opendaylight.controller.protobuff.messages.datachange.notification.DataChangeListenerMessages;
+
+public class DataChangedReply implements SerializableMessage {
+  public static final Class SERIALIZABLE_CLASS = DataChangeListenerMessages.DataChangedReply.class;
+  @Override
+  public Object toSerializable() {
+    return DataChangeListenerMessages.DataChangedReply.newBuilder().build();
+  }
 }
index babe1c6..17861a5 100644 (file)
@@ -10,29 +10,29 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.cluster.datastore.utils.InstanceIdentifierUtils;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class DeleteData implements SerializableMessage {
 
     public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.DeleteData.class;
 
-    private final InstanceIdentifier path;
+    private final YangInstanceIdentifier path;
 
-    public DeleteData(InstanceIdentifier path) {
+    public DeleteData(YangInstanceIdentifier path) {
         this.path = path;
     }
 
-    public InstanceIdentifier getPath() {
+    public YangInstanceIdentifier getPath() {
         return path;
     }
 
     @Override public Object toSerializable() {
         return ShardTransactionMessages.DeleteData.newBuilder()
-            .setInstanceIdentifierPathArguments(path.toString()).build();
+            .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path)).build();
     }
 
     public static DeleteData fromSerizalizable(Object serializable){
         ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable;
-        return new DeleteData(InstanceIdentifierUtils.from(o.getInstanceIdentifierPathArguments()));
+        return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
     }
 }
index a3c7305..8e2a7b7 100644 (file)
@@ -8,5 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class DeleteDataReply {
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+public class DeleteDataReply implements SerializableMessage{
+  public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.DeleteDataReply.class;
+  @Override
+  public Object toSerializable() {
+    return ShardTransactionMessages.DeleteDataReply.newBuilder().build();
+  }
 }
index f2497e6..f584467 100644 (file)
@@ -9,13 +9,14 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.protobuff.messages.shard.ShardManagerMessages;
 
 /**
  * The FindPrimary message is used to locate the primary of any given shard
  *
- * TODO : Make this serializable
  */
-public class FindPrimary{
+public class FindPrimary implements SerializableMessage{
+  public static final Class SERIALIZABLE_CLASS = ShardManagerMessages.FindPrimary.class;
     private final String shardName;
 
     public FindPrimary(String shardName){
@@ -28,4 +29,13 @@ public class FindPrimary{
     public String getShardName() {
         return shardName;
     }
+
+  @Override
+  public Object toSerializable() {
+    return ShardManagerMessages.FindPrimary.newBuilder().setShardName(shardName).build();
+  }
+
+  public static FindPrimary fromSerializable(Object message){
+    return new FindPrimary(((ShardManagerMessages.FindPrimary)message).getShardName());
+  }
 }
index 59f1387..ba79081 100644 (file)
@@ -12,7 +12,7 @@ import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCo
 import org.opendaylight.controller.cluster.datastore.utils.InstanceIdentifierUtils;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
@@ -20,7 +20,7 @@ public class MergeData extends ModifyData{
 
     public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.MergeData.class;
 
-    public MergeData(InstanceIdentifier path, NormalizedNode<?, ?> data,
+    public MergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data,
         SchemaContext context) {
         super(path, data, context);
     }
@@ -28,16 +28,16 @@ public class MergeData extends ModifyData{
     @Override public Object toSerializable() {
 
         NormalizedNodeMessages.Node normalizedNode =
-            new NormalizedNodeToNodeCodec(schemaContext).encode(InstanceIdentifierUtils.from(path.toString()), data)
+            new NormalizedNodeToNodeCodec(schemaContext).encode(path, data)
                 .getNormalizedNode();
         return ShardTransactionMessages.MergeData.newBuilder()
-            .setInstanceIdentifierPathArguments(path.toString())
+            .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path))
             .setNormalizedNode(normalizedNode).build();
     }
 
     public static MergeData fromSerializable(Object serializable, SchemaContext schemaContext){
         ShardTransactionMessages.MergeData o = (ShardTransactionMessages.MergeData) serializable;
-        InstanceIdentifier identifier = InstanceIdentifierUtils.from(o.getInstanceIdentifierPathArguments());
+        YangInstanceIdentifier identifier = InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments());
 
         NormalizedNode<?, ?> normalizedNode =
             new NormalizedNodeToNodeCodec(schemaContext)
index 8e90972..81b1c3b 100644 (file)
@@ -8,5 +8,13 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class MergeDataReply {
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+public class MergeDataReply implements SerializableMessage{
+  public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.MergeDataReply.class;
+
+  @Override
+  public Object toSerializable() {
+    return ShardTransactionMessages.MergeDataReply.newBuilder().build();
+  }
 }
index cdab30c..b5c39d1 100644 (file)
@@ -9,16 +9,16 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public abstract class ModifyData implements SerializableMessage {
-    protected final InstanceIdentifier path;
+    protected final YangInstanceIdentifier path;
     protected final NormalizedNode<?, ?> data;
     protected final SchemaContext schemaContext;
 
-    public ModifyData(InstanceIdentifier path, NormalizedNode<?, ?> data,
+    public ModifyData(YangInstanceIdentifier path, NormalizedNode<?, ?> data,
         SchemaContext context) {
         Preconditions.checkNotNull(context,
             "Cannot serialize an object which does not have a schema schemaContext");
@@ -29,7 +29,7 @@ public abstract class ModifyData implements SerializableMessage {
         this.schemaContext = context;
     }
 
-    public InstanceIdentifier getPath() {
+    public YangInstanceIdentifier getPath() {
         return path;
     }
 
index 87a9c77..1e5a053 100644 (file)
@@ -8,5 +8,14 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class PreCommitTransaction {
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+
+public class PreCommitTransaction implements SerializableMessage{
+
+  public static Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.PreCommitTransaction.class;
+
+  @Override
+  public Object toSerializable() {
+    return  ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build();
+  }
 }
index f499c72..1aedae3 100644 (file)
@@ -8,5 +8,14 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class PreCommitTransactionReply {
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+
+public class PreCommitTransactionReply implements SerializableMessage{
+
+  public static Class SERIALIZABLE_CLASS = ThreePhaseCommitCohortMessages.PreCommitTransactionReply.class;
+
+  @Override
+  public Object toSerializable() {
+    return  ThreePhaseCommitCohortMessages.PreCommitTransactionReply.newBuilder().build();
+  }
 }
index d6aae37..6950283 100644 (file)
@@ -8,7 +8,10 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class PrimaryFound {
+import org.opendaylight.controller.protobuff.messages.shard.ShardManagerMessages;
+
+public class PrimaryFound implements SerializableMessage {
+  public static final Class SERIALIZABLE_CLASS = ShardManagerMessages.PrimaryFound.class;
   private final String primaryPath;
 
   public PrimaryFound(String primaryPath) {
@@ -44,4 +47,12 @@ public class PrimaryFound {
   }
 
 
+  @Override
+  public Object toSerializable() {
+    return  ShardManagerMessages.PrimaryFound.newBuilder().setPrimaryPath(primaryPath).build();
+  }
+
+  public static PrimaryFound fromSerializable(Object message){
+    return new PrimaryFound(((ShardManagerMessages.PrimaryFound)message).getPrimaryPath());
+  }
 }
index c66e12c..057028c 100644 (file)
@@ -9,8 +9,10 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.protobuff.messages.shard.ShardManagerMessages;
 
-public class PrimaryNotFound {
+public class PrimaryNotFound implements SerializableMessage {
+  public static final Class SERIALIZABLE_CLASS = ShardManagerMessages.PrimaryNotFound.class;
 
     private final String shardName;
 
@@ -37,4 +39,13 @@ public class PrimaryNotFound {
     public int hashCode() {
         return shardName != null ? shardName.hashCode() : 0;
     }
+
+  @Override
+  public Object toSerializable() {
+    return ShardManagerMessages.PrimaryNotFound.newBuilder().setShardName(shardName).build();
+  }
+
+  public static PrimaryNotFound fromSerializable(Object message){
+    return new PrimaryNotFound(((ShardManagerMessages.PrimaryNotFound)message).getShardName());
+  }
 }
index cb6347f..a698f46 100644 (file)
@@ -10,28 +10,28 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.cluster.datastore.utils.InstanceIdentifierUtils;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class ReadData {
   public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadData.class;
-  private final InstanceIdentifier path;
+  private final YangInstanceIdentifier path;
 
-  public ReadData(InstanceIdentifier path) {
+  public ReadData(YangInstanceIdentifier path) {
     this.path = path;
   }
 
-  public InstanceIdentifier getPath() {
+  public YangInstanceIdentifier getPath() {
     return path;
   }
 
   public Object toSerializable(){
     return ShardTransactionMessages.ReadData.newBuilder()
-        .setInstanceIdentifierPathArguments(path.toString())
+        .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path))
         .build();
   }
 
   public static ReadData fromSerializable(Object serializable){
     ShardTransactionMessages.ReadData o = (ShardTransactionMessages.ReadData) serializable;
-    return new ReadData(InstanceIdentifierUtils.from(o.getInstanceIdentifierPathArguments()));
+    return new ReadData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
   }
 }
index a8926be..c5498ca 100644 (file)
@@ -10,11 +10,12 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class ReadDataReply implements SerializableMessage{
+
   private final NormalizedNode<?, ?> normalizedNode;
   private final SchemaContext schemaContext;
   public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadDataReply.class;
@@ -32,7 +33,7 @@ public class ReadDataReply implements SerializableMessage{
     if(normalizedNode != null) {
       return ShardTransactionMessages.ReadDataReply.newBuilder()
           .setNormalizedNode(new NormalizedNodeToNodeCodec(schemaContext)
-                  .encode(InstanceIdentifier.builder().build(), normalizedNode).getNormalizedNode()
+                  .encode(YangInstanceIdentifier.builder().build(), normalizedNode).getNormalizedNode()
           ).build();
     }else{
       return ShardTransactionMessages.ReadDataReply.newBuilder().build();
@@ -41,7 +42,7 @@ public class ReadDataReply implements SerializableMessage{
 
   }
 
-  public static ReadDataReply fromSerializable(SchemaContext schemaContext,InstanceIdentifier id,Object serializable){
+  public static ReadDataReply fromSerializable(SchemaContext schemaContext,YangInstanceIdentifier id,Object serializable){
     ShardTransactionMessages.ReadDataReply o = (ShardTransactionMessages.ReadDataReply) serializable;
     return new ReadDataReply(schemaContext,new NormalizedNodeToNodeCodec(schemaContext).decode(id, o.getNormalizedNode()));
   }
index 58eef66..3a51d98 100644 (file)
@@ -8,5 +8,14 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class ReadyTransaction {
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+public class ReadyTransaction implements SerializableMessage{
+  public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadyTransaction.class;
+
+  @Override
+  public Object toSerializable() {
+    return ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+  }
+
 }
index 32d31bf..5273dc2 100644 (file)
@@ -9,8 +9,11 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 import akka.actor.ActorPath;
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class ReadyTransactionReply {
+public class ReadyTransactionReply implements SerializableMessage {
+  public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadyTransactionReply.class;
   private final ActorPath cohortPath;
 
   public ReadyTransactionReply(ActorPath cohortPath) {
@@ -21,4 +24,16 @@ public class ReadyTransactionReply {
   public ActorPath getCohortPath() {
     return cohortPath;
   }
+
+  @Override
+  public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
+    return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
+        .setActorPath(cohortPath.toString()).build();
+  }
+
+  public static ReadyTransactionReply fromSerializable(ActorSystem actorSystem,Object serializable){
+    ShardTransactionMessages.ReadyTransactionReply o = (ShardTransactionMessages.ReadyTransactionReply) serializable;
+    return new ReadyTransactionReply(
+        actorSystem.actorFor(o.getActorPath()).path());
+  }
 }
index db8a08f..c1ec0a8 100644 (file)
@@ -13,16 +13,16 @@ import akka.actor.ActorSystem;
 import org.opendaylight.controller.cluster.datastore.utils.InstanceIdentifierUtils;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class RegisterChangeListener implements SerializableMessage {
   public static final Class SERIALIZABLE_CLASS = ListenerRegistrationMessages.RegisterChangeListener.class;
-    private final InstanceIdentifier path;
+    private final YangInstanceIdentifier path;
     private final ActorPath dataChangeListenerPath;
     private final AsyncDataBroker.DataChangeScope scope;
 
 
-    public RegisterChangeListener(InstanceIdentifier path,
+    public RegisterChangeListener(YangInstanceIdentifier path,
         ActorPath dataChangeListenerPath,
         AsyncDataBroker.DataChangeScope scope) {
         this.path = path;
@@ -30,7 +30,7 @@ public class RegisterChangeListener implements SerializableMessage {
         this.scope = scope;
     }
 
-    public InstanceIdentifier getPath() {
+    public YangInstanceIdentifier getPath() {
         return path;
     }
 
@@ -47,14 +47,14 @@ public class RegisterChangeListener implements SerializableMessage {
     @Override
     public ListenerRegistrationMessages.RegisterChangeListener toSerializable() {
       return ListenerRegistrationMessages.RegisterChangeListener.newBuilder()
-          .setInstanceIdentifierPath(path.toString())
+          .setInstanceIdentifierPath(InstanceIdentifierUtils.toSerializable(path))
           .setDataChangeListenerActorPath(dataChangeListenerPath.toString())
           .setDataChangeScope(scope.ordinal()).build();
     }
 
   public static RegisterChangeListener fromSerializable(ActorSystem actorSystem,Object serializable){
     ListenerRegistrationMessages.RegisterChangeListener o = (ListenerRegistrationMessages.RegisterChangeListener) serializable;
-    return new RegisterChangeListener(InstanceIdentifierUtils.from(o.getInstanceIdentifierPath()),
+    return new RegisterChangeListener(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPath()),
                                                 actorSystem.actorFor(o.getDataChangeListenerActorPath()).path(),
                                               AsyncDataBroker.DataChangeScope.values()[o.getDataChangeScope()]);
   }
index 3cde958..87fa010 100644 (file)
@@ -12,7 +12,7 @@ import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCo
 import org.opendaylight.controller.cluster.datastore.utils.InstanceIdentifierUtils;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
@@ -20,25 +20,24 @@ public class WriteData extends ModifyData{
 
   public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.WriteData.class;
 
-  public WriteData(InstanceIdentifier path, NormalizedNode<?, ?> data, SchemaContext schemaContext) {
+  public WriteData(YangInstanceIdentifier path, NormalizedNode<?, ?> data, SchemaContext schemaContext) {
     super(path, data, schemaContext);
   }
 
     @Override public Object toSerializable() {
 
         NormalizedNodeMessages.Node normalizedNode =
-            new NormalizedNodeToNodeCodec(schemaContext).encode(
-                InstanceIdentifierUtils.from(path.toString()), data)
+            new NormalizedNodeToNodeCodec(schemaContext).encode(path, data)
                 .getNormalizedNode();
         return ShardTransactionMessages.WriteData.newBuilder()
-            .setInstanceIdentifierPathArguments(path.toString())
+            .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path))
             .setNormalizedNode(normalizedNode).build();
 
     }
 
     public static WriteData fromSerializable(Object serializable, SchemaContext schemaContext){
         ShardTransactionMessages.WriteData o = (ShardTransactionMessages.WriteData) serializable;
-        InstanceIdentifier identifier = InstanceIdentifierUtils.from(o.getInstanceIdentifierPathArguments());
+        YangInstanceIdentifier identifier = InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments());
 
         NormalizedNode<?, ?> normalizedNode =
             new NormalizedNodeToNodeCodec(schemaContext)
index 2a2b4ed..5404fb6 100644 (file)
@@ -8,5 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
-public class WriteDataReply {
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+public class WriteDataReply implements SerializableMessage{
+  public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.WriteDataReply.class;
+  @Override
+  public Object toSerializable() {
+    return ShardTransactionMessages.WriteDataReply.newBuilder().build();
+  }
 }
index 5d9f962..169397b 100644 (file)
@@ -9,7 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.modification;
 
 
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 import java.io.Serializable;
 
@@ -21,9 +21,9 @@ public abstract class AbstractModification implements Modification,
 
     private static final long serialVersionUID = 1638042650152084457L;
 
-    protected final InstanceIdentifier path;
+    protected final YangInstanceIdentifier path;
 
-    protected AbstractModification(InstanceIdentifier path) {
+    protected AbstractModification(YangInstanceIdentifier path) {
         this.path = path;
     }
 }
index f7d8b87..593f458 100644 (file)
@@ -11,13 +11,13 @@ package org.opendaylight.controller.cluster.datastore.modification;
 import org.opendaylight.controller.cluster.datastore.utils.InstanceIdentifierUtils;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 /**
  * DeleteModification store all the parameters required to delete a path from the data tree
  */
 public class DeleteModification extends AbstractModification {
-  public DeleteModification(InstanceIdentifier path) {
+  public DeleteModification(YangInstanceIdentifier path) {
     super(path);
   }
 
@@ -29,12 +29,12 @@ public class DeleteModification extends AbstractModification {
     @Override public Object toSerializable() {
         return PersistentMessages.Modification.newBuilder()
             .setType(this.getClass().toString())
-            .setPath(this.path.toString())
+            .setPath(InstanceIdentifierUtils.toSerializable(this.path))
             .build();
     }
 
     public static DeleteModification fromSerializable(Object serializable){
         PersistentMessages.Modification o = (PersistentMessages.Modification) serializable;
-        return new DeleteModification(InstanceIdentifierUtils.from(o.getPath()));
+        return new DeleteModification(InstanceIdentifierUtils.fromSerializable(o.getPath()));
     }
 }
index b484f85..f06adcf 100644 (file)
@@ -13,7 +13,7 @@ import org.opendaylight.controller.cluster.datastore.utils.InstanceIdentifierUti
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
@@ -25,7 +25,7 @@ public class MergeModification extends AbstractModification {
     private final SchemaContext schemaContext;
 
 
-    public MergeModification(InstanceIdentifier path, NormalizedNode data,
+    public MergeModification(YangInstanceIdentifier path, NormalizedNode data,
         SchemaContext schemaContext) {
         super(path);
         this.data = data;
@@ -40,11 +40,11 @@ public class MergeModification extends AbstractModification {
     @Override public Object toSerializable() {
         NormalizedNodeMessages.Container encode =
             new NormalizedNodeToNodeCodec(schemaContext).encode(
-                InstanceIdentifierUtils.from(path.toString()), data);
+                path, data);
 
         return PersistentMessages.Modification.newBuilder()
             .setType(this.getClass().toString())
-            .setPath(this.path.toString())
+            .setPath(InstanceIdentifierUtils.toSerializable(this.path))
             .setData(encode.getNormalizedNode())
             .build();
 
@@ -55,7 +55,7 @@ public class MergeModification extends AbstractModification {
         SchemaContext schemaContext) {
         PersistentMessages.Modification o = (PersistentMessages.Modification) serializable;
 
-        InstanceIdentifier path = InstanceIdentifierUtils.from(o.getPath());
+        YangInstanceIdentifier path = InstanceIdentifierUtils.fromSerializable(o.getPath());
         NormalizedNode data = new NormalizedNodeToNodeCodec(schemaContext).decode(
             path, o.getData());
 
index 0f81c7c..b4a7dd6 100644 (file)
@@ -13,7 +13,7 @@ import org.opendaylight.controller.cluster.datastore.utils.InstanceIdentifierUti
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
@@ -25,7 +25,7 @@ public class WriteModification extends AbstractModification {
   private final NormalizedNode data;
     private final SchemaContext schemaContext;
 
-    public WriteModification(InstanceIdentifier path, NormalizedNode data, SchemaContext schemaContext) {
+    public WriteModification(YangInstanceIdentifier path, NormalizedNode data, SchemaContext schemaContext) {
     super(path);
     this.data = data;
         this.schemaContext = schemaContext;
@@ -39,12 +39,12 @@ public class WriteModification extends AbstractModification {
     @Override public Object toSerializable() {
         NormalizedNodeMessages.Container encode =
             new NormalizedNodeToNodeCodec(schemaContext).encode(
-                InstanceIdentifierUtils.from(path.toString()), data);
+                path, data);
 
 
         return PersistentMessages.Modification.newBuilder()
             .setType(this.getClass().toString())
-            .setPath(this.path.toString())
+            .setPath(InstanceIdentifierUtils.toSerializable(this.path))
             .setData(encode.getNormalizedNode())
             .build();
 
@@ -55,7 +55,7 @@ public class WriteModification extends AbstractModification {
         SchemaContext schemaContext) {
         PersistentMessages.Modification o = (PersistentMessages.Modification) serializable;
 
-        InstanceIdentifier path = InstanceIdentifierUtils.from(o.getPath());
+        YangInstanceIdentifier path = InstanceIdentifierUtils.fromSerializable(o.getPath());
         NormalizedNode data = new NormalizedNodeToNodeCodec(schemaContext).decode(
             path, o.getData());
 
index a8ab5c4..55c682b 100644 (file)
@@ -8,7 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.shardstrategy;
 
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 /**
  * The DefaultShardStrategy basically puts all data into the default Shard
@@ -22,7 +22,7 @@ public class DefaultShardStrategy implements ShardStrategy{
   public static final String DEFAULT_SHARD = "default";
 
   @Override
-  public String findShard(InstanceIdentifier path) {
+  public String findShard(YangInstanceIdentifier path) {
     return DEFAULT_SHARD;
   }
 }
index fe6f740..6f4b65a 100644 (file)
@@ -9,7 +9,7 @@
 package org.opendaylight.controller.cluster.datastore.shardstrategy;
 
 import org.opendaylight.controller.cluster.datastore.Configuration;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 public class ModuleShardStrategy implements ShardStrategy {
 
@@ -24,7 +24,7 @@ public class ModuleShardStrategy implements ShardStrategy {
         this.configuration = configuration;
     }
 
-    @Override public String findShard(InstanceIdentifier path) {
+    @Override public String findShard(YangInstanceIdentifier path) {
         return configuration.getShardNamesFromModuleName(moduleName).get(0);
     }
 }
index f75eb2d..2df945e 100644 (file)
@@ -8,7 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.shardstrategy;
 
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 /**
  * The role of ShardStrategy is to figure out which Shards a given piece of data belongs to
@@ -20,5 +20,5 @@ public interface ShardStrategy {
    * @param path The location of the data in the logical tree
    * @return
    */
-  String findShard(InstanceIdentifier path);
+  String findShard(YangInstanceIdentifier path);
 }
index 8b077d6..f4ab8fa 100644 (file)
@@ -11,7 +11,7 @@ package org.opendaylight.controller.cluster.datastore.shardstrategy;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.Configuration;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -29,7 +29,7 @@ public class ShardStrategyFactory {
         moduleNameToStrategyMap = configuration.getModuleNameToShardStrategyMap();
     }
 
-    public static ShardStrategy getStrategy(InstanceIdentifier path) {
+    public static ShardStrategy getStrategy(YangInstanceIdentifier path) {
         Preconditions.checkState(configuration != null, "configuration should not be missing");
         Preconditions.checkNotNull(path, "path should not be null");
 
@@ -44,9 +44,8 @@ public class ShardStrategyFactory {
     }
 
 
-    private static String getModuleName(InstanceIdentifier path) {
-        String namespace = path.getLastPathArgument().getNodeType().getNamespace()
-            .toASCIIString();
+    private static String getModuleName(YangInstanceIdentifier path) {
+        String namespace = path.getPathArguments().iterator().next().getNodeType().getNamespace().toASCIIString();
 
         Optional<String> optional =
             configuration.getModuleNameFromNameSpace(namespace);
index 2f1949e..c7ee7d8 100644 (file)
@@ -14,6 +14,10 @@ import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.util.Timeout;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
@@ -41,18 +45,24 @@ public class ActorContext {
     private static final Logger
         LOG = LoggerFactory.getLogger(ActorContext.class);
 
-    public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
-    public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
+    public static final FiniteDuration ASK_DURATION =
+        Duration.create(5, TimeUnit.SECONDS);
+    public static final Duration AWAIT_DURATION =
+        Duration.create(5, TimeUnit.SECONDS);
 
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
+    private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
 
     private SchemaContext schemaContext = null;
 
-    public ActorContext(ActorSystem actorSystem, ActorRef shardManager, Configuration configuration){
+    public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
+        ClusterWrapper clusterWrapper,
+        Configuration configuration) {
         this.actorSystem = actorSystem;
         this.shardManager = shardManager;
+        this.clusterWrapper = clusterWrapper;
         this.configuration = configuration;
     }
 
@@ -64,11 +74,11 @@ public class ActorContext {
         return shardManager;
     }
 
-    public ActorSelection actorSelection(String actorPath){
+    public ActorSelection actorSelection(String actorPath) {
         return actorSystem.actorSelection(actorPath);
     }
 
-    public ActorSelection actorSelection(ActorPath actorPath){
+    public ActorSelection actorSelection(ActorPath actorPath) {
         return actorSystem.actorSelection(actorPath);
     }
 
@@ -80,28 +90,35 @@ public class ActorContext {
      * @return
      */
     public ActorSelection findPrimary(String shardName) {
+        String path = findPrimaryPath(shardName);
+        return actorSystem.actorSelection(path);
+    }
+
+    public String findPrimaryPath(String shardName) {
         Object result = executeLocalOperation(shardManager,
-            new FindPrimary(shardName), ASK_DURATION);
+            new FindPrimary(shardName).toSerializable(), ASK_DURATION);
 
-        if(result instanceof PrimaryFound){
-            PrimaryFound found = (PrimaryFound) result;
+        if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+            PrimaryFound found = PrimaryFound.fromSerializable(result);
 
-            LOG.error("Primary found {}", found.getPrimaryPath());
+            LOG.debug("Primary found {}", found.getPrimaryPath());
 
-            return actorSystem.actorSelection(found.getPrimaryPath());
+            return found.getPrimaryPath();
         }
         throw new PrimaryNotFoundException();
     }
 
+
     /**
      * Executes an operation on a local actor and wait for it's response
+     *
      * @param actor
      * @param message
      * @param duration
      * @return The response of the operation
      */
     public Object executeLocalOperation(ActorRef actor, Object message,
-        FiniteDuration duration){
+        FiniteDuration duration) {
         Future<Object> future =
             ask(actor, message, new Timeout(duration));
 
@@ -114,13 +131,17 @@ public class ActorContext {
 
     /**
      * Execute an operation on a remote actor and wait for it's response
+     *
      * @param actor
      * @param message
      * @param duration
      * @return
      */
     public Object executeRemoteOperation(ActorSelection actor, Object message,
-        FiniteDuration duration){
+        FiniteDuration duration) {
+
+        LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+
         Future<Object> future =
             ask(actor, message, new Timeout(duration));
 
@@ -134,18 +155,19 @@ public class ActorContext {
     /**
      * Execute an operation on the primary for a given shard
      * <p>
-     *     This method first finds the primary for a given shard ,then sends
-     *     the message to the remote shard and waits for a response
+     * This method first finds the primary for a given shard ,then sends
+     * the message to the remote shard and waits for a response
      * </p>
+     *
      * @param shardName
      * @param message
      * @param duration
-     * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
-     * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
-     *
      * @return
+     * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
+     * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
      */
-    public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){
+    public Object executeShardOperation(String shardName, Object message,
+        FiniteDuration duration) {
         ActorSelection primary = findPrimary(shardName);
 
         return executeRemoteOperation(primary, message, duration);
@@ -155,4 +177,44 @@ public class ActorContext {
         shardManager.tell(PoisonPill.getInstance(), null);
         actorSystem.shutdown();
     }
+
+    public String getRemoteActorPath(final String shardName,
+        final String localPathOfRemoteActor) {
+        final String path = findPrimaryPath(shardName);
+
+        LoadingCache<String, String> graphs = CacheBuilder.newBuilder()
+            .expireAfterAccess(2, TimeUnit.SECONDS)
+            .build(
+                new CacheLoader<String, String>() {
+                    public String load(String key) {
+                        return resolvePath(path, localPathOfRemoteActor);
+                    }
+                }
+            );
+        return graphs.getUnchecked(localPathOfRemoteActor);
+    }
+
+    public String resolvePath(final String primaryPath,
+        final String localPathOfRemoteActor) {
+        StringBuilder builder = new StringBuilder();
+        String[] primaryPathElements = primaryPath.split("/");
+        builder.append(primaryPathElements[0]).append("//")
+            .append(primaryPathElements[1]).append(primaryPathElements[2]);
+        String[] remotePathElements = localPathOfRemoteActor.split("/");
+        for (int i = 3; i < remotePathElements.length; i++) {
+            builder.append("/").append(remotePathElements[i]);
+        }
+
+        return builder.toString();
+
+    }
+
+    public ActorPath actorFor(String path){
+        return actorSystem.actorFor(path).path();
+    }
+
+    public String getCurrentMemberName(){
+        return clusterWrapper.getCurrentMemberName();
+    }
+
 }
index fafeba5..20268a6 100644 (file)
@@ -1,7 +1,10 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import org.opendaylight.controller.cluster.datastore.node.utils.NodeIdentifierFactory;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -10,33 +13,60 @@ import java.util.List;
  * @author: syedbahm
  */
 public class InstanceIdentifierUtils {
-  public static String getParentPath(String currentElementPath) {
-    String parentPath = "";
-
-    if (currentElementPath != null) {
-      String[] parentPaths = currentElementPath.split("/");
-      if (parentPaths.length > 2) {
-        for (int i = 0; i < parentPaths.length - 1; i++) {
-          if (parentPaths[i].length() > 0) {
-            parentPath += "/" + parentPaths[i];
-          }
+
+    protected static final Logger logger = LoggerFactory
+        .getLogger(InstanceIdentifierUtils.class);
+
+    public static String getParentPath(String currentElementPath) {
+        String parentPath = "";
+
+        if (currentElementPath != null) {
+            String[] parentPaths = currentElementPath.split("/");
+            if (parentPaths.length > 2) {
+                for (int i = 0; i < parentPaths.length - 1; i++) {
+                    if (parentPaths[i].length() > 0) {
+                        parentPath += "/" + parentPaths[i];
+                    }
+                }
+            }
+        }
+        return parentPath;
+    }
+
+    @Deprecated
+    public static YangInstanceIdentifier from(String path) {
+        String[] ids = path.split("/");
+
+        List<YangInstanceIdentifier.PathArgument> pathArguments =
+            new ArrayList<>();
+        for (String nodeId : ids) {
+            if (!"".equals(nodeId)) {
+                pathArguments
+                    .add(NodeIdentifierFactory.getArgument(nodeId));
+            }
         }
-      }
+        final YangInstanceIdentifier instanceIdentifier =
+            YangInstanceIdentifier.create(pathArguments);
+        return instanceIdentifier;
     }
-    return parentPath;
-  }
 
-  public static InstanceIdentifier from(String path) {
-    String[] ids = path.split("/");
+    /**
+     * @deprecated Use {@link org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils} instead
+     * @param path
+     * @return
+     */
+    @Deprecated
+    public static NormalizedNodeMessages.InstanceIdentifier toSerializable(YangInstanceIdentifier path){
+        return org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils.toSerializable(path);
+    }
 
-    List<InstanceIdentifier.PathArgument> pathArguments = new ArrayList<>();
-    for (String nodeId : ids) {
-      if (!"".equals(nodeId)) {
-        pathArguments.add(NodeIdentifierFactory.getArgument(nodeId));
-      }
+    /**
+     * @deprecated Use {@link org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils} instead
+     * @param path
+     * @return
+     */
+    @Deprecated
+    public static YangInstanceIdentifier fromSerializable(NormalizedNodeMessages.InstanceIdentifier path){
+        return org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils.fromSerializable(path);
     }
-    final InstanceIdentifier instanceIdentifier =
-        new InstanceIdentifier(pathArguments);
-    return instanceIdentifier;
-  }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/module-shards.conf b/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/module-shards.conf
new file mode 100644 (file)
index 0000000..60dd775
--- /dev/null
@@ -0,0 +1,25 @@
+module-shards = [
+    {
+        name = "default"
+        shards = [
+            {
+                name="default",
+                replicas = [
+                    "member-1",
+                ]
+            }
+        ]
+    },
+    {
+        name = "inventory"
+        shards = [
+            {
+                name="inventory"
+                replicas = [
+                    "member-1",
+                ]
+            }
+        ]
+    }
+
+]
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/modules.conf b/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/modules.conf
new file mode 100644 (file)
index 0000000..05ef33f
--- /dev/null
@@ -0,0 +1,7 @@
+modules = [
+    {
+        name = "inventory"
+        namespace = "urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:people"
+        shard-strategy = "module"
+    }
+]
index 6b9f00e..b62a4b3 100644 (file)
@@ -12,12 +12,10 @@ import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
-import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import junit.framework.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
@@ -59,14 +57,14 @@ public class BasicIntegrationTest extends AbstractActorTest {
                         new UpdateSchemaContext(TestModel.createTestContext()),
                         getRef());
 
-                    shard.tell(new CreateTransactionChain(), getRef());
+                    shard.tell(new CreateTransactionChain().toSerializable(), getRef());
 
                     final ActorSelection transactionChain =
                         new ExpectMsg<ActorSelection>("CreateTransactionChainReply") {
                             protected ActorSelection match(Object in) {
-                                if (in instanceof CreateTransactionChainReply) {
+                                if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)) {
                                     ActorPath transactionChainPath =
-                                        ((CreateTransactionChainReply) in)
+                                        CreateTransactionChainReply.fromSerializable(getSystem(),in)
                                             .getTransactionChainPath();
                                     return getSystem()
                                         .actorSelection(transactionChainPath);
@@ -78,7 +76,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
                     Assert.assertNotNull(transactionChain);
 
-                    transactionChain.tell(new CreateTransaction("txn-1"), getRef());
+                    transactionChain.tell(new CreateTransaction("txn-1").toSerializable(), getRef());
 
                     final ActorSelection transaction =
                         new ExpectMsg<ActorSelection>("CreateTransactionReply") {
@@ -105,7 +103,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
                     Boolean writeDone = new ExpectMsg<Boolean>("WriteDataReply") {
                         protected Boolean match(Object in) {
-                            if (in instanceof WriteDataReply) {
+                            if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
                                 return true;
                             } else {
                                 throw noMatch();
@@ -115,14 +113,14 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
                     Assert.assertTrue(writeDone);
 
-                    transaction.tell(new ReadyTransaction(), getRef());
+                    transaction.tell(new ReadyTransaction().toSerializable(), getRef());
 
                     final ActorSelection cohort =
                         new ExpectMsg<ActorSelection>("ReadyTransactionReply") {
                             protected ActorSelection match(Object in) {
-                                if (in instanceof ReadyTransactionReply) {
+                                if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                                     ActorPath cohortPath =
-                                        ((ReadyTransactionReply) in)
+                                        ReadyTransactionReply.fromSerializable(getSystem(),in)
                                             .getCohortPath();
                                     return getSystem()
                                         .actorSelection(cohortPath);
@@ -137,12 +135,12 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     // Add a watch on the transaction actor so that we are notified when it dies
                     final ActorRef cohorActorRef = watchActor(cohort);
 
-                    cohort.tell(new PreCommitTransaction(), getRef());
+                    cohort.tell(new PreCommitTransaction().toSerializable(), getRef());
 
                     Boolean preCommitDone =
                         new ExpectMsg<Boolean>("PreCommitTransactionReply") {
                             protected Boolean match(Object in) {
-                                if (in instanceof PreCommitTransactionReply) {
+                                if (in.getClass().equals(PreCommitTransactionReply.SERIALIZABLE_CLASS)) {
                                     return true;
                                 } else {
                                     throw noMatch();
@@ -152,51 +150,9 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
                     Assert.assertTrue(preCommitDone);
 
-                    // FIXME : When we commit on the cohort it "kills" the Transaction.
-                    // This in turn kills the child of Transaction as well.
-                    // The order in which we receive the terminated event for both
-                    // these actors is not fixed which may cause this test to fail
-                    cohort.tell(new CommitTransaction(), getRef());
+                    cohort.tell(new CommitTransaction().toSerializable(), getRef());
 
-                    final Boolean terminatedCohort =
-                        new ExpectMsg<Boolean>("Terminated Cohort") {
-                            protected Boolean match(Object in) {
-                                if (in instanceof Terminated) {
-                                    return cohorActorRef.equals(((Terminated) in).actor());
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
-
-                    Assert.assertTrue(terminatedCohort);
-
-
-                    final Boolean terminatedTransaction =
-                        new ExpectMsg<Boolean>("Terminated Transaction") {
-                            protected Boolean match(Object in) {
-                                if (in instanceof Terminated) {
-                                    return transactionActorRef.equals(((Terminated) in).actor());
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
-
-                    Assert.assertTrue(terminatedTransaction);
-
-                    final Boolean commitDone =
-                        new ExpectMsg<Boolean>("CommitTransactionReply") {
-                            protected Boolean match(Object in) {
-                                if (in instanceof CommitTransactionReply) {
-                                    return true;
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
-
-                    Assert.assertTrue(commitDone);
+                    // FIXME : Add assertions that the commit worked and that the cohort and transaction actors were terminated
 
                 }
 
index 85877ce..56fd3c5 100644 (file)
@@ -1,9 +1,11 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import com.typesafe.config.ConfigFactory;
 import junit.framework.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.List;
 
 import static org.junit.Assert.assertTrue;
@@ -30,4 +32,10 @@ public class ConfigurationImplTest {
         assertTrue(memberShardNames.contains("people-1"));
         assertTrue(memberShardNames.contains("cars-1"));
     }
+
+    @Test
+    public void testReadConfigurationFromFile(){
+        File f = new File("./module-shards.conf");
+        ConfigFactory.parseFile(f);
+    }
 }
index d5625d2..8c1cbbb 100644 (file)
@@ -9,62 +9,79 @@ import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 public class DataChangeListenerProxyTest extends AbstractActorTest {
 
-    private static class MockDataChangeEvent implements
-        AsyncDataChangeEvent<InstanceIdentifier,NormalizedNode<?,?>> {
+  private static class MockDataChangedEvent implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+    Map<YangInstanceIdentifier,NormalizedNode<?,?>> createdData = new HashMap();
+    Map<YangInstanceIdentifier,NormalizedNode<?,?>> updatedData = new HashMap();
+    Map<YangInstanceIdentifier,NormalizedNode<?,?>> originalData = new HashMap();
 
-        @Override
-        public Map<InstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
-            throw new UnsupportedOperationException("getCreatedData");
-        }
 
-        @Override
-        public Map<InstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
-            throw new UnsupportedOperationException("getUpdatedData");
-        }
 
-        @Override public Set<InstanceIdentifier> getRemovedPaths() {
-            throw new UnsupportedOperationException("getRemovedPaths");
-        }
+    @Override
+    public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
+      createdData.put(YangInstanceIdentifier.builder().build(), CompositeModel.createDocumentOne(CompositeModel.createTestContext()));
+      return createdData;
+    }
 
-        @Override
-        public Map<InstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
-            throw new UnsupportedOperationException("getOriginalData");
-        }
+    @Override
+    public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
+      updatedData.put(YangInstanceIdentifier.builder().build(), CompositeModel.createTestContainer());
+      return updatedData;
 
-        @Override public NormalizedNode<?, ?> getOriginalSubtree() {
-            throw new UnsupportedOperationException("getOriginalSubtree");
-        }
+    }
 
-        @Override public NormalizedNode<?, ?> getUpdatedSubtree() {
-            throw new UnsupportedOperationException("getUpdatedSubtree");
-        }
+    @Override
+    public Set<YangInstanceIdentifier> getRemovedPaths() {
+      Set<YangInstanceIdentifier>ids = new HashSet();
+      ids.add( CompositeModel.TEST_PATH);
+      return ids;
     }
 
-    @Test
+    @Override
+    public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
+      originalData.put(YangInstanceIdentifier.builder().build(), CompositeModel.createFamily());
+      return originalData;
+    }
+
+    @Override public NormalizedNode<?, ?> getOriginalSubtree() {
+      return CompositeModel.createFamily() ;
+    }
+
+    @Override public NormalizedNode<?, ?> getUpdatedSubtree() {
+      return CompositeModel.createTestContainer();
+    }
+  }
+
+
+  @Test
     public void testOnDataChanged() throws Exception {
         final Props props = Props.create(MessageCollectorActor.class);
         final ActorRef actorRef = getSystem().actorOf(props);
 
         DataChangeListenerProxy dataChangeListenerProxy =
-            new DataChangeListenerProxy(
+            new DataChangeListenerProxy(TestModel.createTestContext(),
                 getSystem().actorSelection(actorRef.path()));
 
-        dataChangeListenerProxy.onDataChanged(new MockDataChangeEvent());
+        dataChangeListenerProxy.onDataChanged(new MockDataChangedEvent());
 
         //Check if it was received by the remote actor
         ActorContext
-            testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockConfiguration());
+            testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
             .executeLocalOperation(actorRef, "messages",
                 ActorContext.ASK_DURATION);
@@ -77,7 +94,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
 
         Assert.assertEquals(1, listMessages.size());
 
-        Assert.assertTrue(listMessages.get(0) instanceof DataChanged);
+        Assert.assertTrue(listMessages.get(0).getClass().equals(DataChanged.SERIALIZABLE_CLASS));
 
     }
 }
index 57609a9..c99a7e8 100644 (file)
@@ -8,10 +8,11 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeLis
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 import java.util.List;
@@ -21,10 +22,10 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
     private ActorRef dataChangeListenerActor = getSystem().actorOf(Props.create(DoNothingActor.class));
 
     private static class MockDataChangeListener implements
-        AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> {
+        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
 
         @Override public void onDataChanged(
-            AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+            AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
             throw new UnsupportedOperationException("onDataChanged");
         }
     }
@@ -59,7 +60,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
 
         //Check if it was received by the remote actor
         ActorContext
-            testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockConfiguration());
+            testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)),new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
             .executeLocalOperation(actorRef, "messages",
                 ActorContext.ASK_DURATION);
@@ -72,6 +73,6 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
 
         Assert.assertEquals(1, listMessages.size());
 
-        Assert.assertTrue(listMessages.get(0) instanceof CloseDataChangeListenerRegistration);
+        Assert.assertTrue(listMessages.get(0).getClass().equals(CloseDataChangeListenerRegistration.SERIALIZABLE_CLASS));
     }
 }
index 23302b5..8413bac 100644 (file)
@@ -13,7 +13,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 import static org.junit.Assert.assertEquals;
@@ -39,12 +39,12 @@ public class DataChangeListenerRegistrationTest extends AbstractActorTest {
       new Within(duration("1 seconds")) {
         protected void run() {
 
-          subject.tell(new CloseDataChangeListenerRegistration(), getRef());
+          subject.tell(new CloseDataChangeListenerRegistration().toSerializable(), getRef());
 
           final String out = new ExpectMsg<String>("match hint") {
             // do not put code outside this method, will run afterwards
             protected String match(Object in) {
-              if (in instanceof CloseDataChangeListenerRegistrationReply) {
+              if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.SERIALIZABLE_CLASS)) {
                 return "match";
               } else {
                 throw noMatch();
@@ -62,10 +62,10 @@ public class DataChangeListenerRegistrationTest extends AbstractActorTest {
     }};
   }
 
-  private  AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener(){
-    return new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
+  private  AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener(){
+    return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
       @Override
-      public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+      public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
 
       }
     };
index d64859a..fd61032 100644 (file)
@@ -6,66 +6,90 @@ import akka.testkit.JavaTestKit;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
+import org.opendaylight.controller.md.cluster.datastore.model.CompositeModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class DataChangeListenerTest extends AbstractActorTest {
 
-    private static class MockDataChangedEvent implements AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> {
+    private static class MockDataChangedEvent implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+       Map<YangInstanceIdentifier,NormalizedNode<?,?>> createdData = new HashMap();
+       Map<YangInstanceIdentifier,NormalizedNode<?,?>> updatedData = new HashMap();
+       Map<YangInstanceIdentifier,NormalizedNode<?,?>> originalData = new HashMap();
+
+
 
         @Override
-        public Map<InstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
-            throw new UnsupportedOperationException("getCreatedData");
+        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
+            createdData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
+            return createdData;
         }
 
         @Override
-        public Map<InstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
-            throw new UnsupportedOperationException("getUpdatedData");
+        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
+            updatedData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
+            return updatedData;
+
         }
 
-        @Override public Set<InstanceIdentifier> getRemovedPaths() {
-            throw new UnsupportedOperationException("getRemovedPaths");
+        @Override
+        public Set<YangInstanceIdentifier> getRemovedPaths() {
+               Set<YangInstanceIdentifier>ids = new HashSet();
+               ids.add( CompositeModel.TEST_PATH);
+              return ids;
         }
 
         @Override
-        public Map<InstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
-            throw new UnsupportedOperationException("getOriginalData");
+        public Map<YangInstanceIdentifier, NormalizedNode<?, ?>> getOriginalData() {
+          originalData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
+          return originalData;
         }
 
         @Override public NormalizedNode<?, ?> getOriginalSubtree() {
-            throw new UnsupportedOperationException("getOriginalSubtree");
+
+
+          return originalData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
         }
 
         @Override public NormalizedNode<?, ?> getUpdatedSubtree() {
-            throw new UnsupportedOperationException("getUpdatedSubtree");
+
+          //fixme: need to have some valid data here
+          return originalData.put(CompositeModel.FAMILY_PATH, CompositeModel.createFamily());
         }
     }
 
-    private class MockDataChangeListener implements AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> {
+    private class MockDataChangeListener implements AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
         private boolean gotIt = false;
+        private   AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
 
         @Override public void onDataChanged(
-            AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
-            gotIt = true;
+            AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
+            gotIt = true;this.change=change;
         }
 
         public boolean gotIt() {
             return gotIt;
         }
+        public  AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChange(){
+          return change;
+        }
     }
 
     @Test
     public void testDataChanged(){
         new JavaTestKit(getSystem()) {{
             final MockDataChangeListener listener = new MockDataChangeListener();
-            final Props props = DataChangeListener.props(listener);
+            final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
             final ActorRef subject =
                 getSystem().actorOf(props, "testDataChanged");
 
@@ -73,15 +97,14 @@ public class DataChangeListenerTest extends AbstractActorTest {
                 protected void run() {
 
                     subject.tell(
-                        new DataChanged(new MockDataChangedEvent()),
+                        new DataChanged(CompositeModel.createTestContext(),new MockDataChangedEvent()).toSerializable(),
                         getRef());
 
                     final Boolean out = new ExpectMsg<Boolean>("dataChanged") {
                         // do not put code outside this method, will run afterwards
                         protected Boolean match(Object in) {
-                            if (in instanceof DataChangedReply) {
-                                DataChangedReply reply =
-                                    (DataChangedReply) in;
+                            if (in.getClass().equals(DataChangedReply.SERIALIZABLE_CLASS)) {
+
                                 return true;
                             } else {
                                 throw noMatch();
@@ -91,6 +114,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
                     assertTrue(out);
                     assertTrue(listener.gotIt());
+                    assertNotNull(listener.getChange().getCreatedData());
                     // Will wait for the rest of the 3 seconds
                     expectNoMsg();
                 }
index 914f0db..23a1ed4 100644 (file)
@@ -3,7 +3,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import junit.framework.Assert;
-
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
@@ -19,7 +18,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class DistributedDataStoreTest extends AbstractActorTest{
@@ -58,9 +57,9 @@ public class DistributedDataStoreTest extends AbstractActorTest{
     public void testRegisterChangeListener() throws Exception {
         mockActorContext.setExecuteShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()).toSerializable());
         ListenerRegistration registration =
-                distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
+                distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
             @Override
-            public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+            public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
                 throw new UnsupportedOperationException("onDataChanged");
             }
         }, AsyncDataBroker.DataChangeScope.BASE);
index a2f19d8..87d257a 100644 (file)
@@ -4,6 +4,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import junit.framework.Assert;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -18,12 +19,12 @@ public class ShardManagerTest {
     private static ActorSystem system;
 
     @BeforeClass
-    public static void setUp(){
+    public static void setUp() {
         system = ActorSystem.create("test");
     }
 
     @AfterClass
-    public static void tearDown(){
+    public static void tearDown() {
         JavaTestKit.shutdownActorSystem(system);
         system = null;
     }
@@ -32,15 +33,19 @@ public class ShardManagerTest {
     public void testOnReceiveFindPrimaryForNonExistentShard() throws Exception {
 
         new JavaTestKit(system) {{
-            final Props props = ShardManager.props("config", new MockClusterWrapper(), new MockConfiguration());
-            final TestActorRef<ShardManager> subject = TestActorRef.create(system, props);
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration());
+            final TestActorRef<ShardManager> subject =
+                TestActorRef.create(system, props);
 
             new Within(duration("1 seconds")) {
                 protected void run() {
 
-                    subject.tell(new FindPrimary("inventory"), getRef());
+                    subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
 
-                    expectMsgEquals(Duration.Zero(), new PrimaryNotFound("inventory"));
+                    expectMsgEquals(Duration.Zero(),
+                        new PrimaryNotFound("inventory").toSerializable());
 
                     // Will wait for the rest of the 3 seconds
                     expectNoMsg();
@@ -49,24 +54,99 @@ public class ShardManagerTest {
         }};
     }
 
-  @Test
-  public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
+    @Test
+    public void testOnReceiveFindPrimaryForExistentShard() throws Exception {
+
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration());
+            final TestActorRef<ShardManager> subject =
+                TestActorRef.create(system, props);
+
+            // the run() method needs to finish within 3 seconds
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
+
+                    expectMsgClass(PrimaryFound.SERIALIZABLE_CLASS);
+
+                    expectNoMsg();
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testOnReceiveMemberUp() throws Exception {
+
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration());
+            final TestActorRef<ShardManager> subject =
+                TestActorRef.create(system, props);
 
-    new JavaTestKit(system) {{
-      final Props props = ShardManager.props("config", new MockClusterWrapper(), new MockConfiguration());
-      final TestActorRef<ShardManager> subject = TestActorRef.create(system, props);
+            // the run() method needs to finish within 3 seconds
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
 
-      // the run() method needs to finish within 3 seconds
-      new Within(duration("1 seconds")) {
-        protected void run() {
+                    subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
 
-          subject.tell(new FindPrimary(Shard.DEFAULT_NAME), getRef());
+                    final String out = new ExpectMsg<String>("primary found") {
+                        // do not put code outside this method, will run afterwards
+                        protected String match(Object in) {
+                            if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+                                PrimaryFound f = PrimaryFound.fromSerializable(in);
+                                return f.getPrimaryPath();
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    Assert.assertTrue(out, out.contains("member-2-shard-astronauts-config"));
+
+                    expectNoMsg();
+                }
+            };
+        }};
+    }
+
+    @Test
+    public void testOnReceiveMemberDown() throws Exception {
+
+        new JavaTestKit(system) {{
+            final Props props = ShardManager
+                .props("config", new MockClusterWrapper(),
+                    new MockConfiguration());
+            final TestActorRef<ShardManager> subject =
+                TestActorRef.create(system, props);
+
+            // the run() method needs to finish within 3 seconds
+            new Within(duration("1 seconds")) {
+                protected void run() {
+
+                    MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
+
+                    subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+                    expectMsgClass(PrimaryFound.SERIALIZABLE_CLASS);
+
+                    MockClusterWrapper.sendMemberRemoved(subject, "member-2", getRef().path().toString());
+
+                    subject.tell(new FindPrimary("astronauts").toSerializable(), getRef());
+
+                    expectMsgClass(PrimaryNotFound.SERIALIZABLE_CLASS);
+
+                    expectNoMsg();
+                }
+            };
+        }};
+    }
 
-          expectMsgClass(PrimaryFound.class);
 
-          expectNoMsg();
-        }
-      };
-    }};
-  }
 }
index 2568b0f..f20cd8c 100644 (file)
@@ -7,16 +7,16 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
-import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 import static org.junit.Assert.assertEquals;
@@ -33,14 +33,14 @@ public class ShardTest extends AbstractActorTest {
             new Within(duration("1 seconds")) {
                 protected void run() {
 
-                    subject.tell(new CreateTransactionChain(), getRef());
+                    subject.tell(new CreateTransactionChain().toSerializable(), getRef());
 
                     final String out = new ExpectMsg<String>("match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
-                            if (in instanceof CreateTransactionChainReply) {
+                            if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
                                 CreateTransactionChainReply reply =
-                                    (CreateTransactionChainReply) in;
+                                    CreateTransactionChainReply.fromSerializable(getSystem(),in);
                                 return reply.getTransactionChainPath()
                                     .toString();
                             } else {
@@ -118,7 +118,7 @@ public class ShardTest extends AbstractActorTest {
                         new UpdateSchemaContext(TestModel.createTestContext()),
                         getRef());
 
-                    subject.tell(new CreateTransaction("txn-1"),
+                    subject.tell(new CreateTransaction("txn-1").toSerializable(),
                         getRef());
 
                     final String out = new ExpectMsg<String>("match hint") {
@@ -148,11 +148,11 @@ public class ShardTest extends AbstractActorTest {
 
 
 
-    private AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
-        return new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
+    private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
+        return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
             @Override
             public void onDataChanged(
-                AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+                AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
 
             }
         };
index 1e68179..6330ad8 100644 (file)
@@ -9,7 +9,7 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 
@@ -30,27 +30,27 @@ public class ShardTransactionChainTest extends AbstractActorTest {
       final Props props = ShardTransactionChain.props(store.createTransactionChain(), TestModel.createTestContext());
       final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
 
-      new Within(duration("1 seconds")) {
+     new Within(duration("1 seconds")) {
         protected void run() {
 
-          subject.tell(new CreateTransaction("txn-1"), getRef());
+          subject.tell(new CreateTransaction("txn-1").toSerializable(), getRef());
 
           final String out = new ExpectMsg<String>("match hint") {
             // do not put code outside this method, will run afterwards
             protected String match(Object in) {
-              if (in instanceof CreateTransactionReply) {
-                return ((CreateTransactionReply) in).getTransactionActorPath().toString();
-              } else {
+              if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+                return CreateTransactionReply.fromSerializable(in).getTransactionPath();
+              }else{
                 throw noMatch();
               }
             }
           }.get(); // this extracts the received message
 
-            assertEquals("Unexpected transaction path " + out,
-                "akka://test/user/testCreateTransaction/shard-txn-1",
-                out);
+          assertEquals("Unexpected transaction path " + out,
+              "akka://test/user/testCreateTransaction/shard-txn-1",
+              out);
 
-            // Will wait for the rest of the 3 seconds
+          // Will wait for the rest of the 3 seconds
           expectNoMsg();
         }
 
@@ -68,12 +68,12 @@ public class ShardTransactionChainTest extends AbstractActorTest {
       new Within(duration("1 seconds")) {
         protected void run() {
 
-          subject.tell(new CloseTransactionChain(), getRef());
+          subject.tell(new CloseTransactionChain().toSerializable(), getRef());
 
           final String out = new ExpectMsg<String>("match hint") {
             // do not put code outside this method, will run afterwards
             protected String match(Object in) {
-              if (in instanceof CloseTransactionChainReply) {
+              if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
                 return "match";
               } else {
                 throw noMatch();
index 68cee1f..4d7c61a 100644 (file)
@@ -26,7 +26,7 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
@@ -58,14 +58,14 @@ public class ShardTransactionTest extends AbstractActorTest {
                 protected void run() {
 
                     subject.tell(
-                        new ReadData(InstanceIdentifier.builder().build()).toSerializable(),
+                        new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
                         getRef());
 
                     final String out = new ExpectMsg<String>("match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
-                              if (ReadDataReply.fromSerializable(testSchemaContext,InstanceIdentifier.builder().build(), in)
+                              if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in)
                                   .getNormalizedNode()!= null) {
                                     return "match";
                                 }
@@ -179,7 +179,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                     final String out = new ExpectMsg<String>("match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
-                            if (in instanceof WriteDataReply) {
+                            if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
                                 return "match";
                             } else {
                                 throw noMatch();
@@ -217,7 +217,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                     final String out = new ExpectMsg<String>(duration("500 milliseconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
-                            if (in instanceof MergeDataReply) {
+                            if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) {
                                 return "match";
                             } else {
                                 throw noMatch();
@@ -254,7 +254,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                     final String out = new ExpectMsg<String>("match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
-                            if (in instanceof DeleteDataReply) {
+                            if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
                                 return "match";
                             } else {
                                 throw noMatch();
@@ -286,12 +286,12 @@ public class ShardTransactionTest extends AbstractActorTest {
             new Within(duration("1 seconds")) {
                 protected void run() {
 
-                    subject.tell(new ReadyTransaction(), getRef());
+                    subject.tell(new ReadyTransaction().toSerializable(), getRef());
 
                     final String out = new ExpectMsg<String>("match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
-                            if (in instanceof ReadyTransactionReply) {
+                            if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                                 return "match";
                             } else {
                                 throw noMatch();
@@ -324,12 +324,12 @@ public class ShardTransactionTest extends AbstractActorTest {
             new Within(duration("2 seconds")) {
                 protected void run() {
 
-                    subject.tell(new CloseTransaction(), getRef());
+                    subject.tell(new CloseTransaction().toSerializable(), getRef());
 
                     final String out = new ExpectMsg<String>("match hint") {
                         // do not put code outside this method, will run afterwards
                         protected String match(Object in) {
-                            if (in instanceof CloseTransactionReply) {
+                            if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
                                 return "match";
                             } else {
                                 throw noMatch();
index 8ff785c..992518e 100644 (file)
@@ -41,7 +41,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
     @Test
     public void testCanCommit() throws Exception {
-        actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true));
+        actorContext.setExecuteRemoteOperationResponse(new CanCommitTransactionReply(true).toSerializable());
 
         ListenableFuture<Boolean> future = proxy.canCommit();
 
@@ -51,7 +51,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
     @Test
     public void testPreCommit() throws Exception {
-        actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply());
+        actorContext.setExecuteRemoteOperationResponse(new PreCommitTransactionReply().toSerializable());
 
         ListenableFuture<Void> future = proxy.preCommit();
 
@@ -61,7 +61,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
     @Test
     public void testAbort() throws Exception {
-        actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply());
+        actorContext.setExecuteRemoteOperationResponse(new AbortTransactionReply().toSerializable());
 
         ListenableFuture<Void> future = proxy.abort();
 
@@ -71,7 +71,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
     @Test
     public void testCommit() throws Exception {
-        actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply());
+        actorContext.setExecuteRemoteOperationResponse(new CommitTransactionReply().toSerializable());
 
         ListenableFuture<Void> future = proxy.commit();
 
index 62398ad..f654e3a 100644 (file)
@@ -5,17 +5,21 @@ import akka.actor.Props;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import junit.framework.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
@@ -29,18 +33,26 @@ import java.util.concurrent.Executors;
 
 public class TransactionProxyTest extends AbstractActorTest {
 
+    private final Configuration configuration = new MockConfiguration();
+
     private final ActorContext testContext =
-        new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockConfiguration());
+        new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );
 
     private ExecutorService transactionExecutor =
         Executors.newSingleThreadExecutor();
 
+    @Before
+    public void setUp(){
+        ShardStrategyFactory.setConfiguration(configuration);
+    }
+
     @Test
     public void testRead() throws Exception {
         final Props props = Props.create(DoNothingActor.class);
         final ActorRef actorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
         actorContext.setExecuteRemoteOperationResponse("message");
 
@@ -73,6 +85,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         final ActorRef actorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
         actorContext.setExecuteRemoteOperationResponse("message");
 
@@ -104,6 +117,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         final ActorRef actorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
         actorContext.setExecuteRemoteOperationResponse("message");
 
@@ -129,12 +143,17 @@ public class TransactionProxyTest extends AbstractActorTest {
         Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
     }
 
+    private Object createPrimaryFound(ActorRef actorRef) {
+        return new PrimaryFound(actorRef.path().toString()).toSerializable();
+    }
+
     @Test
     public void testMerge() throws Exception {
         final Props props = Props.create(MessageCollectorActor.class);
         final ActorRef actorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
         actorContext.setExecuteRemoteOperationResponse("message");
 
@@ -166,6 +185,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         final ActorRef actorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
         actorContext.setExecuteRemoteOperationResponse("message");
 
@@ -196,8 +216,9 @@ public class TransactionProxyTest extends AbstractActorTest {
         final ActorRef doNothingActorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef));
         actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
-        actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()));
+        actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable());
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
@@ -237,6 +258,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         final ActorRef actorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
         actorContext.setExecuteRemoteOperationResponse("message");
 
@@ -260,7 +282,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         Assert.assertEquals(1, listMessages.size());
 
-        Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction);
+        Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS));
     }
 
     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsTest.java
new file mode 100644 (file)
index 0000000..f7c4676
--- /dev/null
@@ -0,0 +1,55 @@
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+public class ShardStatsTest {
+  private MBeanServer mbeanServer;
+ private  ShardStats  shardStats;
+  private ObjectName testMBeanName;
+
+  @Before
+  public void setUp() throws Exception {
+
+    shardStats = new ShardStats("shard-1");
+    shardStats.registerMBean();
+    mbeanServer= shardStats.getMBeanServer();
+    String objectName = AbstractBaseMBean.BASE_JMX_PREFIX + "type="+shardStats.getMBeanType()+",Category="+
+        shardStats.getMBeanCategory() + ",name="+
+        shardStats.getMBeanName();
+    testMBeanName = new ObjectName(objectName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    shardStats.unregisterMBean();
+  }
+
+  @Test
+  public void testGetShardName() throws Exception {
+
+    Object attribute = mbeanServer.getAttribute(testMBeanName,"ShardName");
+    Assert.assertEquals((String) attribute, "shard-1");
+
+  }
+
+  @Test
+  public void testGetCommittedTransactionsCount() throws Exception {
+    //let us increment some transactions count and then check
+    shardStats.incrementCommittedTransactionCount();
+    shardStats.incrementCommittedTransactionCount();
+    shardStats.incrementCommittedTransactionCount();
+
+    //now let us get from MBeanServer what is the transaction count.
+    Object attribute = mbeanServer.getAttribute(testMBeanName,"CommittedTransactionsCount");
+    Assert.assertEquals((Long) attribute, (Long)3L);
+
+
+  }
+}
\ No newline at end of file
index efaca5d..d9c550a 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public abstract class AbstractModificationTest {
@@ -36,7 +36,7 @@ public abstract class AbstractModificationTest {
     cohort.commit();
   }
 
-  protected Optional<NormalizedNode<?,?>> readData(InstanceIdentifier path) throws Exception{
+  protected Optional<NormalizedNode<?,?>> readData(YangInstanceIdentifier path) throws Exception{
     DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
     ListenableFuture<Optional<NormalizedNode<?, ?>>> future = transaction.read(path);
     return future.get();
index 0b34840..ab74ba8 100644 (file)
@@ -7,7 +7,7 @@ import org.junit.rules.ExpectedException;
 import org.opendaylight.controller.cluster.datastore.ConfigurationImpl;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 import static junit.framework.Assert.assertNotNull;
 import static junit.framework.Assert.assertTrue;
@@ -32,7 +32,8 @@ public class ShardStrategyFactoryTest {
     @Test
     public void testGetStrategyForKnownModuleName() {
         ShardStrategy strategy =
-            ShardStrategyFactory.getStrategy(InstanceIdentifier.of(CarsModel.BASE_QNAME));
+            ShardStrategyFactory.getStrategy(
+                YangInstanceIdentifier.of(CarsModel.BASE_QNAME));
         assertTrue(strategy instanceof ModuleShardStrategy);
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
new file mode 100644 (file)
index 0000000..3dd0214
--- /dev/null
@@ -0,0 +1,47 @@
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.Configuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class ActorContextTest extends AbstractActorTest{
+    @Test
+    public void testResolvePathForRemoteActor(){
+        ActorContext actorContext =
+            new ActorContext(mock(ActorSystem.class), mock(ActorRef.class),mock(
+                ClusterWrapper.class),
+                mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+            "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
+            "akka://system/user/shardmanager/shard/transaction");
+
+        String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testResolvePathForLocalActor(){
+        ActorContext actorContext =
+            new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+            "akka://system/user/shardmanager/shard",
+            "akka://system/user/shardmanager/shard/transaction");
+
+        String expected = "akka://system/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+
+        System.out.println(actorContext
+            .actorFor("akka://system/user/shardmanager/shard/transaction"));
+    }
+}
index 12f80fb..1d1e661 100644 (file)
@@ -21,11 +21,11 @@ public class MockActorContext extends ActorContext {
     private Object executeLocalOperationResponse;
 
     public MockActorContext(ActorSystem actorSystem) {
-        super(actorSystem, null, new MockConfiguration());
+        super(actorSystem, null, new MockClusterWrapper(), new MockConfiguration());
     }
 
     public MockActorContext(ActorSystem actorSystem, ActorRef shardManager) {
-        super(actorSystem, shardManager, new MockConfiguration());
+        super(actorSystem, shardManager, new MockClusterWrapper(), new MockConfiguration());
     }
 
 
@@ -55,4 +55,9 @@ public class MockActorContext extends ActorContext {
         Object executeLocalOperationResponse) {
         this.executeLocalOperationResponse = executeLocalOperationResponse;
     }
+
+    @Override public Object executeLocalOperation(ActorRef actor,
+        Object message, FiniteDuration duration) {
+        return this.executeLocalOperationResponse;
+    }
 }
index 7749eaa..803aa03 100644 (file)
@@ -9,15 +9,61 @@
 package org.opendaylight.controller.cluster.datastore.utils;
 
 import akka.actor.ActorRef;
+import akka.actor.AddressFromURIString;
+import akka.cluster.ClusterEvent;
+import akka.cluster.MemberStatus;
+import akka.cluster.UniqueAddress;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import scala.collection.JavaConversions;
+
+import java.util.HashSet;
+import java.util.Set;
 
 public class MockClusterWrapper implements ClusterWrapper{
 
     @Override public void subscribeToMemberEvents(ActorRef actorRef) {
-        throw new UnsupportedOperationException("subscribeToMemberEvents");
     }
 
     @Override public String getCurrentMemberName() {
         return "member-1";
     }
+
+    public static void sendMemberUp(ActorRef to, String memberName, String address){
+        to.tell(createMemberUp(memberName, address), null);
+    }
+
+    public static void sendMemberRemoved(ActorRef to, String memberName, String address){
+        to.tell(createMemberRemoved(memberName, address), null);
+    }
+
+    private static ClusterEvent.MemberRemoved createMemberRemoved(String memberName, String address) {
+        akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress(
+            AddressFromURIString.parse(address), 55);
+
+        Set<String> roles = new HashSet<>();
+
+        roles.add(memberName);
+
+        akka.cluster.Member member = new akka.cluster.Member(uniqueAddress, 1, MemberStatus
+            .removed(),
+            JavaConversions.asScalaSet(roles).<String>toSet());
+
+        return new ClusterEvent.MemberRemoved(member, MemberStatus.up());
+
+    }
+
+
+    private static ClusterEvent.MemberUp createMemberUp(String memberName, String address) {
+        akka.cluster.UniqueAddress uniqueAddress = new UniqueAddress(
+            AddressFromURIString.parse(address), 55);
+
+        Set<String> roles = new HashSet<>();
+
+        roles.add(memberName);
+
+        akka.cluster.Member member = new akka.cluster.Member(uniqueAddress, 1, MemberStatus.up(),
+            JavaConversions.asScalaSet(roles).<String>toSet());
+
+        return new ClusterEvent.MemberUp(member);
+    }
 }
index 2597dda..8d49c6f 100644 (file)
@@ -12,16 +12,14 @@ import com.google.common.base.Optional;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 public class MockConfiguration implements Configuration{
     @Override public List<String> getMemberShardNames(String memberName) {
-        List<String> shardNames = new ArrayList<>();
-        shardNames.add("default");
-        return shardNames;
+        return Arrays.asList("default");
     }
 
     @Override public Optional<String> getModuleNameFromNameSpace(
@@ -40,8 +38,12 @@ public class MockConfiguration implements Configuration{
     }
 
     @Override public List<String> getMembersFromShardName(String shardName) {
-        List<String> shardNames = new ArrayList<>();
-        shardNames.add("member-1");
-        return shardNames;
+        if("default".equals(shardName)) {
+            return Arrays.asList("member-1", "member-2");
+        } else if("astronauts".equals(shardName)){
+            return Arrays.asList("member-2", "member-3");
+        }
+
+        return Collections.EMPTY_LIST;
     }
 }
index 5f361b2..939096e 100644 (file)
@@ -19,7 +19,7 @@ public class TestUtils {
 
     public static void assertFirstSentMessage(ActorSystem actorSystem, ActorRef actorRef, Class clazz){
         ActorContext testContext = new ActorContext(actorSystem, actorSystem.actorOf(
-            Props.create(DoNothingActor.class)), new MockConfiguration());
+            Props.create(DoNothingActor.class)), new MockClusterWrapper(), new MockConfiguration());
         Object messages = testContext
             .executeLocalOperation(actorRef, "messages",
                 ActorContext.ASK_DURATION);
index 675be8e..57df201 100644 (file)
@@ -9,7 +9,7 @@
 package org.opendaylight.controller.md.cluster.datastore.model;
 
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -22,7 +22,7 @@ public class CarsModel {
     public static final QName BASE_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13",
         "cars");
 
-    public static final InstanceIdentifier BASE_PATH = InstanceIdentifier.of(BASE_QNAME);
+    public static final YangInstanceIdentifier BASE_PATH = YangInstanceIdentifier.of(BASE_QNAME);
 
     public static final QName CARS_QNAME = QName.create(BASE_QNAME, "cars");
     public static final QName CAR_QNAME = QName.create(CARS_QNAME, "car");
@@ -35,19 +35,19 @@ public class CarsModel {
         // Create a list builder
         CollectionNodeBuilder<MapEntryNode, MapNode> cars =
             ImmutableMapNodeBuilder.create().withNodeIdentifier(
-                new InstanceIdentifier.NodeIdentifier(
-                    QName.create(CARS_QNAME, "car")));
+                new YangInstanceIdentifier.NodeIdentifier(
+                    CAR_QNAME));
 
         // Create an entry for the car altima
         MapEntryNode altima =
-            ImmutableNodes.mapEntryBuilder(CARS_QNAME, CAR_NAME_QNAME, "altima")
+            ImmutableNodes.mapEntryBuilder(CAR_QNAME, CAR_NAME_QNAME, "altima")
                 .withChild(ImmutableNodes.leafNode(CAR_NAME_QNAME, "altima"))
                 .withChild(ImmutableNodes.leafNode(CAR_PRICE_QNAME, 1000))
                 .build();
 
         // Create an entry for the car accord
         MapEntryNode honda =
-            ImmutableNodes.mapEntryBuilder(CARS_QNAME, CAR_NAME_QNAME, "accord")
+            ImmutableNodes.mapEntryBuilder(CAR_QNAME, CAR_NAME_QNAME, "accord")
                 .withChild(ImmutableNodes.leafNode(CAR_NAME_QNAME, "accord"))
                 .withChild(ImmutableNodes.leafNode(CAR_PRICE_QNAME, 2000))
                 .build();
@@ -56,7 +56,7 @@ public class CarsModel {
         cars.withChild(honda);
 
         return ImmutableContainerNodeBuilder.create()
-            .withNodeIdentifier(new InstanceIdentifier.NodeIdentifier(BASE_QNAME))
+            .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BASE_QNAME))
             .withChild(cars.build())
             .build();
 
@@ -64,7 +64,7 @@ public class CarsModel {
 
     public static NormalizedNode emptyContainer(){
         return ImmutableContainerNodeBuilder.create()
-            .withNodeIdentifier(new InstanceIdentifier.NodeIdentifier(BASE_QNAME))
+            .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BASE_QNAME))
             .build();
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CompositeModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CompositeModel.java
new file mode 100644 (file)
index 0000000..ece3127
--- /dev/null
@@ -0,0 +1,372 @@
+package org.opendaylight.controller.md.cluster.datastore.model;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapEntry;
+import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapEntryBuilder;
+import static org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes.mapNodeBuilder;
+
+public class CompositeModel {
+
+  public static final QName TEST_QNAME = QName.create(
+      "urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test",
+      "2014-03-13", "test");
+
+  public static final QName AUG_QNAME = QName.create(
+      "urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:aug",
+      "2014-03-13", "name");
+
+  public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
+  public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME,
+      "outer-list");
+  public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME,
+      "inner-list");
+  public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME,
+      "outer-choice");
+  public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
+  public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
+  public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
+  private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
+  private static final String DATASTORE_AUG_YANG =
+      "/odl-datastore-augmentation.yang";
+  private static final String DATASTORE_TEST_NOTIFICATION_YANG =
+      "/odl-datastore-test-notification.yang";
+
+
+  public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier
+      .of(TEST_QNAME);
+  public static final YangInstanceIdentifier DESC_PATH = YangInstanceIdentifier
+      .builder(TEST_PATH).node(DESC_QNAME).build();
+  public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier
+      .builder(TEST_PATH).node(OUTER_LIST_QNAME).build();
+  public static final QName TWO_QNAME = QName.create(TEST_QNAME, "two");
+  public static final QName THREE_QNAME = QName.create(TEST_QNAME, "three");
+
+  private static final Integer ONE_ID = 1;
+  private static final Integer TWO_ID = 2;
+  private static final String TWO_ONE_NAME = "one";
+  private static final String TWO_TWO_NAME = "two";
+  private static final String DESC = "Hello there";
+
+  // Family specific constants
+  public static final QName FAMILY_QNAME =
+      QName
+          .create(
+              "urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:notification-test",
+              "2014-04-17", "family");
+  public static final QName CHILDREN_QNAME = QName.create(FAMILY_QNAME,
+      "children");
+  public static final QName GRAND_CHILDREN_QNAME = QName.create(FAMILY_QNAME,
+      "grand-children");
+  public static final QName CHILD_NUMBER_QNAME = QName.create(FAMILY_QNAME,
+      "child-number");
+  public static final QName CHILD_NAME_QNAME = QName.create(FAMILY_QNAME,
+      "child-name");
+  public static final QName GRAND_CHILD_NUMBER_QNAME = QName.create(
+      FAMILY_QNAME, "grand-child-number");
+  public static final QName GRAND_CHILD_NAME_QNAME = QName.create(FAMILY_QNAME,
+      "grand-child-name");
+
+  public static final YangInstanceIdentifier FAMILY_PATH = YangInstanceIdentifier
+      .of(FAMILY_QNAME);
+  public static final YangInstanceIdentifier FAMILY_DESC_PATH = YangInstanceIdentifier
+      .builder(FAMILY_PATH).node(DESC_QNAME).build();
+  public static final YangInstanceIdentifier CHILDREN_PATH = YangInstanceIdentifier
+      .builder(FAMILY_PATH).node(CHILDREN_QNAME).build();
+
+  private static final Integer FIRST_CHILD_ID = 1;
+  private static final Integer SECOND_CHILD_ID = 2;
+
+  private static final String FIRST_CHILD_NAME = "first child";
+  private static final String SECOND_CHILD_NAME = "second child";
+
+  private static final Integer FIRST_GRAND_CHILD_ID = 1;
+  private static final Integer SECOND_GRAND_CHILD_ID = 2;
+
+  private static final String FIRST_GRAND_CHILD_NAME = "first grand child";
+  private static final String SECOND_GRAND_CHILD_NAME = "second grand child";
+
+  // first child
+  private static final YangInstanceIdentifier CHILDREN_1_PATH = YangInstanceIdentifier
+      .builder(CHILDREN_PATH)
+      .nodeWithKey(CHILDREN_QNAME, CHILD_NUMBER_QNAME, FIRST_CHILD_ID) //
+      .build();
+  private static final YangInstanceIdentifier CHILDREN_1_NAME_PATH =
+      YangInstanceIdentifier.builder(CHILDREN_PATH)
+          .nodeWithKey(CHILDREN_QNAME, CHILD_NAME_QNAME, FIRST_CHILD_NAME) //
+          .build();
+
+  private static final YangInstanceIdentifier CHILDREN_2_PATH = YangInstanceIdentifier
+      .builder(CHILDREN_PATH)
+      .nodeWithKey(CHILDREN_QNAME, CHILD_NUMBER_QNAME, SECOND_CHILD_ID) //
+      .build();
+  private static final YangInstanceIdentifier CHILDREN_2_NAME_PATH =
+      YangInstanceIdentifier.builder(CHILDREN_PATH)
+          .nodeWithKey(CHILDREN_QNAME, CHILD_NAME_QNAME, SECOND_CHILD_NAME) //
+          .build();
+
+
+  private static final YangInstanceIdentifier GRAND_CHILD_1_PATH =
+      YangInstanceIdentifier.builder(CHILDREN_1_PATH)
+          .node(GRAND_CHILDREN_QNAME)
+          //
+          .nodeWithKey(GRAND_CHILDREN_QNAME, GRAND_CHILD_NUMBER_QNAME,
+              FIRST_GRAND_CHILD_ID) //
+          .build();
+
+  private static final YangInstanceIdentifier GRAND_CHILD_1_NAME_PATH =
+      YangInstanceIdentifier.builder(CHILDREN_1_PATH)
+          .node(GRAND_CHILDREN_QNAME)
+          //
+          .nodeWithKey(GRAND_CHILDREN_QNAME, GRAND_CHILD_NAME_QNAME,
+              FIRST_GRAND_CHILD_NAME) //
+          .build();
+
+  private static final YangInstanceIdentifier GRAND_CHILD_2_PATH =
+      YangInstanceIdentifier.builder(CHILDREN_2_PATH)
+          .node(GRAND_CHILDREN_QNAME)
+          //
+          .nodeWithKey(GRAND_CHILDREN_QNAME, GRAND_CHILD_NUMBER_QNAME,
+              SECOND_GRAND_CHILD_ID) //
+          .build();
+
+  private static final YangInstanceIdentifier GRAND_CHILD_2_NAME_PATH =
+      YangInstanceIdentifier.builder(CHILDREN_2_PATH)
+          .node(GRAND_CHILDREN_QNAME)
+          //
+          .nodeWithKey(GRAND_CHILDREN_QNAME, GRAND_CHILD_NAME_QNAME,
+              SECOND_GRAND_CHILD_NAME) //
+          .build();
+
+  private static final YangInstanceIdentifier DESC_PATH_ID = YangInstanceIdentifier
+      .builder(DESC_PATH).build();
+  private static final YangInstanceIdentifier OUTER_LIST_1_PATH =
+      YangInstanceIdentifier.builder(OUTER_LIST_PATH)
+          .nodeWithKey(OUTER_LIST_QNAME, ID_QNAME, ONE_ID) //
+          .build();
+
+  private static final YangInstanceIdentifier OUTER_LIST_2_PATH =
+      YangInstanceIdentifier.builder(OUTER_LIST_PATH)
+          .nodeWithKey(OUTER_LIST_QNAME, ID_QNAME, TWO_ID) //
+          .build();
+
+  private static final YangInstanceIdentifier TWO_TWO_PATH = YangInstanceIdentifier
+      .builder(OUTER_LIST_2_PATH).node(INNER_LIST_QNAME) //
+      .nodeWithKey(INNER_LIST_QNAME, NAME_QNAME, TWO_TWO_NAME) //
+      .build();
+
+  private static final YangInstanceIdentifier TWO_TWO_VALUE_PATH =
+      YangInstanceIdentifier.builder(TWO_TWO_PATH).node(VALUE_QNAME) //
+          .build();
+
+  private static final MapEntryNode BAR_NODE = mapEntryBuilder(
+      OUTER_LIST_QNAME, ID_QNAME, TWO_ID) //
+      .withChild(mapNodeBuilder(INNER_LIST_QNAME) //
+          .withChild(mapEntry(INNER_LIST_QNAME, NAME_QNAME, TWO_ONE_NAME)) //
+          .withChild(mapEntry(INNER_LIST_QNAME, NAME_QNAME, TWO_TWO_NAME)) //
+          .build()) //
+      .build();
+
+  public static final InputStream getDatastoreTestInputStream() {
+    return getInputStream(DATASTORE_TEST_YANG);
+  }
+
+  public static final InputStream getDatastoreAugInputStream() {
+    return getInputStream(DATASTORE_AUG_YANG);
+  }
+
+  public static final InputStream getDatastoreTestNotificationInputStream() {
+    return getInputStream(DATASTORE_TEST_NOTIFICATION_YANG);
+  }
+
+  private static InputStream getInputStream(final String resourceName) {
+    return TestModel.class.getResourceAsStream(resourceName);
+  }
+
+  public static SchemaContext createTestContext() {
+    List<InputStream> inputStreams = new ArrayList<>();
+    inputStreams.add(getDatastoreTestInputStream());
+    inputStreams.add(getDatastoreAugInputStream());
+    inputStreams.add(getDatastoreTestNotificationInputStream());
+
+    YangParserImpl parser = new YangParserImpl();
+    Set<Module> modules = parser.parseYangModelsFromStreams(inputStreams);
+    return parser.resolveSchemaContext(modules);
+  }
+
+  /**
+   * Returns a test document
+   *
+   * <pre>
+   * test
+   *     outer-list
+   *          id 1
+   *     outer-list
+   *          id 2
+   *          inner-list
+   *                  name "one"
+   *          inner-list
+   *                  name "two"
+   *
+   * </pre>
+   *
+   * @return
+   */
+  public static NormalizedNode<?, ?> createDocumentOne(
+      SchemaContext schemaContext) {
+    return ImmutableContainerNodeBuilder
+        .create()
+        .withNodeIdentifier(
+            new YangInstanceIdentifier.NodeIdentifier(schemaContext.getQName()))
+        .withChild(createTestContainer()).build();
+
+  }
+
+  public static ContainerNode createTestContainer() {
+
+
+    final LeafSetEntryNode<Object> nike =
+        ImmutableLeafSetEntryNodeBuilder
+            .create()
+            .withNodeIdentifier(
+                new YangInstanceIdentifier.NodeWithValue(QName.create(TEST_QNAME,
+                    "shoe"), "nike")).withValue("nike").build();
+    final LeafSetEntryNode<Object> puma =
+        ImmutableLeafSetEntryNodeBuilder
+            .create()
+            .withNodeIdentifier(
+                new YangInstanceIdentifier.NodeWithValue(QName.create(TEST_QNAME,
+                    "shoe"), "puma")).withValue("puma").build();
+    final LeafSetNode<Object> shoes =
+        ImmutableLeafSetNodeBuilder
+            .create()
+            .withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(QName.create(TEST_QNAME,
+                    "shoe"))).withChild(nike).withChild(puma).build();
+
+
+    final LeafSetEntryNode<Object> five =
+        ImmutableLeafSetEntryNodeBuilder
+            .create()
+            .withNodeIdentifier(
+                (new YangInstanceIdentifier.NodeWithValue(QName.create(TEST_QNAME,
+                    "number"), 5))).withValue(5).build();
+    final LeafSetEntryNode<Object> fifteen =
+        ImmutableLeafSetEntryNodeBuilder
+            .create()
+            .withNodeIdentifier(
+                (new YangInstanceIdentifier.NodeWithValue(QName.create(TEST_QNAME,
+                    "number"), 15))).withValue(15).build();
+    final LeafSetNode<Object> numbers =
+        ImmutableLeafSetNodeBuilder
+            .create()
+            .withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(QName.create(TEST_QNAME,
+                    "number"))).withChild(five).withChild(fifteen).build();
+
+
+    Set<QName> childAugmentations = new HashSet<>();
+    childAugmentations.add(AUG_QNAME);
+    final YangInstanceIdentifier.AugmentationIdentifier augmentationIdentifier =
+        new YangInstanceIdentifier.AugmentationIdentifier(null, childAugmentations);
+    final AugmentationNode augmentationNode =
+        Builders.augmentationBuilder()
+            .withNodeIdentifier(augmentationIdentifier)
+            .withChild(ImmutableNodes.leafNode(AUG_QNAME, "First Test"))
+            .build();
+    return ImmutableContainerNodeBuilder
+        .create()
+        .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TEST_QNAME))
+        .withChild(ImmutableNodes.leafNode(DESC_QNAME, DESC))
+        .withChild(augmentationNode)
+        .withChild(shoes)
+        .withChild(numbers)
+        .withChild(
+            mapNodeBuilder(OUTER_LIST_QNAME)
+                .withChild(mapEntry(OUTER_LIST_QNAME, ID_QNAME, ONE_ID))
+                .withChild(BAR_NODE).build()).build();
+
+  }
+
+
+  public static ContainerNode createFamily() {
+    final DataContainerNodeAttrBuilder<YangInstanceIdentifier.NodeIdentifier, ContainerNode> familyContainerBuilder =
+        ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+            new YangInstanceIdentifier.NodeIdentifier(FAMILY_QNAME));
+
+    final CollectionNodeBuilder<MapEntryNode, MapNode> childrenBuilder =
+        mapNodeBuilder(CHILDREN_QNAME);
+
+    final DataContainerNodeBuilder<YangInstanceIdentifier.NodeIdentifierWithPredicates, MapEntryNode> firstChildBuilder =
+        mapEntryBuilder(CHILDREN_QNAME, CHILD_NUMBER_QNAME, FIRST_CHILD_ID);
+    final DataContainerNodeBuilder<YangInstanceIdentifier.NodeIdentifierWithPredicates, MapEntryNode> secondChildBuilder =
+        mapEntryBuilder(CHILDREN_QNAME, CHILD_NUMBER_QNAME, SECOND_CHILD_ID);
+
+    final DataContainerNodeBuilder<YangInstanceIdentifier.NodeIdentifierWithPredicates, MapEntryNode> firstGrandChildBuilder =
+        mapEntryBuilder(GRAND_CHILDREN_QNAME, GRAND_CHILD_NUMBER_QNAME,
+            FIRST_GRAND_CHILD_ID);
+    final DataContainerNodeBuilder<YangInstanceIdentifier.NodeIdentifierWithPredicates, MapEntryNode> secondGrandChildBuilder =
+        mapEntryBuilder(GRAND_CHILDREN_QNAME, GRAND_CHILD_NUMBER_QNAME,
+            SECOND_GRAND_CHILD_ID);
+
+    firstGrandChildBuilder
+        .withChild(
+            ImmutableNodes.leafNode(GRAND_CHILD_NUMBER_QNAME,
+                FIRST_GRAND_CHILD_ID)).withChild(
+        ImmutableNodes.leafNode(GRAND_CHILD_NAME_QNAME,
+            FIRST_GRAND_CHILD_NAME));
+
+    secondGrandChildBuilder.withChild(
+        ImmutableNodes
+            .leafNode(GRAND_CHILD_NUMBER_QNAME, SECOND_GRAND_CHILD_ID))
+        .withChild(
+            ImmutableNodes.leafNode(GRAND_CHILD_NAME_QNAME,
+                SECOND_GRAND_CHILD_NAME));
+
+    firstChildBuilder
+        .withChild(ImmutableNodes.leafNode(CHILD_NUMBER_QNAME, FIRST_CHILD_ID))
+        .withChild(ImmutableNodes.leafNode(CHILD_NAME_QNAME, FIRST_CHILD_NAME))
+        .withChild(
+            mapNodeBuilder(GRAND_CHILDREN_QNAME).withChild(
+                firstGrandChildBuilder.build()).build());
+
+
+    secondChildBuilder
+        .withChild(ImmutableNodes.leafNode(CHILD_NUMBER_QNAME, SECOND_CHILD_ID))
+        .withChild(ImmutableNodes.leafNode(CHILD_NAME_QNAME, SECOND_CHILD_NAME))
+        .withChild(
+            mapNodeBuilder(GRAND_CHILDREN_QNAME).withChild(
+                firstGrandChildBuilder.build()).build());
+
+    childrenBuilder.withChild(firstChildBuilder.build());
+    childrenBuilder.withChild(secondChildBuilder.build());
+
+    return familyContainerBuilder.withChild(childrenBuilder.build()).build();
+  }
+
+}
index 14b02a2..1b4020a 100644 (file)
@@ -9,7 +9,7 @@
 package org.opendaylight.controller.md.cluster.datastore.model;
 
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -22,7 +22,7 @@ public class PeopleModel {
     public static final QName BASE_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:people", "2014-03-13",
         "people");
 
-    public static final InstanceIdentifier BASE_PATH = InstanceIdentifier.of(BASE_QNAME);
+    public static final YangInstanceIdentifier BASE_PATH = YangInstanceIdentifier.of(BASE_QNAME);
     public static final QName PEOPLE_QNAME = QName.create(BASE_QNAME, "people");
     public static final QName PERSON_QNAME = QName.create(PEOPLE_QNAME, "person");
     public static final QName PERSON_NAME_QNAME = QName.create(PERSON_QNAME, "name");
@@ -35,19 +35,19 @@ public class PeopleModel {
         // Create a list builder
         CollectionNodeBuilder<MapEntryNode, MapNode> cars =
             ImmutableMapNodeBuilder.create().withNodeIdentifier(
-                new InstanceIdentifier.NodeIdentifier(
-                    QName.create(PEOPLE_QNAME, "person")));
+                new YangInstanceIdentifier.NodeIdentifier(
+                    PERSON_QNAME));
 
         // Create an entry for the person jack
         MapEntryNode jack =
-            ImmutableNodes.mapEntryBuilder(PEOPLE_QNAME, PERSON_NAME_QNAME, "jack")
+            ImmutableNodes.mapEntryBuilder(PERSON_QNAME, PERSON_NAME_QNAME, "jack")
                 .withChild(ImmutableNodes.leafNode(PERSON_NAME_QNAME, "jack"))
                 .withChild(ImmutableNodes.leafNode(PERSON_AGE_QNAME, 100))
                 .build();
 
         // Create an entry for the person jill
         MapEntryNode jill =
-            ImmutableNodes.mapEntryBuilder(PEOPLE_QNAME, PERSON_NAME_QNAME, "jill")
+            ImmutableNodes.mapEntryBuilder(PERSON_QNAME, PERSON_NAME_QNAME, "jill")
                 .withChild(ImmutableNodes.leafNode(PERSON_NAME_QNAME, "jill"))
                 .withChild(ImmutableNodes.leafNode(PERSON_AGE_QNAME, 200))
                 .build();
@@ -56,7 +56,7 @@ public class PeopleModel {
         cars.withChild(jill);
 
         return ImmutableContainerNodeBuilder.create()
-            .withNodeIdentifier(new InstanceIdentifier.NodeIdentifier(BASE_QNAME))
+            .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BASE_QNAME))
             .withChild(cars.build())
             .build();
 
@@ -65,7 +65,7 @@ public class PeopleModel {
     public static NormalizedNode emptyContainer(){
         return ImmutableContainerNodeBuilder.create()
             .withNodeIdentifier(
-                new InstanceIdentifier.NodeIdentifier(BASE_QNAME))
+                new YangInstanceIdentifier.NodeIdentifier(BASE_QNAME))
             .build();
     }
 
index 6824591..d8fefcd 100644 (file)
@@ -12,50 +12,51 @@ import junit.framework.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class SampleModelsTest {
     @Test
     public void testPeopleModel(){
-        NormalizedNode<?, ?> expected = PeopleModel.emptyContainer();
+        NormalizedNode<?, ?> expected = PeopleModel.create();
 
 
         NormalizedNodeMessages.Container node =
             new NormalizedNodeToNodeCodec(SchemaContextHelper.full())
-                .encode(InstanceIdentifier.of(PeopleModel.BASE_QNAME),
+                .encode(YangInstanceIdentifier.of(PeopleModel.BASE_QNAME),
                     expected);
 
         NormalizedNodeMessages.Node normalizedNode =
             node.getNormalizedNode();
 
-        NormalizedNode<?,?> actual = new NormalizedNodeToNodeCodec(SchemaContextHelper.full()).decode(InstanceIdentifier.of(PeopleModel.BASE_QNAME),
+        NormalizedNode<?,?> actual = new NormalizedNodeToNodeCodec(SchemaContextHelper.full()).decode(YangInstanceIdentifier.of(PeopleModel.BASE_QNAME),
             normalizedNode);
 
 
-        Assert.assertEquals(expected, actual);
+        Assert.assertEquals(expected.toString(), actual.toString());
 
     }
 
 
     @Test
     public void testCarsModel(){
-        NormalizedNode<?, ?> expected = CarsModel.emptyContainer();
+        NormalizedNode<?, ?> expected = CarsModel.create();
 
 
         NormalizedNodeMessages.Container node =
             new NormalizedNodeToNodeCodec(SchemaContextHelper.full())
-                .encode(InstanceIdentifier.of(CarsModel.BASE_QNAME),
+                .encode(YangInstanceIdentifier.of(CarsModel.BASE_QNAME),
                     expected);
 
         NormalizedNodeMessages.Node normalizedNode =
             node.getNormalizedNode();
 
-        NormalizedNode<?,?> actual = new NormalizedNodeToNodeCodec(SchemaContextHelper.full()).decode(InstanceIdentifier.of(CarsModel.BASE_QNAME),
+        NormalizedNode<?,?> actual = new NormalizedNodeToNodeCodec(SchemaContextHelper.full()).decode(
+            YangInstanceIdentifier.of(CarsModel.BASE_QNAME),
             normalizedNode);
 
 
-        Assert.assertEquals(expected, actual);
+        Assert.assertEquals(expected.toString(), actual.toString());
 
     }
 }
index 7a1def9..85441ec 100644 (file)
@@ -8,7 +8,7 @@
 package org.opendaylight.controller.md.cluster.datastore.model;
 
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
@@ -29,8 +29,8 @@ public class TestModel {
   public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
   private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
 
-  public static final InstanceIdentifier TEST_PATH = InstanceIdentifier.of(TEST_QNAME);
-  public static final