Merge "Optimizations, Monitoring and Logging"
authorEd Warnicke <eaw@cisco.com>
Tue, 12 Aug 2014 23:09:32 +0000 (23:09 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 12 Aug 2014 23:09:32 +0000 (23:09 +0000)
42 files changed:
opendaylight/distribution/opendaylight-karaf/src/main/resources/configuration/logback.xml
opendaylight/distribution/opendaylight/src/main/resources/configuration/logback.xml
opendaylight/md-sal/md-sal-config/src/main/resources/initial/01-md-sal.xml
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Candidate.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ClusterWrapperImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ConfigurationImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/UnknownMessageException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardManagerIdentifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/TransactionIdentifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/AbstractBaseMBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PeerAddressResolved.java
opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ConfigurationImplTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifierTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardManagerIdentifierTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf

index d1a5dcc..ed659bf 100644 (file)
@@ -2,7 +2,7 @@
 
   <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
     <encoder>
-      <pattern>%date{"yyyy-MM-dd HH:mm:ss.SSS z"} [%thread] %-5level %logger{36} - %msg%n</pattern>
+      <pattern>%date{"yyyy-MM-dd HH:mm:ss.SSS z"} [%thread] %-5level %logger{36} %X{akkaSource} - %msg%n</pattern>
     </encoder>
   </appender>
   <appender name="opendaylight.log" class="ch.qos.logback.core.rolling.RollingFileAppender">
   <!-- Web modules -->
   <logger name="org.opendaylight.controller.web" level="INFO"/>
 
+  <!-- Clustering -->
+  <logger name="org.opendaylight.controller.cluster" level="INFO"/>
+  <logger name="org.opendaylight.controller.cluster.datastore.node" level="INFO"/>
+
   <!--
        Unsynchronized controller startup causes models to crop up in random
        order, which results in temporary inability to fully resolve a model,
index 94a3702..b73244b 100644 (file)
@@ -2,7 +2,7 @@
 
   <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
     <encoder>
-      <pattern>%date{"yyyy-MM-dd HH:mm:ss.SSS z"} [%thread] %-5level %logger{36} - %msg%n</pattern>
+      <pattern>%date{"yyyy-MM-dd HH:mm:ss.SSS z"} [%thread] %-5level %logger{36} %X{akkaSource} - %msg%n</pattern>
     </encoder>
   </appender>
   <appender name="opendaylight.log" class="ch.qos.logback.core.rolling.RollingFileAppender">
   <!-- Web modules -->
   <logger name="org.opendaylight.controller.web" level="INFO"/>
 
+  <!-- Clustering -->
+  <logger name="org.opendaylight.controller.cluster" level="INFO"/>
+  <logger name="org.opendaylight.controller.cluster.datastore.node" level="INFO"/>
+
   <!--
        Unsynchronized controller startup causes models to crop up in random
        order, which results in temporary inability to fully resolve a model,
index 35a7766..c58f6cb 100644 (file)
                     </schema-service>
                 </module>
 
-                <!-- DISTRIBUTED_DATA_STORE -->
-                <!-- Enable the following modules if you want to use the Distributed Data Store instead of the InMemoryDataStore -->
-                <!--
-                <module>
-                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider">prefix:distributed-operational-datastore-provider</type>
-                    <name>distributed-operational-store-module</name>
-                    <operational-schema-service>
-                        <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
-                        <name>yang-schema-service</name>
-                    </operational-schema-service>
-                </module>
-
-                <module>
-                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider">prefix:distributed-config-datastore-provider</type>
-                    <name>distributed-config-store-module</name>
-                    <configschema-service>
-                        <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
-                        <name>yang-schema-service</name>
-                    </config-schema-service>
-                </module>
-                -->
-
                 <module>
                     <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:inmemory-datastore-provider">prefix:inmemory-operational-datastore-provider</type>
                     <name>operational-store-service</name>
                    <config-data-store>
                         <type xmlns:config-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store">config-dom-store-spi:config-dom-datastore</type>
                         <name>config-store-service</name>
-                        <!-- DISTRIBUTED_DATA_STORE -->
-                        <!--
-                        <name>distributed-config-store-service</name>
-                        -->
                     </config-data-store>
 
                     <operational-data-store>
                         <type xmlns:operational-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store">operational-dom-store-spi:operational-dom-datastore</type>
                         <name>operational-store-service</name>
-                        <!-- DISTRIBUTED_DATA_STORE -->
-                        <!--
-                        <name>distributed-operational-store-service</name>
-                        -->
-
                     </operational-data-store>
                 </module>
                 <module>
                         </binding-mapping-service>
                     </binding-forwarded-data-broker>
                 </module>
-                <!-- Cluster RPC -->
-                <!-- Enable the following module if you want to use remote rpc connector
-                <module>
-                    <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">prefix:remote-rpc-connector</type>
-                    <name>remote-rpc-connector</name>
-                    <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:config:remote-rpc-connector">
-                        <type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:dom-broker-osgi-registry</type>
-                        <name>dom-broker</name>
-                    </dom-broker>
-                </module>
-                -->
             </modules>
             <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
                     <service>
                         </instance>
                     </service>
 
-                <!-- DISTRIBUTED_DATA_STORE -->
-                <!-- Enable the following if you want to use the Distributed Data Store instead of the InMemory Data Store -->
-                <!-- Note that you MUST delete the InMemoryDataStore related services which provide config-dom-datastore and operational-dom-datastore -->
-                <!--
-                <service>
-                    <type xmlns:config-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store">config-dom-store-spi:config-dom-datastore</type>
-                    <instance>
-                        <name>distributed-config-store-service</name>
-                        <provider>/modules/module[type='distributed-config-datastore-provider'][name='distributed-config-store-module']</provider>
-                    </instance>
-                </service>
-                <service>
-                    <type xmlns:operational-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store">operational-dom-store-spi:operational-dom-datastore</type>
-                    <instance>
-                        <name>distributed-operational-store-service</name>
-                        <provider>/modules/module[type='distributed-operational-datastore-provider'][name='distributed-operational-store-module']</provider>
-                    </instance>
-                </service>
-                -->
-
-                <!-- DISTRIBUTED_DATA_STORE -->
-                <!-- Delete the following two services (config-store-service and operational-store-service) if you want to use the distributed data store instead -->
                 <service>
                     <type xmlns:config-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store">config-dom-store-spi:config-dom-datastore</type>
                     <instance>
index a0bb004..ae8b6fe 100644 (file)
@@ -618,7 +618,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
         }
 
         @Override public void update(long currentTerm, String votedFor) {
-            LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
+            LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
 
             this.currentTerm = currentTerm;
             this.votedFor = votedFor;
index 707c532..251a13d 100644 (file)
@@ -127,6 +127,9 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     protected RaftState requestVote(ActorRef sender,
         RequestVote requestVote) {
 
+
+        context.getLogger().debug(requestVote.toString());
+
         boolean grantVote = false;
 
         //  Reply false if term < currentTerm (ยง5.1)
@@ -326,7 +329,7 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
         }
         // Send a local message to the local RaftActor (it's derived class to be
         // specific to apply the log to it's index)
-        context.getLogger().info("Setting last applied to {}", index);
+        context.getLogger().debug("Setting last applied to {}", index);
         context.setLastApplied(index);
     }
 
index c125bd3..bb1927e 100644 (file)
@@ -81,7 +81,7 @@ public class Candidate extends AbstractRaftActorBehavior {
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
-        context.getLogger().info("Candidate: Received {}", appendEntries.toString());
+        context.getLogger().debug(appendEntries.toString());
 
         return state();
     }
index c8cd41d..54e0494 100644 (file)
@@ -42,7 +42,7 @@ public class Follower extends AbstractRaftActorBehavior {
 
         if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
             context.getLogger()
-                .info("Follower: Received {}", appendEntries.toString());
+                .debug(appendEntries.toString());
         }
 
         // TODO : Refactor this method into a bunch of smaller methods
index 8b95e8b..234f9db 100644 (file)
@@ -120,7 +120,7 @@ public class Leader extends AbstractRaftActorBehavior {
     @Override protected RaftState handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
 
-        context.getLogger().info("Leader: Received {}", appendEntries.toString());
+        context.getLogger().debug(appendEntries.toString());
 
         return state();
     }
@@ -130,7 +130,7 @@ public class Leader extends AbstractRaftActorBehavior {
 
         if(! appendEntriesReply.isSuccess()) {
             context.getLogger()
-                .info("Leader: Received {}", appendEntriesReply.toString());
+                .debug(appendEntriesReply.toString());
         }
 
         // Update the FollowerLogInformation
@@ -294,12 +294,7 @@ public class Leader extends AbstractRaftActorBehavior {
                 List<ReplicatedLogEntry> entries = Collections.emptyList();
 
                 if (context.getReplicatedLog().isPresent(nextIndex)) {
-                    // TODO: Instead of sending all entries from nextIndex
-                    // only send a fixed number of entries to each follower
-                    // This is to avoid the situation where there are a lot of
-                    // entries to install for a fresh follower or to a follower
-                    // that has fallen too far behind with the log but yet is not
-                    // eligible to receive a snapshot
+                    // FIXME : Sending one entry at a time
                     entries =
                         context.getReplicatedLog().getFrom(nextIndex, 1);
                 }
index 848d425..648e8d2 100644 (file)
       <artifactId>akka-testkit_${scala.version}</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-slf4j_${scala.version}</artifactId>
+    </dependency>
+
     <!-- SAL Dependencies -->
 
     <dependency>
index ce05160..ac01f42 100644 (file)
@@ -33,4 +33,12 @@ public abstract class AbstractUntypedActor extends UntypedActor {
     }
 
     protected abstract void handleReceive(Object message) throws Exception;
+
+    protected void ignoreMessage(Object message){
+        LOG.debug("Unhandled message {} ", message);
+    }
+
+    protected void unknownMessage(Object message) throws Exception{
+        unhandled(message);
+    }
 }
index 142aacd..8910137 100644 (file)
@@ -12,18 +12,31 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent;
+import com.google.common.base.Preconditions;
 
 public class ClusterWrapperImpl implements ClusterWrapper {
     private final Cluster cluster;
     private final String currentMemberName;
 
     public ClusterWrapperImpl(ActorSystem actorSystem){
+        Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
+
         cluster = Cluster.get(actorSystem);
+
+        Preconditions.checkState(cluster.getSelfRoles().size() > 0,
+            "No akka roles were specified\n" +
+                "One way to specify the member name is to pass a property on the command line like so\n" +
+                "   -Dakka.cluster.roles.0=member-3\n" +
+                "member-3 here would be the name of the member"
+        );
+
         currentMemberName = (String) cluster.getSelfRoles().toArray()[0];
 
     }
 
     public void subscribeToMemberEvents(ActorRef actorRef){
+        Preconditions.checkNotNull(actorRef, "actorRef should not be null");
+
         cluster.subscribe(actorRef, ClusterEvent.initialStateAsEvents(),
             ClusterEvent.MemberEvent.class,
             ClusterEvent.UnreachableMember.class);
index abc69f1..d0abb20 100644 (file)
@@ -29,7 +29,7 @@ public class CompositeModificationPayload extends Payload implements
         modification = null;
     }
     public CompositeModificationPayload(Object modification){
-        this.modification = (PersistentMessages.CompositeModification) modification;
+        this.modification = (PersistentMessages.CompositeModification) Preconditions.checkNotNull(modification, "modification should not be null");
     }
 
     @Override public Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> encode() {
index 3459002..37b565d 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigObject;
@@ -34,11 +35,23 @@ public class ConfigurationImpl implements Configuration {
     private static final Logger
         LOG = LoggerFactory.getLogger(DistributedDataStore.class);
 
+    // Look up maps to speed things up
+
+    // key = memberName, value = list of shardNames
+    private Map<String, List<String>> memberShardNames = new HashMap<>();
+
+    // key = shardName, value = list of replicaNames (replicaNames are the same as memberNames)
+    private Map<String, List<String>> shardReplicaNames = new HashMap<>();
+
 
     public ConfigurationImpl(String moduleShardsConfigPath,
 
         String modulesConfigPath){
 
+        Preconditions.checkNotNull(moduleShardsConfigPath, "moduleShardsConfigPath should not be null");
+        Preconditions.checkNotNull(modulesConfigPath, "modulesConfigPath should not be null");
+
+
         File moduleShardsFile = new File("./configuration/initial/" + moduleShardsConfigPath);
         File modulesFile = new File("./configuration/initial/" + modulesConfigPath);
 
@@ -66,6 +79,13 @@ public class ConfigurationImpl implements Configuration {
     }
 
     @Override public List<String> getMemberShardNames(String memberName){
+
+        Preconditions.checkNotNull(memberName, "memberName should not be null");
+
+        if(memberShardNames.containsKey(memberName)){
+            return memberShardNames.get(memberName);
+        }
+
         List<String> shards = new ArrayList();
         for(ModuleShard ms : moduleShards){
             for(Shard s : ms.getShards()){
@@ -76,11 +96,17 @@ public class ConfigurationImpl implements Configuration {
                 }
             }
         }
+
+        memberShardNames.put(memberName, shards);
+
         return shards;
 
     }
 
     @Override public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
+
+        Preconditions.checkNotNull(nameSpace, "nameSpace should not be null");
+
         for(Module m : modules){
             if(m.getNameSpace().equals(nameSpace)){
                 return Optional.of(m.getName());
@@ -98,6 +124,9 @@ public class ConfigurationImpl implements Configuration {
     }
 
     @Override public List<String> getShardNamesFromModuleName(String moduleName) {
+
+        Preconditions.checkNotNull(moduleName, "moduleName should not be null");
+
         for(ModuleShard m : moduleShards){
             if(m.getModuleName().equals(moduleName)){
                 List<String> l = new ArrayList<>();
@@ -112,14 +141,23 @@ public class ConfigurationImpl implements Configuration {
     }
 
     @Override public List<String> getMembersFromShardName(String shardName) {
-        List<String> shards = new ArrayList();
+
+        Preconditions.checkNotNull(shardName, "shardName should not be null");
+
+        if(shardReplicaNames.containsKey(shardName)){
+            return shardReplicaNames.get(shardName);
+        }
+
         for(ModuleShard ms : moduleShards){
             for(Shard s : ms.getShards()) {
                 if(s.getName().equals(shardName)){
-                    return s.getReplicas();
+                    List<String> replicas = s.getReplicas();
+                    shardReplicaNames.put(shardName, replicas);
+                    return replicas;
                 }
             }
         }
+        shardReplicaNames.put(shardName, Collections.EMPTY_LIST);
         return Collections.EMPTY_LIST;
     }
 
index cdf04dd..1dab285 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
 import akka.japi.Creator;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
@@ -27,9 +28,10 @@ public class DataChangeListener extends AbstractUntypedActor {
 
     public DataChangeListener(SchemaContext schemaContext,
                               AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, YangInstanceIdentifier pathId) {
-        this.listener = listener;
-        this.schemaContext = schemaContext;
-        this.pathId  = pathId;
+
+        this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+        this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
+        this.pathId  = Preconditions.checkNotNull(pathId, "pathId should not be null");
     }
 
     @Override public void handleReceive(Object message) throws Exception {
@@ -44,7 +46,7 @@ public class DataChangeListener extends AbstractUntypedActor {
         notificationsEnabled = message.isEnabled();
     }
 
-    public void dataChanged(Object message) {
+    private void dataChanged(Object message) {
 
         // Do nothing if notifications are not enabled
         if(!notificationsEnabled){
index a4ca456..6d83549 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -24,7 +25,7 @@ public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInst
     private final SchemaContext schemaContext;
 
     public DataChangeListenerProxy(SchemaContext schemaContext,ActorSelection dataChangeListenerActor) {
-        this.dataChangeListenerActor = dataChangeListenerActor;
+        this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor, "dataChangeListenerActor should not be null");
         this.schemaContext = schemaContext;
     }
 
index c433076..40e045f 100644 (file)
@@ -10,8 +10,10 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
@@ -50,7 +52,6 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
             "mdsal.dist-datastore-executor-queue.size";
     private static final int DEFAULT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
 
-    private final String type;
     private final ActorContext actorContext;
 
     private SchemaContext schemaContext;
@@ -72,14 +73,23 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
                                     DEFAULT_EXECUTOR_MAX_QUEUE_SIZE), "DistDataStore"));
 
     public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
-        this(new ActorContext(actorSystem, actorSystem
+        Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
+        Preconditions.checkNotNull(type, "type should not be null");
+        Preconditions.checkNotNull(cluster, "cluster should not be null");
+        Preconditions.checkNotNull(configuration, "configuration should not be null");
+
+
+        String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
+
+        LOG.info("Creating ShardManager : {}", shardManagerId);
+
+        this.actorContext = new ActorContext(actorSystem, actorSystem
             .actorOf(ShardManager.props(type, cluster, configuration),
-                "shardmanager-" + type), cluster, configuration), type);
+                shardManagerId ), cluster, configuration);
     }
 
-    public DistributedDataStore(ActorContext actorContext, String type) {
-        this.type = type;
-        this.actorContext = actorContext;
+    public DistributedDataStore(ActorContext actorContext) {
+        this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
     }
 
 
@@ -88,6 +98,12 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         YangInstanceIdentifier path, L listener,
         AsyncDataBroker.DataChangeScope scope) {
 
+        Preconditions.checkNotNull(path, "path should not be null");
+        Preconditions.checkNotNull(listener, "listener should not be null");
+
+
+        LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
+
         ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
             DataChangeListener.props(schemaContext,listener,path ));
 
index 21fea96..63b2633 100644 (file)
@@ -16,7 +16,10 @@ import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.serialization.Serialization;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -67,6 +70,7 @@ public class Shard extends RaftActor {
 
     public static final String DEFAULT_NAME = "default";
 
+    // The state of this Shard
     private final InMemoryDOMDataStore store;
 
     private final Map<Object, DOMStoreThreePhaseCommitCohort>
@@ -76,10 +80,11 @@ public class Shard extends RaftActor {
         Logging.getLogger(getContext().system(), this);
 
     // By default persistent will be true and can be turned off using the system
-    // property persistent
+    // property shard.persistent
     private final boolean persistent;
 
-    private final String name;
+    /// The name of this shard
+    private final ShardIdentifier name;
 
     private volatile SchemaContext schemaContext;
 
@@ -87,8 +92,8 @@ public class Shard extends RaftActor {
 
     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
 
-    private Shard(String name, Map<String, String> peerAddresses) {
-        super(name, peerAddresses, Optional.of(configParams));
+    private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses) {
+        super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
 
         this.name = name;
 
@@ -96,16 +101,32 @@ public class Shard extends RaftActor {
 
         this.persistent = !"false".equals(setting);
 
-        LOG.info("Creating shard : {} persistent : {}", name, persistent);
+        LOG.info("Shard created : {} persistent : {}", name, persistent);
 
-        store = InMemoryDOMDataStoreFactory.create(name, null);
+        store = InMemoryDOMDataStoreFactory.create(name.toString(), null);
 
-        shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
+        shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
 
     }
 
-    public static Props props(final String name,
-        final Map<String, String> peerAddresses) {
+    private static Map<String, String> mapPeerAddresses(Map<ShardIdentifier, String> peerAddresses){
+        Map<String , String> map = new HashMap<>();
+
+        for(Map.Entry<ShardIdentifier, String> entry : peerAddresses.entrySet()){
+            map.put(entry.getKey().toString(), entry.getValue());
+        }
+
+        return map;
+    }
+
+
+
+
+    public static Props props(final ShardIdentifier name,
+        final Map<ShardIdentifier, String> peerAddresses) {
+        Preconditions.checkNotNull(name, "name should not be null");
+        Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+
         return Props.create(new Creator<Shard>() {
 
             @Override
@@ -143,39 +164,46 @@ public class Shard extends RaftActor {
             }
         } else if (message instanceof PeerAddressResolved) {
             PeerAddressResolved resolved = (PeerAddressResolved) message;
-            setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
-        } else{
+            setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress());
+        } else {
             super.onReceiveCommand(message);
         }
     }
 
     private ActorRef createTypedTransactionActor(
-        CreateTransaction createTransaction, String transactionId) {
+        CreateTransaction createTransaction, ShardTransactionIdentifier transactionId) {
         if (createTransaction.getTransactionType()
             == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+
             shardMBean.incrementReadOnlyTransactionCount();
+
             return getContext().actorOf(
                 ShardTransaction
                     .props(store.newReadOnlyTransaction(), getSelf(),
-                        schemaContext), transactionId);
+                        schemaContext), transactionId.toString());
 
         } else if (createTransaction.getTransactionType()
             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
+
             shardMBean.incrementReadWriteTransactionCount();
+
             return getContext().actorOf(
                 ShardTransaction
                     .props(store.newReadWriteTransaction(), getSelf(),
-                        schemaContext), transactionId);
+                        schemaContext), transactionId.toString());
 
 
         } else if (createTransaction.getTransactionType()
             == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+
             shardMBean.incrementWriteOnlyTransactionCount();
+
             return getContext().actorOf(
                 ShardTransaction
                     .props(store.newWriteOnlyTransaction(), getSelf(),
-                        schemaContext), transactionId);
+                        schemaContext), transactionId.toString());
         } else {
+            // FIXME: This does not seem right
             throw new IllegalArgumentException(
                 "CreateTransaction message has unidentified transaction type="
                     + createTransaction.getTransactionType());
@@ -184,8 +212,8 @@ public class Shard extends RaftActor {
 
     private void createTransaction(CreateTransaction createTransaction) {
 
-        String transactionId = "shard-" + createTransaction.getTransactionId();
-        LOG.info("Creating transaction : {} ", transactionId);
+        ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder().remoteTransactionId(createTransaction.getTransactionId()).build();
+        LOG.debug("Creating transaction : {} ", transactionId);
         ActorRef transactionActor =
             createTypedTransactionActor(createTransaction, transactionId);
 
@@ -202,9 +230,9 @@ public class Shard extends RaftActor {
         DOMStoreThreePhaseCommitCohort cohort =
             modificationToCohort.remove(serialized);
         if (cohort == null) {
-            LOG.error(
-                "Could not find cohort for modification : {}", modification);
-            LOG.info("Writing modification using a new transaction");
+            LOG.debug(
+                "Could not find cohort for modification : {}. Writing modification using a new transaction",
+                modification);
             DOMStoreReadWriteTransaction transaction =
                 store.newReadWriteTransaction();
             modification.apply(transaction);
@@ -237,7 +265,6 @@ public class Shard extends RaftActor {
                                 self);
                         shardMBean.incrementCommittedTransactionCount();
                         shardMBean.setLastCommittedTransactionTime(new Date());
-
                 } catch (InterruptedException | ExecutionException e) {
                     shardMBean.incrementFailedTransactionsCount();
                     sender.tell(new akka.actor.Status.Failure(e),self);
@@ -269,7 +296,7 @@ public class Shard extends RaftActor {
     private void registerChangeListener(
         RegisterChangeListener registerChangeListener) {
 
-        LOG.debug("registerDataChangeListener for " + registerChangeListener
+        LOG.debug("registerDataChangeListener for {}", registerChangeListener
             .getPath());
 
 
@@ -301,8 +328,8 @@ public class Shard extends RaftActor {
                 DataChangeListenerRegistration.props(registration));
 
         LOG.debug(
-            "registerDataChangeListener sending reply, listenerRegistrationPath = "
-                + listenerRegistration.path().toString());
+            "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
+                , listenerRegistration.path().toString());
 
         getSender()
             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
@@ -330,7 +357,9 @@ public class Shard extends RaftActor {
             if (modification != null) {
                 commit(clientActor, modification);
             } else {
-                LOG.error("modification is null - this is very unexpected");
+                LOG.error(
+                    "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+                    identifier, clientActor.path().toString());
             }
 
 
@@ -338,6 +367,7 @@ public class Shard extends RaftActor {
             LOG.error("Unknown state received {}", data);
         }
 
+        // Update stats
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
 
         if(lastLogEntry != null){
@@ -373,7 +403,7 @@ public class Shard extends RaftActor {
     }
 
     @Override public String persistenceId() {
-        return this.name;
+        return this.name.toString();
     }
 
 
index 64c6821..6162a03 100644 (file)
@@ -18,6 +18,10 @@ import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
@@ -28,6 +32,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import scala.concurrent.duration.Duration;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +66,8 @@ public class ShardManager extends AbstractUntypedActor {
 
     private final Configuration configuration;
 
+    private ShardManagerInfoMBean mBean;
+
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
@@ -82,6 +89,11 @@ public class ShardManager extends AbstractUntypedActor {
     public static Props props(final String type,
         final ClusterWrapper cluster,
         final Configuration configuration) {
+
+        Preconditions.checkNotNull(type, "type should not be null");
+        Preconditions.checkNotNull(cluster, "cluster should not be null");
+        Preconditions.checkNotNull(configuration, "configuration should not be null");
+
         return Props.create(new Creator<ShardManager>() {
 
             @Override
@@ -108,7 +120,7 @@ public class ShardManager extends AbstractUntypedActor {
         } else if(message instanceof ClusterEvent.UnreachableMember) {
             ignoreMessage(message);
         } else{
-          throw new Exception ("Not recognized message received, message="+message);
+            unknownMessage(message);
         }
 
     }
@@ -122,11 +134,8 @@ public class ShardManager extends AbstractUntypedActor {
             return;
         }
 
-        getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
-    }
-
-    private void ignoreMessage(Object message){
-        LOG.debug("Unhandled message : " + message);
+        getSender().tell(new LocalShardNotFound(message.getShardName()),
+            getSelf());
     }
 
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
@@ -140,7 +149,7 @@ public class ShardManager extends AbstractUntypedActor {
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
-            info.updatePeerAddress(getShardActorName(memberName, shardName),
+            info.updatePeerAddress(getShardIdentifier(memberName, shardName),
                 getShardActorPath(shardName, memberName));
         }
     }
@@ -159,9 +168,6 @@ public class ShardManager extends AbstractUntypedActor {
     private void findPrimary(FindPrimary message) {
         String shardName = message.getShardName();
 
-        List<String> members =
-            configuration.getMembersFromShardName(shardName);
-
         // First see if the there is a local replica for the shard
         ShardInformation info = localShards.get(shardName);
         if(info != null) {
@@ -175,6 +181,9 @@ public class ShardManager extends AbstractUntypedActor {
             }
         }
 
+        List<String> members =
+            configuration.getMembersFromShardName(shardName);
+
         if(cluster.getCurrentMemberName() != null) {
             members.remove(cluster.getCurrentMemberName());
         }
@@ -196,9 +205,13 @@ public class ShardManager extends AbstractUntypedActor {
     private String getShardActorPath(String shardName, String memberName) {
         Address address = memberNameToAddress.get(memberName);
         if(address != null) {
-            return address.toString() + "/user/shardmanager-" + this.type + "/"
-                + getShardActorName(
-                memberName, shardName);
+            StringBuilder builder = new StringBuilder();
+            builder.append(address.toString())
+                .append("/user/")
+                .append(ShardManagerIdentifier.builder().type(type).build().toString())
+                .append("/")
+                .append(getShardIdentifier(memberName, shardName));
+            return builder.toString();
         }
         return null;
     }
@@ -211,8 +224,8 @@ public class ShardManager extends AbstractUntypedActor {
      * @param shardName
      * @return
      */
-    private String getShardActorName(String memberName, String shardName){
-        return memberName + "-shard-" + shardName + "-" + this.type;
+    private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+        return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
     }
 
     /**
@@ -225,15 +238,20 @@ public class ShardManager extends AbstractUntypedActor {
         List<String> memberShardNames =
             this.configuration.getMemberShardNames(memberName);
 
+        List<String> localShardActorNames = new ArrayList<>();
         for(String shardName : memberShardNames){
-            String shardActorName = getShardActorName(memberName, shardName);
-            Map<String, String> peerAddresses = getPeerAddresses(shardName);
+            ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+            Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardActorName, peerAddresses),
-                    shardActorName);
+                .actorOf(Shard.props(shardId, peerAddresses),
+                    shardId.toString());
+            localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
         }
 
+        mBean = ShardManagerInfo
+            .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
+
     }
 
     /**
@@ -242,9 +260,9 @@ public class ShardManager extends AbstractUntypedActor {
      * @param shardName
      * @return
      */
-    private Map<String, String> getPeerAddresses(String shardName){
+    private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
 
-        Map<String, String> peerAddresses = new HashMap<>();
+        Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
 
         List<String> members =
             this.configuration.getMembersFromShardName(shardName);
@@ -253,16 +271,16 @@ public class ShardManager extends AbstractUntypedActor {
 
         for(String memberName : members){
             if(!currentMemberName.equals(memberName)){
-                String shardActorName = getShardActorName(memberName, shardName);
+                ShardIdentifier shardId = getShardIdentifier(memberName,
+                    shardName);
                 String path =
                     getShardActorPath(shardName, currentMemberName);
-                peerAddresses.put(shardActorName, path);
+                peerAddresses.put(shardId, path);
             }
         }
         return peerAddresses;
     }
 
-
     @Override
     public SupervisorStrategy supervisorStrategy() {
         return new OneForOneStrategy(10, Duration.create("1 minute"),
@@ -280,10 +298,10 @@ public class ShardManager extends AbstractUntypedActor {
         private final String shardName;
         private final ActorRef actor;
         private final ActorPath actorPath;
-        private final Map<String, String> peerAddresses;
+        private final Map<ShardIdentifier, String> peerAddresses;
 
         private ShardInformation(String shardName, ActorRef actor,
-            Map<String, String> peerAddresses) {
+            Map<ShardIdentifier, String> peerAddresses) {
             this.shardName = shardName;
             this.actor = actor;
             this.actorPath = actor.path();
@@ -302,16 +320,15 @@ public class ShardManager extends AbstractUntypedActor {
             return actorPath;
         }
 
-        public Map<String, String> getPeerAddresses() {
-            return peerAddresses;
-        }
-
-        public void updatePeerAddress(String peerId, String peerAddress){
-            LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
+        public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+            LOG.info("updatePeerAddress for peer {} with address {}", peerId,
+                peerAddress);
             if(peerAddresses.containsKey(peerId)){
                 peerAddresses.put(peerId, peerAddress);
 
-                LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path());
+                LOG.debug(
+                    "Sending PeerAddressResolved for peer {} with address {} to {}",
+                    peerId, peerAddress, actor.path());
 
                 actor
                     .tell(new PeerAddressResolved(peerId, peerAddress),
@@ -321,3 +338,6 @@ public class ShardManager extends AbstractUntypedActor {
         }
     }
 }
+
+
+
index 312ec9a..1ffe5ca 100644 (file)
@@ -15,6 +15,7 @@ import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
@@ -89,7 +90,6 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
   protected ShardTransaction(DOMStoreTransactionChain transactionChain,
                           ActorRef shardActor, SchemaContext schemaContext) {
     this.transactionChain = transactionChain;
-    //this.transaction = transaction;
     this.shardActor = shardActor;
     this.schemaContext = schemaContext;
   }
@@ -173,7 +173,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
       getSender().tell(new GetCompositeModificationReply(
           new ImmutableCompositeModification(modification)), getSelf());
     }else{
-      throw new Exception ("ShardTransaction:handleRecieve received an unknown message"+message);
+         throw new UnknownMessageException(message);
     }
   }
 
@@ -232,6 +232,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
   }
 
   protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
+    LOG.debug("deleteData at path : " + message.getPath().toString());
     modification.addModification(new DeleteModification(message.getPath()));
     try {
         transaction.delete(message.getPath());
index ce63f11..c508255 100644 (file)
@@ -40,23 +40,27 @@ public class ShardTransactionChain extends AbstractUntypedActor {
             chain.close();
             getSender().tell(new CloseTransactionChainReply().toSerializable(), getSelf());
         }else{
-          throw new Exception("Not recognized message recieved="+message);
+            unknownMessage(message);
         }
     }
 
+    private ActorRef getShardActor(){
+        return getContext().parent();
+    }
+
   private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){
     if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){
       return getContext().actorOf(
-          ShardTransaction.props( chain.newReadOnlyTransaction(), getSelf(), schemaContext), transactionId);
+          ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(), schemaContext), transactionId);
 
     }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){
       return getContext().actorOf(
-          ShardTransaction.props( chain.newReadWriteTransaction(), getSelf(), schemaContext), transactionId);
+          ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(), schemaContext), transactionId);
 
 
     }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){
       return getContext().actorOf(
-          ShardTransaction.props( chain.newWriteOnlyTransaction(), getSelf(), schemaContext), transactionId);
+          ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(), schemaContext), transactionId);
     }else{
       throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ;
     }
index a8deb01..500b73c 100644 (file)
@@ -67,7 +67,7 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
             abort(new AbortTransaction());
         } else {
-          throw new Exception ("Not recognized message received,message="+message);
+            unknownMessage(message);
         }
     }
 
@@ -130,7 +130,7 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
                     Boolean canCommit = future.get();
                     sender.tell(new CanCommitTransactionReply(canCommit).toSerializable(), self);
                 } catch (InterruptedException | ExecutionException e) {
-                    log.error(e, "An exception happened when aborting");
+                    log.error(e, "An exception happened when checking canCommit");
                 }
             }
         }, getContext().dispatcher());
index 915b13d..5b44794 100644 (file)
@@ -59,17 +59,22 @@ public class ThreePhaseCommitCohortProxy implements
     }
 
     @Override public ListenableFuture<Boolean> canCommit() {
+        LOG.debug("txn {} canCommit", transactionId);
         Callable<Boolean> call = new Callable<Boolean>() {
 
             @Override
             public Boolean call() throws Exception {
                 for(ActorPath actorPath : cohortPaths){
+
+                    Object message = new CanCommitTransaction().toSerializable();
+                    LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
                     ActorSelection cohort = actorContext.actorSelection(actorPath);
 
                     try {
                         Object response =
                                 actorContext.executeRemoteOperation(cohort,
-                                        new CanCommitTransaction().toSerializable(),
+                                        message,
                                         ActorContext.ASK_DURATION);
 
                         if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
@@ -80,6 +85,7 @@ public class ThreePhaseCommitCohortProxy implements
                             }
                         }
                     } catch(RuntimeException e){
+                        // FIXME : Need to properly handle this
                         LOG.error("Unexpected Exception", e);
                         return false;
                     }
@@ -93,14 +99,17 @@ public class ThreePhaseCommitCohortProxy implements
     }
 
     @Override public ListenableFuture<Void> preCommit() {
+        LOG.debug("txn {} preCommit", transactionId);
         return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
     }
 
     @Override public ListenableFuture<Void> abort() {
+        LOG.debug("txn {} abort", transactionId);
         return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
     }
 
     @Override public ListenableFuture<Void> commit() {
+        LOG.debug("txn {} commit", transactionId);
         return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
     }
 
@@ -111,6 +120,8 @@ public class ThreePhaseCommitCohortProxy implements
                 for(ActorPath actorPath : cohortPaths){
                     ActorSelection cohort = actorContext.actorSelection(actorPath);
 
+                    LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
+
                     try {
                         Object response =
                             actorContext.executeRemoteOperation(cohort,
index fa98905..5f9f1f8 100644 (file)
@@ -13,11 +13,13 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
@@ -75,7 +77,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final TransactionType transactionType;
     private final ActorContext actorContext;
     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
-    private final String identifier;
+    private final TransactionIdentifier identifier;
     private final ListeningExecutorService executor;
     private final SchemaContext schemaContext;
 
@@ -85,13 +87,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         ListeningExecutorService executor,
         SchemaContext schemaContext
     ) {
+        this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+        this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
+        this.executor = Preconditions.checkNotNull(executor, "executor should not be null");
+        this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+
+        String memberName = actorContext.getCurrentMemberName();
+        if(memberName == null){
+            memberName = "UNKNOWN-MEMBER";
+        }
+        this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build();
 
-        this.identifier = actorContext.getCurrentMemberName() + "-txn-" + counter.getAndIncrement();
-        this.transactionType = transactionType;
-        this.actorContext = actorContext;
-        this.executor = executor;
-        this.schemaContext = schemaContext;
-
+        LOG.debug("Created txn {}", identifier);
 
     }
 
@@ -99,6 +106,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
             final YangInstanceIdentifier path) {
 
+        LOG.debug("txn {} read {}", identifier, path);
+
         createTransactionIfMissing(actorContext, path);
 
         return transactionContext(path).readData(path);
@@ -107,6 +116,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     @Override
     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
 
+        LOG.debug("txn {} write {}", identifier, path);
+
         createTransactionIfMissing(actorContext, path);
 
         transactionContext(path).writeData(path, data);
@@ -115,6 +126,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     @Override
     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
 
+        LOG.debug("txn {} merge {}", identifier, path);
+
         createTransactionIfMissing(actorContext, path);
 
         transactionContext(path).mergeData(path, data);
@@ -123,6 +136,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     @Override
     public void delete(YangInstanceIdentifier path) {
 
+        LOG.debug("txn {} delete {}", identifier, path);
+
         createTransactionIfMissing(actorContext, path);
 
         transactionContext(path).deleteData(path);
@@ -132,7 +147,12 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     public DOMStoreThreePhaseCommitCohort ready() {
         List<ActorPath> cohortPaths = new ArrayList<>();
 
+        LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size());
+
         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+
+            LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName());
+
             Object result = transactionContext.readyTransaction();
 
             if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
@@ -143,7 +163,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor);
     }
 
     @Override
@@ -180,7 +200,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         try {
             Object response = actorContext.executeShardOperation(shardName,
-                new CreateTransaction(identifier,this.transactionType.ordinal() ).toSerializable(),
+                new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
                 ActorContext.ASK_DURATION);
             if (response.getClass()
                 .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
@@ -189,7 +209,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 String transactionPath = reply.getTransactionPath();
 
-                LOG.info("Received transaction path = {}"  , transactionPath );
+                LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
 
                 ActorSelection transactionActor =
                     actorContext.actorSelection(transactionPath);
@@ -200,7 +220,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 remoteTransactionPaths.put(shardName, transactionContext);
             }
         } catch(TimeoutException | PrimaryNotFoundException e){
-            LOG.error("Creating NoOpTransaction because of : {}", e.getMessage());
+            LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
             remoteTransactionPaths.put(shardName,
                 new NoOpTransactionContext(shardName));
         }
@@ -324,35 +344,35 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         @Override public void closeTransaction() {
-            LOG.error("closeTransaction called");
+            LOG.warn("txn {} closeTransaction called", identifier);
         }
 
         @Override public Object readyTransaction() {
-            LOG.error("readyTransaction called");
+            LOG.warn("txn {} readyTransaction called", identifier);
             cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
             return new ReadyTransactionReply(cohort.path()).toSerializable();
         }
 
         @Override public void deleteData(YangInstanceIdentifier path) {
-            LOG.error("deleteData called path = {}", path);
+            LOG.warn("txt {} deleteData called path = {}", identifier, path);
         }
 
         @Override public void mergeData(YangInstanceIdentifier path,
             NormalizedNode<?, ?> data) {
-            LOG.error("mergeData called path = {}", path);
+            LOG.warn("txn {} mergeData called path = {}", identifier, path);
         }
 
         @Override
         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
             YangInstanceIdentifier path) {
-            LOG.error("readData called path = {}", path);
+            LOG.warn("txn {} readData called path = {}", identifier, path);
             return Futures.immediateCheckedFuture(
                 Optional.<NormalizedNode<?, ?>>absent());
         }
 
         @Override public void writeData(YangInstanceIdentifier path,
             NormalizedNode<?, ?> data) {
-            LOG.error("writeData called path = {}", path);
+            LOG.warn("txn {} writeData called path = {}", identifier, path);
         }
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/UnknownMessageException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/exceptions/UnknownMessageException.java
new file mode 100644 (file)
index 0000000..f4f2524
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.exceptions;
+
+public class UnknownMessageException extends Exception {
+    private final Object message;
+
+    public UnknownMessageException(Object message) {
+        this.message = message;
+    }
+
+    @Override public String getMessage() {
+        return "Unknown message received " + " - " + message;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifier.java
new file mode 100644 (file)
index 0000000..c692881
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+import com.google.common.base.Preconditions;
+
+public class ShardIdentifier {
+    private final String shardName;
+    private final String memberName;
+    private final String type;
+
+
+    public ShardIdentifier(String shardName, String memberName, String type) {
+
+        Preconditions.checkNotNull(shardName, "shardName should not be null");
+        Preconditions.checkNotNull(memberName, "memberName should not be null");
+        Preconditions.checkNotNull(type, "type should not be null");
+
+        this.shardName = shardName;
+        this.memberName = memberName;
+        this.type = type;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ShardIdentifier that = (ShardIdentifier) o;
+
+        if (!memberName.equals(that.memberName)) {
+            return false;
+        }
+        if (!shardName.equals(that.shardName)) {
+            return false;
+        }
+        if (!type.equals(that.type)) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = shardName.hashCode();
+        result = 31 * result + memberName.hashCode();
+        result = 31 * result + type.hashCode();
+        return result;
+    }
+
+    @Override public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append(memberName).append("-shard-").append(shardName).append("-").append(type);
+        return builder.toString();
+    }
+
+    public static Builder builder(){
+        return new Builder();
+    }
+
+    public static class Builder {
+        private String shardName;
+        private String memberName;
+        private String type;
+
+        public ShardIdentifier build(){
+            return new ShardIdentifier(shardName, memberName, type);
+        }
+
+        public Builder shardName(String shardName){
+            this.shardName = shardName;
+            return this;
+        }
+
+        public Builder memberName(String memberName){
+            this.memberName = memberName;
+            return this;
+        }
+
+        public Builder type(String type){
+            this.type = type;
+            return this;
+        }
+
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardManagerIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardManagerIdentifier.java
new file mode 100644 (file)
index 0000000..65bf010
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+public class ShardManagerIdentifier {
+    private final String type;
+
+    public ShardManagerIdentifier(String type) {
+        this.type = type;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ShardManagerIdentifier that = (ShardManagerIdentifier) o;
+
+        if (!type.equals(that.type)) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return type.hashCode();
+    }
+
+    @Override public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("shardmanager-").append(type);
+        return builder.toString();
+    }
+
+    public static Builder builder(){
+        return new Builder();
+    }
+
+    public static class Builder {
+        private String type;
+
+        public Builder type(String type){
+            this.type = type;
+            return this;
+        }
+
+        public ShardManagerIdentifier build(){
+            return new ShardManagerIdentifier(this.type);
+        }
+
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardTransactionIdentifier.java
new file mode 100644 (file)
index 0000000..77e8142
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+import com.google.common.base.Preconditions;
+
+public class ShardTransactionIdentifier {
+    private final String remoteTransactionId;
+
+    public ShardTransactionIdentifier(String remoteTransactionId) {
+        this.remoteTransactionId = Preconditions.checkNotNull(remoteTransactionId, "remoteTransactionId should not be null");
+    }
+
+    public static Builder builder(){
+        return new Builder();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ShardTransactionIdentifier that = (ShardTransactionIdentifier) o;
+
+        if (!remoteTransactionId.equals(that.remoteTransactionId)) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return remoteTransactionId.hashCode();
+    }
+
+    @Override public String toString() {
+        final StringBuilder sb =
+            new StringBuilder();
+        sb.append("shard-").append(remoteTransactionId);
+        return sb.toString();
+    }
+
+    public static class Builder {
+        private String remoteTransactionId;
+
+        public Builder remoteTransactionId(String remoteTransactionId){
+            this.remoteTransactionId = remoteTransactionId;
+            return this;
+        }
+
+        public ShardTransactionIdentifier build(){
+            return new ShardTransactionIdentifier(remoteTransactionId);
+        }
+
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/TransactionIdentifier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/identifiers/TransactionIdentifier.java
new file mode 100644 (file)
index 0000000..ba2e27c
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+import com.google.common.base.Preconditions;
+
+public class TransactionIdentifier {
+    private final String memberName;
+    private final long counter;
+
+
+    public TransactionIdentifier(String memberName, long counter) {
+        this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null");
+        this.counter = counter;
+    }
+
+    public static Builder builder(){
+        return new Builder();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        TransactionIdentifier that = (TransactionIdentifier) o;
+
+        if (counter != that.counter) {
+            return false;
+        }
+        if (!memberName.equals(that.memberName)) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = memberName.hashCode();
+        result = 31 * result + (int) (counter ^ (counter >>> 32));
+        return result;
+    }
+
+    @Override public String toString() {
+        final StringBuilder sb =
+            new StringBuilder();
+        sb.append(memberName).append("-txn-").append(counter);
+        return sb.toString();
+    }
+
+    public static class Builder {
+        private String memberName;
+        private long counter;
+
+        public TransactionIdentifier build(){
+            return new TransactionIdentifier(memberName, counter);
+        }
+
+        public Builder memberName(String memberName){
+            this.memberName = memberName;
+            return this;
+        }
+
+        public Builder counter(long counter){
+            this.counter = counter;
+            return this;
+        }
+    }
+}
index de1ac18..a5d7b77 100644 (file)
@@ -34,6 +34,7 @@ public abstract class AbstractBaseMBean {
   public static String BASE_JMX_PREFIX = "org.opendaylight.controller:";
   public static String JMX_TYPE_DISTRIBUTED_DATASTORE = "DistributedDatastore";
   public static String JMX_CATEGORY_SHARD = "Shard";
+  public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
 
   private static final Logger LOG = LoggerFactory
       .getLogger(AbstractBaseMBean.class);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java
new file mode 100644 (file)
index 0000000..0c609b4
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager;
+
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+
+import java.util.List;
+
+public class ShardManagerInfo extends AbstractBaseMBean implements
+    ShardManagerInfoMBean {
+
+    private final String name;
+    private final List<String> localShards;
+
+    public ShardManagerInfo(String name, List<String> localShards) {
+        this.name = name;
+        this.localShards = localShards;
+    }
+
+
+    @Override protected String getMBeanName() {
+        return name;
+    }
+
+    @Override protected String getMBeanType() {
+        return JMX_TYPE_DISTRIBUTED_DATASTORE;
+    }
+
+    @Override protected String getMBeanCategory() {
+        return JMX_CATEGORY_SHARD_MANAGER;
+    }
+
+    public static ShardManagerInfo createShardManagerMBean(String name, List<String> localShards){
+        ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name,
+            localShards);
+
+        shardManagerInfo.registerMBean();
+
+        return shardManagerInfo;
+    }
+
+    @Override public List<String> getLocalShards() {
+        return localShards;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfoMBean.java
new file mode 100644 (file)
index 0000000..28ccc4f
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager;
+
+import java.util.List;
+
+public interface ShardManagerInfoMBean {
+    List<String> getLocalShards();
+}
index 8c2543e..346519e 100644 (file)
@@ -8,16 +8,18 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+
 public class PeerAddressResolved {
-    private final String peerId;
+    private final ShardIdentifier peerId;
     private final String peerAddress;
 
-    public PeerAddressResolved(String peerId, String peerAddress) {
+    public PeerAddressResolved(ShardIdentifier peerId, String peerAddress) {
         this.peerId = peerId;
         this.peerAddress = peerAddress;
     }
 
-    public String getPeerId() {
+    public ShardIdentifier getPeerId() {
         return peerId;
     }
 
index 6178f49..8af9bd0 100644 (file)
@@ -1,6 +1,7 @@
 
 odl-cluster-data {
   akka {
+    loggers = ["akka.event.slf4j.Slf4jLogger"]
     cluster {
         roles = [
           "member-1"
@@ -39,6 +40,7 @@ odl-cluster-data {
 
 odl-cluster-rpc {
   akka {
+    loggers = ["akka.event.slf4j.Slf4jLogger"]
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
 
index 6599bd8..319451f 100644 (file)
@@ -14,8 +14,8 @@ import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
@@ -37,6 +37,8 @@ import scala.concurrent.duration.FiniteDuration;
 import java.util.Collections;
 
 import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 
 public class BasicIntegrationTest extends AbstractActorTest {
 
@@ -52,7 +54,11 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
 
         new JavaTestKit(getSystem()) {{
-            final Props props = Shard.props("config", Collections.EMPTY_MAP);
+            final ShardIdentifier identifier =
+                ShardIdentifier.builder().memberName("member-1")
+                    .shardName("inventory").type("config").build();
+
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
             final ActorRef shard = getSystem().actorOf(props);
 
             new Within(duration("5 seconds")) {
@@ -95,7 +101,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                             }
                         }.get(); // this extracts the received message
 
-                    Assert.assertNotNull(transactionChain);
+                    assertNotNull(transactionChain);
 
                     System.out.println("Successfully created transaction chain");
 
@@ -116,7 +122,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                             }
                         }.get(); // this extracts the received message
 
-                    Assert.assertNotNull(transaction);
+                    assertNotNull(transaction);
 
                     System.out.println("Successfully created transaction");
 
@@ -135,7 +141,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                         }
                     }.get(); // this extracts the received message
 
-                    Assert.assertTrue(writeDone);
+                    assertTrue(writeDone);
 
                     System.out.println("Successfully wrote data");
 
@@ -158,7 +164,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                             }
                         }.get(); // this extracts the received message
 
-                    Assert.assertNotNull(cohort);
+                    assertNotNull(cohort);
 
                     System.out.println("Successfully readied the transaction");
 
@@ -177,7 +183,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
                             }
                         }.get(); // this extracts the received message
 
-                    Assert.assertTrue(preCommitDone);
+                    assertTrue(preCommitDone);
 
                     System.out.println("Successfully pre-committed the transaction");
 
index 56fd3c5..1732961 100644 (file)
@@ -8,6 +8,8 @@ import org.junit.Test;
 import java.io.File;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class ConfigurationImplTest {
@@ -31,6 +33,49 @@ public class ConfigurationImplTest {
 
         assertTrue(memberShardNames.contains("people-1"));
         assertTrue(memberShardNames.contains("cars-1"));
+
+        // Retrieve once again to hit cache
+
+        memberShardNames =
+            configuration.getMemberShardNames("member-1");
+
+        assertTrue(memberShardNames.contains("people-1"));
+        assertTrue(memberShardNames.contains("cars-1"));
+
+    }
+
+    @Test
+    public void testGetMembersFromShardName(){
+        List<String> members =
+            configuration.getMembersFromShardName("default");
+
+        assertEquals(3, members.size());
+
+        assertTrue(members.contains("member-1"));
+        assertTrue(members.contains("member-2"));
+        assertTrue(members.contains("member-3"));
+
+        assertFalse(members.contains("member-26"));
+
+        // Retrieve once again to hit cache
+        members =
+            configuration.getMembersFromShardName("default");
+
+        assertEquals(3, members.size());
+
+        assertTrue(members.contains("member-1"));
+        assertTrue(members.contains("member-2"));
+        assertTrue(members.contains("member-3"));
+
+        assertFalse(members.contains("member-26"));
+
+
+        // Try to find a shard which is not present
+
+        members =
+            configuration.getMembersFromShardName("foobar");
+
+        assertEquals(0, members.size());
     }
 
     @Test
index d1beab9..406f0ff 100644 (file)
@@ -1,8 +1,11 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
-import junit.framework.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
@@ -21,13 +24,20 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
 public class DistributedDataStoreTest extends AbstractActorTest{
 
     private DistributedDataStore distributedDataStore;
     private MockActorContext mockActorContext;
     private ActorRef doNothingActorRef;
 
-    @org.junit.Before
+    @Before
     public void setUp() throws Exception {
         ShardStrategyFactory.setConfiguration(new MockConfiguration());
         final Props props = Props.create(DoNothingActor.class);
@@ -35,7 +45,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{
         doNothingActorRef = getSystem().actorOf(props);
 
         mockActorContext = new MockActorContext(getSystem(), doNothingActorRef);
-        distributedDataStore = new DistributedDataStore(mockActorContext, "config");
+        distributedDataStore = new DistributedDataStore(mockActorContext);
         distributedDataStore.onGlobalContextUpdated(
             TestModel.createTestContext());
 
@@ -48,12 +58,22 @@ public class DistributedDataStoreTest extends AbstractActorTest{
                 .build());
     }
 
-    @org.junit.After
+    @After
     public void tearDown() throws Exception {
 
     }
 
-    @org.junit.Test
+    @Test
+    public void testConstructor(){
+        ActorSystem actorSystem = mock(ActorSystem.class);
+
+        new DistributedDataStore(actorSystem, "config",
+            mock(ClusterWrapper.class), mock(Configuration.class));
+
+        verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
+    }
+
+    @Test
     public void testRegisterChangeListenerWhenShardIsNotLocal() throws Exception {
 
         ListenerRegistration registration =
@@ -65,12 +85,12 @@ public class DistributedDataStoreTest extends AbstractActorTest{
         }, AsyncDataBroker.DataChangeScope.BASE);
 
         // Since we do not expect the shard to be local registration will return a NoOpRegistration
-        Assert.assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
+        assertTrue(registration instanceof NoOpDataChangeListenerRegistration);
 
-        Assert.assertNotNull(registration);
+        assertNotNull(registration);
     }
 
-    @org.junit.Test
+    @Test
     public void testRegisterChangeListenerWhenShardIsLocal() throws Exception {
 
         mockActorContext.setExecuteLocalShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()));
@@ -83,33 +103,33 @@ public class DistributedDataStoreTest extends AbstractActorTest{
                 }
             }, AsyncDataBroker.DataChangeScope.BASE);
 
-        Assert.assertTrue(registration instanceof DataChangeListenerRegistrationProxy);
+        assertTrue(registration instanceof DataChangeListenerRegistrationProxy);
 
-        Assert.assertNotNull(registration);
+        assertNotNull(registration);
     }
 
 
-    @org.junit.Test
+    @Test
     public void testCreateTransactionChain() throws Exception {
         final DOMStoreTransactionChain transactionChain = distributedDataStore.createTransactionChain();
-        Assert.assertNotNull(transactionChain);
+        assertNotNull(transactionChain);
     }
 
-    @org.junit.Test
+    @Test
     public void testNewReadOnlyTransaction() throws Exception {
         final DOMStoreReadTransaction transaction = distributedDataStore.newReadOnlyTransaction();
-        Assert.assertNotNull(transaction);
+        assertNotNull(transaction);
     }
 
-    @org.junit.Test
+    @Test
     public void testNewWriteOnlyTransaction() throws Exception {
         final DOMStoreWriteTransaction transaction = distributedDataStore.newWriteOnlyTransaction();
-        Assert.assertNotNull(transaction);
+        assertNotNull(transaction);
     }
 
-    @org.junit.Test
+    @Test
     public void testNewReadWriteTransaction() throws Exception {
         final DOMStoreReadWriteTransaction transaction = distributedDataStore.newReadWriteTransaction();
-        Assert.assertNotNull(transaction);
+        assertNotNull(transaction);
     }
 }
index 431a266..0d86ffb 100644 (file)
@@ -6,6 +6,7 @@ import akka.event.Logging;
 import akka.testkit.JavaTestKit;
 import junit.framework.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
@@ -35,7 +36,11 @@ public class ShardTest extends AbstractActorTest {
     @Test
     public void testOnReceiveCreateTransactionChain() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final Props props = Shard.props("config", Collections.EMPTY_MAP);
+            final ShardIdentifier identifier =
+                ShardIdentifier.builder().memberName("member-1")
+                    .shardName("inventory").type("config").build();
+
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
             final ActorRef subject =
                 getSystem().actorOf(props, "testCreateTransactionChain");
 
@@ -87,7 +92,11 @@ public class ShardTest extends AbstractActorTest {
     @Test
     public void testOnReceiveRegisterListener() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final Props props = Shard.props("config", Collections.EMPTY_MAP);
+            final ShardIdentifier identifier =
+                ShardIdentifier.builder().memberName("member-1")
+                    .shardName("inventory").type("config").build();
+
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
             final ActorRef subject =
                 getSystem().actorOf(props, "testRegisterChangeListener");
 
@@ -141,7 +150,11 @@ public class ShardTest extends AbstractActorTest {
     @Test
     public void testCreateTransaction(){
         new JavaTestKit(getSystem()) {{
-            final Props props = Shard.props("config", Collections.EMPTY_MAP);
+            final ShardIdentifier identifier =
+                ShardIdentifier.builder().memberName("member-1")
+                    .shardName("inventory").type("config").build();
+
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP);
             final ActorRef subject =
                 getSystem().actorOf(props, "testCreateTransaction");
 
@@ -196,9 +209,14 @@ public class ShardTest extends AbstractActorTest {
     @Test
     public void testPeerAddressResolved(){
         new JavaTestKit(getSystem()) {{
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put("member-2", null);
-            final Props props = Shard.props("config", peerAddresses);
+            Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+
+            final ShardIdentifier identifier =
+                ShardIdentifier.builder().memberName("member-1")
+                    .shardName("inventory").type("config").build();
+
+            peerAddresses.put(identifier, null);
+            final Props props = Shard.props(identifier, peerAddresses);
             final ActorRef subject =
                 getSystem().actorOf(props, "testPeerAddressResolved");
 
@@ -206,7 +224,7 @@ public class ShardTest extends AbstractActorTest {
                 protected void run() {
 
                     subject.tell(
-                        new PeerAddressResolved("member-2", "akka://foobar"),
+                        new PeerAddressResolved(identifier, "akka://foobar"),
                         getRef());
 
                     expectNoMsg();
index 86016a6..0f9e771 100644 (file)
@@ -15,9 +15,8 @@ import akka.actor.Props;
 import akka.testkit.TestActorRef;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Assert;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
@@ -30,7 +29,6 @@ import scala.concurrent.duration.Duration;
 
 import java.util.Collections;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -48,45 +46,21 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
     private static final SchemaContext testSchemaContext =
         TestModel.createTestContext();
 
+    private static final ShardIdentifier SHARD_IDENTIFIER =
+        ShardIdentifier.builder().memberName("member-1")
+            .shardName("inventory").type("config").build();
+
     static {
         store.onGlobalContextUpdated(testSchemaContext);
     }
 
 
-    @Test
-    public void testNegativePerformingWriteOperationOnReadTransaction()
-        throws Exception {
-        try {
-
-            final ActorRef
-                shard = getSystem()
-                .actorOf(Shard.props("config", Collections.EMPTY_MAP));
-            final Props props =
-                ShardTransaction
-                    .props(store.newReadOnlyTransaction(), shard, TestModel
-                        .createTestContext());
-            final TestActorRef subject = TestActorRef.apply(props, getSystem());
-
-            subject
-                .receive(new DeleteData(TestModel.TEST_PATH).toSerializable(),
-                    ActorRef.noSender());
-            Assert.assertFalse(true);
-
-
-        } catch (Exception cs) {
-            assertEquals(cs.getClass().getSimpleName(),
-                Exception.class.getSimpleName());
-            assertTrue(cs.getMessage().startsWith(
-                "ShardTransaction:handleRecieve received an unknown message"));
-        }
-    }
-
     @Test(expected = ReadFailedException.class)
     public void testNegativeReadWithReadOnlyTransactionClosed()
         throws Throwable {
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
         final Props props =
             ShardTransaction.props(store.newReadOnlyTransaction(), shard,
                 TestModel.createTestContext());
@@ -121,7 +95,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         throws Throwable {
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
         final Props props =
             ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 TestModel.createTestContext());
@@ -150,13 +124,12 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
     }
 
-
     @Test(expected = IllegalStateException.class)
     public void testNegativeWriteWithTransactionReady() throws Exception {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
         final Props props =
             ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
                 TestModel.createTestContext());
@@ -195,7 +168,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
         final Props props =
             ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 TestModel.createTestContext());
@@ -233,7 +206,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
         final Props props =
             ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 TestModel.createTestContext());
@@ -271,7 +244,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
         final Props props =
             ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 TestModel.createTestContext());
@@ -300,4 +273,6 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
     }
+
+
 }
index 6fe5154..a2273f6 100644 (file)
@@ -9,6 +9,8 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
@@ -46,6 +48,11 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
 
+    private static final ShardIdentifier SHARD_IDENTIFIER =
+        ShardIdentifier.builder().memberName("member-1")
+            .shardName("inventory").type("config").build();
+
+
     static {
         store.onGlobalContextUpdated(testSchemaContext);
     }
@@ -53,7 +60,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
             final Props props =
                 ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
             final ActorRef subject = getSystem().actorOf(props, "testReadData");
@@ -95,7 +102,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
             final Props props =
                 ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext);
             final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
@@ -173,7 +180,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveWriteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
             final Props props =
                 ShardTransaction.props(store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
             final ActorRef subject =
@@ -213,7 +220,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveMergeData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
             final Props props =
                 ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext);
             final ActorRef subject =
@@ -254,7 +261,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDeleteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
             final Props props =
                 ShardTransaction.props( store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
             final ActorRef subject =
@@ -293,7 +300,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
             final Props props =
                 ShardTransaction.props( store.newReadWriteTransaction(), shard, TestModel.createTestContext());
             final ActorRef subject =
@@ -331,7 +338,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
             final Props props =
                 ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext());
             final ActorRef subject =
@@ -386,7 +393,7 @@ public class ShardTransactionTest extends AbstractActorTest {
   public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
     try {
 
-        final ActorRef shard = getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP));
         final Props props =
             ShardTransaction.props(store.newReadOnlyTransaction(), shard, TestModel.createTestContext());
          final TestActorRef subject = TestActorRef.apply(props,getSystem());
@@ -396,8 +403,8 @@ public class ShardTransactionTest extends AbstractActorTest {
 
 
     } catch (Exception cs) {
-      assertEquals(cs.getClass().getSimpleName(), Exception.class.getSimpleName());
-      assertTrue(cs.getMessage().startsWith("ShardTransaction:handleRecieve received an unknown message"));
+      assertEquals(UnknownMessageException.class.getSimpleName(), cs.getClass().getSimpleName());
+      assertTrue(cs.getMessage(), cs.getMessage().startsWith("Unknown message received "));
     }
   }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardIdentifierTest.java
new file mode 100644 (file)
index 0000000..afcd045
--- /dev/null
@@ -0,0 +1,18 @@
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ShardIdentifierTest {
+
+    @Test
+    public void testBasic(){
+        ShardIdentifier id = ShardIdentifier.builder().memberName("member-1")
+            .shardName("inventory").type("config").build();
+
+        assertEquals("member-1-shard-inventory-config", id.toString());
+    }
+
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardManagerIdentifierTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/identifiers/ShardManagerIdentifierTest.java
new file mode 100644 (file)
index 0000000..44bb4b3
--- /dev/null
@@ -0,0 +1,14 @@
+package org.opendaylight.controller.cluster.datastore.identifiers;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+    public class ShardManagerIdentifierTest {
+
+    @Test
+    public void testIdentifier(){
+        assertEquals("shardmanager-operational", ShardManagerIdentifier.builder().type("operational").build().toString());
+    }
+
+}
index eda1c30..27b0374 100644 (file)
@@ -1,5 +1,6 @@
 akka {
-    loggers = [akka.testkit.TestEventListener]
+    loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
+
     actor {
          serializers {
                   java = "akka.serialization.JavaSerializer"

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.