Merge "Bug 1446: Add JMX stats for clustered data store"
authorMoiz Raja <moraja@cisco.com>
Fri, 5 Sep 2014 02:42:24 +0000 (02:42 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 5 Sep 2014 02:42:24 +0000 (02:42 +0000)
1  2 
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java

@@@ -130,12 -117,10 +129,14 @@@ public class Shard extends RaftActor 
          store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                  datastoreContext.getDataStoreProperties());
  
-         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
 +        if(schemaContext != null) {
 +            store.onGlobalContextUpdated(schemaContext);
 +        }
 +
+         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
+                 datastoreContext.getDataStoreMXBeanType());
+         shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
+         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
  
      }
  
  
              return getContext().actorOf(
                  ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
-                         schemaContext,datastoreContext, name.toString()), transactionId.toString());
+                         schemaContext,datastoreContext, shardMBean), transactionId.toString());
  
 -        } else if (createTransaction.getTransactionType()
 +        } else if (transactionType
              == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
  
              shardMBean.incrementReadWriteTransactionCount();
  
              return getContext().actorOf(
                  ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
-                         schemaContext, datastoreContext,name.toString()), transactionId.toString());
+                         schemaContext, datastoreContext, shardMBean), transactionId.toString());
  
  
 -        } else if (createTransaction.getTransactionType()
 +        } else if (transactionType
              == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
  
              shardMBean.incrementWriteOnlyTransactionCount();
      private void createTransactionChain() {
          DOMStoreTransactionChain chain = store.createTransactionChain();
          ActorRef transactionChain = getContext().actorOf(
-                 ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() ));
+                 ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
          getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
 -                getSelf());
 +            getSelf());
      }
  
      @Override protected void applyState(ActorRef clientActor, String identifier,
@@@ -72,9 -71,10 +75,9 @@@ public class ShardTransactionFailureTes
      public void testNegativeReadWithReadOnlyTransactionClosed()
          throws Throwable {
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props,
      public void testNegativeReadWithReadWriteTransactionClosed()
          throws Throwable {
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props,
      public void testNegativeExistsWithReadWriteTransactionClosed()
          throws Throwable {
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props,
      public void testNegativeWriteWithTransactionReady() throws Exception {
  
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props,
      public void testNegativeReadWriteWithTransactionReady() throws Exception {
  
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props,
      public void testNegativeMergeTransactionReady() throws Exception {
  
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props, "testNegativeMergeTransactionReady");
      public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
  
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props,
@@@ -75,9 -73,10 +78,9 @@@ public class ShardTransactionTest exten
      @Test
      public void testOnReceiveReadData() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject = getSystem().actorOf(props, "testReadData");
  
              new Within(duration("1 seconds")) {
      @Test
      public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
  
              new Within(duration("1 seconds")) {
      @Test
      public void testOnReceiveDataExistsPositive() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
  
              new Within(duration("1 seconds")) {
      @Test
      public void testOnReceiveDataExistsNegative() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
  
              new Within(duration("1 seconds")) {
      @Test
      public void testOnReceiveWriteData() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject =
                  getSystem().actorOf(props, "testWriteData");
  
      @Test
      public void testOnReceiveMergeData() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject =
                  getSystem().actorOf(props, "testMergeData");
  
      @Test
      public void testOnReceiveDeleteData() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject =
                  getSystem().actorOf(props, "testDeleteData");
  
      @Test
      public void testOnReceiveReadyTransaction() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject =
                  getSystem().actorOf(props, "testReadyTransaction");
  
      @Test
      public void testOnReceiveCloseTransaction() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject =
                  getSystem().actorOf(props, "testCloseTransaction");
  
  
      @Test(expected=UnknownMessageException.class)
      public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
 -        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
          final TestActorRef subject = TestActorRef.apply(props,getSystem());
  
          subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
      @Test
      public void testShardTransactionInactivity() {
  
-         datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.getDefault(),
-                 Duration.create(500, TimeUnit.MILLISECONDS));
+         datastoreContext = new DatastoreContext("Test",
+                 InMemoryDOMDataStoreConfigProperties.getDefault(),
+                 Duration.create(500, TimeUnit.MILLISECONDS), 5);
  
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject =
                  getSystem().actorOf(props, "testShardTransactionInactivity");