Bug 1446: Add JMX stats for clustered data store 42/10342/5
authortpantelis <tpanteli@brocade.com>
Wed, 27 Aug 2014 05:24:26 +0000 (01:24 -0400)
committertpantelis <tpanteli@brocade.com>
Thu, 28 Aug 2014 01:52:04 +0000 (21:52 -0400)
  Added stats, available via JMX, to track data for the various
  thread pool executors used by the clustered data store.

Change-Id: Ifb8996075253e87b7fd4e7cb6cf876c3af80ff2a
Signed-off-by: tpantelis <tpanteli@brocade.com>
28 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/AbstractBaseMBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardMBeanFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shardmanager/ShardManagerInfo.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsTest.java

index af8a987..1021dde 100644 (file)
@@ -9,13 +9,15 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+
 import scala.concurrent.duration.Duration;
 
 import java.util.concurrent.TimeUnit;
 
 /**
- * Contains contextual data for shards.
+ * Contains contextual data for a data store.
  *
  * @author Thomas Pantelis
  */
@@ -23,16 +25,24 @@ public class DatastoreContext {
 
     private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
     private final Duration shardTransactionIdleTimeout;
+    private final int operationTimeoutInSeconds;
+    private final String dataStoreMXBeanType;
 
     public DatastoreContext() {
         this.dataStoreProperties = null;
+        this.dataStoreMXBeanType = "DistributedDatastore";
         this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
+        this.operationTimeoutInSeconds = 5;
     }
 
-    public DatastoreContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
-        Duration shardTransactionIdleTimeout) {
+    public DatastoreContext(String dataStoreMXBeanType,
+            InMemoryDOMDataStoreConfigProperties dataStoreProperties,
+            Duration shardTransactionIdleTimeout,
+            int operationTimeoutInSeconds) {
+        this.dataStoreMXBeanType = dataStoreMXBeanType;
         this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties);
-        this.shardTransactionIdleTimeout = Preconditions.checkNotNull(shardTransactionIdleTimeout);
+        this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
+        this.operationTimeoutInSeconds = operationTimeoutInSeconds;
     }
 
     public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
@@ -43,5 +53,11 @@ public class DatastoreContext {
         return shardTransactionIdleTimeout;
     }
 
+    public String getDataStoreMXBeanType() {
+        return dataStoreMXBeanType;
+    }
 
+    public int getOperationTimeoutInSeconds() {
+        return operationTimeoutInSeconds;
+    }
 }
index 0a137e0..db01d51 100644 (file)
@@ -8,8 +8,6 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.concurrent.TimeUnit;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 
@@ -22,7 +20,6 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -36,8 +33,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.duration.Duration;
-
 /**
  *
  */
@@ -46,42 +41,30 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
 
     private final ActorContext actorContext;
-    private final DatastoreContext datastoreContext;
 
     public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
-            Configuration configuration, DistributedDataStoreProperties dataStoreProperties) {
+            Configuration configuration, DatastoreContext datastoreContext) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
         Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
-
+        Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
 
         String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
 
         LOG.info("Creating ShardManager : {}", shardManagerId);
 
-        datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.create(
-                dataStoreProperties.getMaxShardDataChangeExecutorPoolSize(),
-                dataStoreProperties.getMaxShardDataChangeExecutorQueueSize(),
-                dataStoreProperties.getMaxShardDataChangeListenerQueueSize()),
-                Duration.create(dataStoreProperties.getShardTransactionIdleTimeoutInMinutes(),
-                        TimeUnit.MINUTES));
+        actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
+                ShardManager.props(type, cluster, configuration, datastoreContext)
+                    .withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
 
-        actorContext
-                = new ActorContext(
-                    actorSystem, actorSystem.actorOf(
-                        ShardManager.props(type, cluster, configuration, datastoreContext).
-                            withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
-
-        actorContext.setOperationTimeout(dataStoreProperties.getOperationTimeoutInSeconds());
+        actorContext.setOperationTimeout(datastoreContext.getOperationTimeoutInSeconds());
     }
 
     public DistributedDataStore(ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
-        this.datastoreContext = new DatastoreContext();
     }
 
-
     @SuppressWarnings("unchecked")
     @Override
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
index 72b593f..8739ed1 100644 (file)
@@ -16,13 +16,13 @@ import org.osgi.framework.BundleContext;
 
 public class DistributedDataStoreFactory {
     public static DistributedDataStore createInstance(String name, SchemaService schemaService,
-            DistributedDataStoreProperties dataStoreProperties, BundleContext bundleContext) {
+            DatastoreContext datastoreContext, BundleContext bundleContext) {
 
         ActorSystem actorSystem = ActorSystemFactory.createInstance(bundleContext);
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
         final DistributedDataStore dataStore =
             new DistributedDataStore(actorSystem, name, new ClusterWrapperImpl(actorSystem),
-                    config, dataStoreProperties );
+                    config, datastoreContext );
         ShardStrategyFactory.setConfiguration(config);
         schemaService.registerSchemaContextListener(dataStore);
         return dataStore;
index df3245f..e69de29 100644 (file)
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-/**
- * Wrapper class for DistributedDataStore configuration properties.
- *
- * @author Thomas Pantelis
- */
-public class DistributedDataStoreProperties {
-    private final int maxShardDataChangeListenerQueueSize;
-    private final int maxShardDataChangeExecutorQueueSize;
-    private final int maxShardDataChangeExecutorPoolSize;
-    private final int shardTransactionIdleTimeoutInMinutes;
-    private final int operationTimeoutInSeconds;
-
-    public DistributedDataStoreProperties() {
-        maxShardDataChangeListenerQueueSize = 1000;
-        maxShardDataChangeExecutorQueueSize = 1000;
-        maxShardDataChangeExecutorPoolSize = 20;
-        shardTransactionIdleTimeoutInMinutes = 10;
-        operationTimeoutInSeconds = 5;
-    }
-
-    public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
-            int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
-            int shardTransactionIdleTimeoutInMinutes, int operationTimeoutInSeconds) {
-        this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
-        this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
-        this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
-        this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
-        this.operationTimeoutInSeconds = operationTimeoutInSeconds;
-    }
-
-    public int getMaxShardDataChangeListenerQueueSize() {
-        return maxShardDataChangeListenerQueueSize;
-    }
-
-    public int getMaxShardDataChangeExecutorQueueSize() {
-        return maxShardDataChangeExecutorQueueSize;
-    }
-
-    public int getMaxShardDataChangeExecutorPoolSize() {
-        return maxShardDataChangeExecutorPoolSize;
-    }
-
-    public int getShardTransactionIdleTimeoutInMinutes() {
-        return shardTransactionIdleTimeoutInMinutes;
-    }
-
-    public int getOperationTimeoutInSeconds() {
-        return operationTimeoutInSeconds;
-    }
-}
index 6a6a181..a3393a4 100644 (file)
@@ -58,7 +58,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -118,8 +117,10 @@ public class Shard extends RaftActor {
         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                 datastoreContext.getDataStoreProperties());
 
-        shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
-
+        shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
+                datastoreContext.getDataStoreMXBeanType());
+        shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
+        shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
 
     }
 
@@ -199,7 +200,7 @@ public class Shard extends RaftActor {
 
             return getContext().actorOf(
                 ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
-                        schemaContext,datastoreContext, name.toString()), transactionId.toString());
+                        schemaContext,datastoreContext, shardMBean), transactionId.toString());
 
         } else if (createTransaction.getTransactionType()
             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
@@ -208,7 +209,7 @@ public class Shard extends RaftActor {
 
             return getContext().actorOf(
                 ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
-                        schemaContext, datastoreContext,name.toString()), transactionId.toString());
+                        schemaContext, datastoreContext, shardMBean), transactionId.toString());
 
 
         } else if (createTransaction.getTransactionType()
@@ -218,7 +219,7 @@ public class Shard extends RaftActor {
 
             return getContext().actorOf(
                 ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
-                        schemaContext, datastoreContext, name.toString()), transactionId.toString());
+                        schemaContext, datastoreContext, shardMBean), transactionId.toString());
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
@@ -281,7 +282,7 @@ public class Shard extends RaftActor {
             public void onSuccess(Void v) {
                 sender.tell(new CommitTransactionReply().toSerializable(), self);
                 shardMBean.incrementCommittedTransactionCount();
-                shardMBean.setLastCommittedTransactionTime(new Date());
+                shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
             }
 
             @Override
@@ -358,7 +359,7 @@ public class Shard extends RaftActor {
     private void createTransactionChain() {
         DOMStoreTransactionChain chain = store.createTransactionChain();
         ActorRef transactionChain = getContext().actorOf(
-                ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() ));
+                ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
         getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
                 getSelf());
     }
index e51d49b..1e062f3 100644 (file)
@@ -252,9 +252,8 @@ public class ShardManager extends AbstractUntypedActor {
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
         }
 
-        mBean = ShardManagerInfo
-            .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
-
+        mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+                    datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
     }
 
     /**
index 91d6294..0e9fd11 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
@@ -26,8 +27,8 @@ public class ShardReadTransaction extends ShardTransaction {
     private final DOMStoreReadTransaction transaction;
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,String shardName) {
-        super(shardActor, schemaContext, shardName);
+            SchemaContext schemaContext, ShardStats shardStats) {
+        super(shardActor, schemaContext, shardStats);
         this.transaction = transaction;
     }
 
index bd71c27..d04ec23 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
@@ -30,8 +31,8 @@ public class ShardReadWriteTransaction extends ShardTransaction {
     private final DOMStoreReadWriteTransaction transaction;
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,String shardName) {
-        super(shardActor, schemaContext, shardName);
+            SchemaContext schemaContext, ShardStats shardStats) {
+        super(shardActor, schemaContext, shardStats);
         this.transaction = transaction;
     }
 
index 3b0e093..65f865b 100644 (file)
@@ -13,10 +13,12 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.ReceiveTimeout;
 import akka.japi.Creator;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -73,22 +75,21 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
 
     private final ActorRef shardActor;
     protected final SchemaContext schemaContext;
-    private final String  shardName;
-
+    private final ShardStats shardStats;
 
     private final MutableCompositeModification modification = new MutableCompositeModification();
 
     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-        String shardName) {
+            ShardStats shardStats) {
         this.shardActor = shardActor;
         this.schemaContext = schemaContext;
-        this.shardName = shardName;
+        this.shardStats = shardStats;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,DatastoreContext datastoreContext, String shardName) {
+            SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats) {
         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
-           datastoreContext, shardName));
+           datastoreContext, shardStats));
     }
 
     protected abstract DOMStoreTransaction getDOMStoreTransaction();
@@ -137,7 +138,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
                         sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
                     }
                 } catch (Exception e) {
-                    ShardMBeanFactory.getShardStatsMBean(shardName).incrementFailedReadTransactionsCount();
+                    shardStats.incrementFailedReadTransactionsCount();
                     sender.tell(new akka.actor.Status.Failure(e), self);
                 }
 
@@ -196,7 +197,7 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
     protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
         ActorRef cohortActor = getContext().actorOf(
-            ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardName), "cohort");
+            ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardStats), "cohort");
         getSender()
         .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
 
@@ -210,13 +211,14 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
         final ActorRef shardActor;
         final SchemaContext schemaContext;
         final DatastoreContext datastoreContext;
-        final String shardName;
+        final ShardStats shardStats;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
-                SchemaContext schemaContext, DatastoreContext datastoreContext, String shardName) {
+                SchemaContext schemaContext, DatastoreContext datastoreContext,
+                ShardStats shardStats) {
             this.transaction = transaction;
             this.shardActor = shardActor;
-            this.shardName = shardName;
+            this.shardStats = shardStats;
             this.schemaContext = schemaContext;
             this.datastoreContext = datastoreContext;
         }
@@ -226,13 +228,13 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
             ShardTransaction tx;
             if(transaction instanceof DOMStoreReadWriteTransaction) {
                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
-                        shardActor, schemaContext, shardName);
+                        shardActor, schemaContext, shardStats);
             } else if(transaction instanceof DOMStoreReadTransaction) {
                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
-                        schemaContext, shardName);
+                        schemaContext, shardStats);
             } else {
                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
-                        shardActor, schemaContext, shardName);
+                        shardActor, schemaContext, shardStats);
             }
 
             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
index e7a1818..484bd54 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -27,14 +28,14 @@ public class ShardTransactionChain extends AbstractUntypedActor {
     private final DOMStoreTransactionChain chain;
     private final DatastoreContext datastoreContext;
     private final SchemaContext schemaContext;
-    private final String shardName;
+    private final ShardStats shardStats;
 
     public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-            DatastoreContext datastoreContext,String shardName) {
+            DatastoreContext datastoreContext, ShardStats shardStats) {
         this.chain = chain;
         this.datastoreContext = datastoreContext;
         this.schemaContext = schemaContext;
-        this.shardName = shardName;
+        this.shardStats = shardStats;
     }
 
     @Override
@@ -60,17 +61,17 @@ public class ShardTransactionChain extends AbstractUntypedActor {
                 TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext,shardName), transactionId);
+                            schemaContext, datastoreContext, shardStats), transactionId);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
-                            schemaContext, datastoreContext,shardName), transactionId);
+                            schemaContext, datastoreContext, shardStats), transactionId);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext,shardName), transactionId);
+                            schemaContext, datastoreContext, shardStats), transactionId);
         } else {
             throw new IllegalArgumentException (
                     "CreateTransaction message has unidentified transaction type=" +
@@ -87,8 +88,9 @@ public class ShardTransactionChain extends AbstractUntypedActor {
     }
 
     public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-        DatastoreContext datastoreContext, String shardName) {
-        return Props.create(new ShardTransactionChainCreator(chain, schemaContext, datastoreContext, shardName));
+        DatastoreContext datastoreContext, ShardStats shardStats) {
+        return Props.create(new ShardTransactionChainCreator(chain, schemaContext,
+                datastoreContext, shardStats));
     }
 
     private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
@@ -97,20 +99,20 @@ public class ShardTransactionChain extends AbstractUntypedActor {
         final DOMStoreTransactionChain chain;
         final DatastoreContext datastoreContext;
         final SchemaContext schemaContext;
-        final String shardName;
+        final ShardStats shardStats;
 
 
         ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-            DatastoreContext datastoreContext, String shardName) {
+            DatastoreContext datastoreContext, ShardStats shardStats) {
             this.chain = chain;
             this.datastoreContext = datastoreContext;
             this.schemaContext = schemaContext;
-            this.shardName = shardName;
+            this.shardStats = shardStats;
         }
 
         @Override
         public ShardTransactionChain create() throws Exception {
-            return new ShardTransactionChain(chain, schemaContext, datastoreContext,shardName);
+            return new ShardTransactionChain(chain, schemaContext, datastoreContext, shardStats);
         }
     }
 }
index 41c46c3..396b27a 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
@@ -28,8 +29,8 @@ public class ShardWriteTransaction extends ShardTransaction {
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,String shardName) {
-        super(shardActor, schemaContext, shardName);
+            SchemaContext schemaContext, ShardStats shardStats) {
+        super(shardActor, schemaContext, shardStats);
         this.transaction = transaction;
     }
 
index 5a6d0ec..2dce6a1 100644 (file)
@@ -19,7 +19,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -35,25 +35,25 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final ActorRef shardActor;
     private final CompositeModification modification;
-    private final String shardName;
+    private final ShardStats shardStats;
 
     public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
-        ActorRef shardActor, CompositeModification modification,String shardName) {
+        ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
 
         this.cohort = cohort;
         this.shardActor = shardActor;
         this.modification = modification;
-        this.shardName = shardName;
+        this.shardStats = shardStats;
     }
 
     private final LoggingAdapter log =
         Logging.getLogger(getContext().system(), this);
 
     public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
-        final ActorRef shardActor, final CompositeModification modification,
-        String shardName) {
+            final ActorRef shardActor, final CompositeModification modification,
+            ShardStats shardStats) {
         return Props.create(new ThreePhaseCommitCohortCreator(cohort, shardActor, modification,
-           shardName));
+                shardStats));
     }
 
     @Override
@@ -83,7 +83,7 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         Futures.addCallback(future, new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void v) {
-                ShardMBeanFactory.getShardStatsMBean(shardName).incrementAbortTransactionsCount();
+                shardStats.incrementAbortTransactionsCount();
                 sender
                     .tell(new AbortTransactionReply().toSerializable(),
                     self);
@@ -154,19 +154,19 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         final DOMStoreThreePhaseCommitCohort cohort;
         final ActorRef shardActor;
         final CompositeModification modification;
-        final String shardName;
+        final ShardStats shardStats;
 
         ThreePhaseCommitCohortCreator(DOMStoreThreePhaseCommitCohort cohort,
-            ActorRef shardActor, CompositeModification modification, String shardName) {
+            ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
             this.cohort = cohort;
             this.shardActor = shardActor;
             this.modification = modification;
-            this.shardName = shardName;
+            this.shardStats = shardStats;
         }
 
         @Override
         public ThreePhaseCommitCohort create() throws Exception {
-            return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardName);
+            return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardStats);
         }
     }
 }
index 3c46935..e69de29 100644 (file)
@@ -1,139 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-
-package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
-
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-
-/**
- * All MBeans should extend this class that help in registering and
- * unregistering the MBeans.
- * @author Basheeruddin <syedbahm@cisco.com>
- */
-
-
-public abstract class AbstractBaseMBean {
-
-
-  public static String BASE_JMX_PREFIX = "org.opendaylight.controller:";
-  public static String JMX_TYPE_DISTRIBUTED_DATASTORE = "DistributedDatastore";
-  public static String JMX_CATEGORY_SHARD = "Shard";
-  public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(AbstractBaseMBean.class);
-
-  MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-  /**
-   * gets the MBean ObjectName
-   *
-   * @return Object name of the MBean
-   * @throws MalformedObjectNameException - The bean name does not have the right format.
-   * @throws NullPointerException - The bean name is null
-   */
-  protected ObjectName getMBeanObjectName()
-      throws MalformedObjectNameException, NullPointerException {
-    String name = BASE_JMX_PREFIX + "type="+getMBeanType()+",Category="+
-                                   getMBeanCategory() + ",name="+
-                                   getMBeanName();
-
-
-    return new ObjectName(name);
-  }
-
-  public boolean registerMBean() {
-    boolean registered = false;
-    try {
-      // Object to identify MBean
-      final ObjectName mbeanName = this.getMBeanObjectName();
-
-      Preconditions.checkArgument(mbeanName != null,
-          "Object name of the MBean cannot be null");
-
-      LOG.debug("Register MBean {}", mbeanName);
-
-      // unregistered if already registered
-      if (server.isRegistered(mbeanName)) {
-
-        LOG.debug("MBean {} found to be already registered", mbeanName);
-
-        try {
-          unregisterMBean(mbeanName);
-        } catch (Exception e) {
-
-          LOG.warn("unregister mbean {} resulted in exception {} ", mbeanName,
-              e);
-        }
-      }
-      server.registerMBean(this, mbeanName);
-
-      LOG.debug("MBean {} registered successfully",
-          mbeanName.getCanonicalName());
-      registered = true;
-    } catch (Exception e) {
-
-      LOG.error("registration failed {}", e);
-
-    }
-    return registered;
-  }
-
-
-  public boolean unregisterMBean() {
-    boolean unregister = false;
-    try {
-      ObjectName mbeanName = this.getMBeanObjectName();
-      unregister = true;
-      unregisterMBean(mbeanName);
-    } catch (Exception e) {
-
-      LOG.error("Failed when unregistering MBean {}", e);
-    }
-    return unregister;
-  }
-
-  private void unregisterMBean(ObjectName mbeanName)
-      throws MBeanRegistrationException, InstanceNotFoundException {
-
-    server.unregisterMBean(mbeanName);
-
-  }
-
-
-  /**
-   * @return name of bean
-   */
-  protected abstract String getMBeanName();
-
-  /**
-   * @return type of the MBean
-   */
-  protected abstract String getMBeanType();
-
-
-  /**
-   * @return Category name of teh bean
-   */
-  protected abstract String getMBeanCategory();
-
-  //require for test cases
-  public MBeanServer getMBeanServer() {
-    return server;
-  }
-}
index 2a409c0..946e525 100644 (file)
@@ -7,28 +7,41 @@
  */
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 
 /**
  * @author Basheeruddin syedbahm@cisco.com
  *
  */
 public class ShardMBeanFactory {
-    private static Map<String, ShardStats> shardMBeans =
-        new HashMap<String, ShardStats>();
 
-    public static ShardStats getShardStatsMBean(String shardName) {
-        if (shardMBeans.containsKey(shardName)) {
-            return shardMBeans.get(shardName);
-        } else {
-            ShardStats shardStatsMBeanImpl = new ShardStats(shardName);
+    private static final Logger LOG = LoggerFactory.getLogger(ShardMBeanFactory.class);
 
-            if (shardStatsMBeanImpl.registerMBean()) {
-                shardMBeans.put(shardName, shardStatsMBeanImpl);
-            }
-            return shardStatsMBeanImpl;
+    private static Cache<String,ShardStats> shardMBeansCache =
+                                      CacheBuilder.newBuilder().weakValues().build();
+
+    public static ShardStats getShardStatsMBean(final String shardName, final String mxBeanType) {
+        final String finalMXBeanType = mxBeanType != null ? mxBeanType : "DistDataStore";
+        try {
+            return shardMBeansCache.get(shardName, new Callable<ShardStats>() {
+                @Override
+                public ShardStats call() throws Exception {
+                    ShardStats shardStatsMBeanImpl = new ShardStats(shardName, finalMXBeanType);
+                    shardStatsMBeanImpl.registerMBean();
+                    return shardStatsMBeanImpl;
+                }
+            });
+        } catch(ExecutionException e) {
+            LOG.error(String.format("Could not create MXBean for shard: %s", shardName), e);
+            // Just return an instance that isn't registered.
+            return new ShardStats(shardName, finalMXBeanType);
         }
     }
-
 }
index 22ad8e7..0a1964b 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
 
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
 /**
+ * Maintains statistics for a shard.
+ *
  * @author  Basheeruddin syedbahm@cisco.com
  */
-public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
+public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
+    public static String JMX_CATEGORY_SHARD = "Shards";
 
-    private final String shardName;
+    private final AtomicLong committedTransactionsCount = new AtomicLong();
 
-    private long committedTransactionsCount = 0L;
+    private final AtomicLong readOnlyTransactionCount = new AtomicLong();
 
-    private long readOnlyTransactionCount = 0L;
+    private final AtomicLong writeOnlyTransactionCount = new AtomicLong();
 
-    private long writeOnlyTransactionCount = 0L;
-
-    private long readWriteTransactionCount = 0L;
+    private final AtomicLong readWriteTransactionCount = new AtomicLong();
 
     private String leader;
 
     private String raftState;
 
-    private long lastLogTerm = -1L;
+    private volatile long lastLogTerm = -1L;
+
+    private volatile long lastLogIndex = -1L;
 
-    private long lastLogIndex = -1L;
+    private volatile long currentTerm = -1L;
 
-    private long currentTerm = -1L;
+    private volatile long commitIndex = -1L;
 
-    private long commitIndex = -1L;
+    private volatile long lastApplied = -1L;
 
-    private long lastApplied = -1L;
+    private volatile long lastCommittedTransactionTime;
 
-    private Date lastCommittedTransactionTime = new Date(0L);
+    private final AtomicLong failedTransactionsCount = new AtomicLong();
 
-    private long failedTransactionsCount = 0L;
+    private final AtomicLong failedReadTransactionsCount = new AtomicLong();
 
-    private long failedReadTransactionsCount = 0L;
+    private final AtomicLong abortTransactionsCount = new AtomicLong();
 
-    private long abortTransactionsCount = 0L;
+    private ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
 
-    private SimpleDateFormat sdf =
+    private ThreadExecutorStatsMXBeanImpl dataStoreExecutorStatsBean;
+
+    private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
+
+    private final SimpleDateFormat sdf =
         new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 
-    ShardStats(String shardName) {
-        this.shardName = shardName;
+    public ShardStats(String shardName, String mxBeanType) {
+        super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
+    }
+
+    public void setDataStoreExecutor(ExecutorService dsExecutor) {
+        this.dataStoreExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(dsExecutor,
+                "notification-executor", getMBeanType(), getMBeanCategory());
     }
 
+    public void setNotificationManager(QueuedNotificationManager<?, ?> manager) {
+        this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
+                "notification-manager", getMBeanType(), getMBeanCategory());
+
+        this.notificationExecutorStatsBean = new ThreadExecutorStatsMXBeanImpl(manager.getExecutor(),
+                "data-store-executor", getMBeanType(), getMBeanCategory());
+    }
 
     @Override
     public String getShardName() {
-        return shardName;
+        return getMBeanName();
     }
 
     @Override
     public long getCommittedTransactionsCount() {
-        return committedTransactionsCount;
+        return committedTransactionsCount.get();
     }
 
-    @Override public String getLeader() {
+    @Override
+    public String getLeader() {
         return leader;
     }
 
-    @Override public String getRaftState() {
+    @Override
+    public String getRaftState() {
         return raftState;
     }
 
-    @Override public long getReadOnlyTransactionCount() {
-        return readOnlyTransactionCount;
+    @Override
+    public long getReadOnlyTransactionCount() {
+        return readOnlyTransactionCount.get();
     }
 
-    @Override public long getWriteOnlyTransactionCount() {
-        return writeOnlyTransactionCount;
+    @Override
+    public long getWriteOnlyTransactionCount() {
+        return writeOnlyTransactionCount.get();
     }
 
-    @Override public long getReadWriteTransactionCount() {
-        return readWriteTransactionCount;
+    @Override
+    public long getReadWriteTransactionCount() {
+        return readWriteTransactionCount.get();
     }
 
-    @Override public long getLastLogIndex() {
+    @Override
+    public long getLastLogIndex() {
         return lastLogIndex;
     }
 
-    @Override public long getLastLogTerm() {
+    @Override
+    public long getLastLogTerm() {
         return lastLogTerm;
     }
 
-    @Override public long getCurrentTerm() {
+    @Override
+    public long getCurrentTerm() {
         return currentTerm;
     }
 
-    @Override public long getCommitIndex() {
+    @Override
+    public long getCommitIndex() {
         return commitIndex;
     }
 
-    @Override public long getLastApplied() {
+    @Override
+    public long getLastApplied() {
         return lastApplied;
     }
 
     @Override
     public String getLastCommittedTransactionTime() {
 
-        return sdf.format(lastCommittedTransactionTime);
+        return sdf.format(new Date(lastCommittedTransactionTime));
     }
 
-    @Override public long getFailedTransactionsCount() {
-        return failedTransactionsCount;
+    @Override
+    public long getFailedTransactionsCount() {
+        return failedTransactionsCount.get();
     }
 
-    @Override public long getFailedReadTransactionsCount() {
-        return failedReadTransactionsCount;
+    @Override
+    public long getFailedReadTransactionsCount() {
+        return failedReadTransactionsCount.get();
     }
 
-    @Override public long getAbortTransactionsCount() {
-        return abortTransactionsCount;
+    @Override
+    public long getAbortTransactionsCount() {
+        return abortTransactionsCount.get();
     }
 
     public long incrementCommittedTransactionCount() {
-        return committedTransactionsCount++;
+        return committedTransactionsCount.incrementAndGet();
     }
 
     public long incrementReadOnlyTransactionCount() {
-        return readOnlyTransactionCount++;
+        return readOnlyTransactionCount.incrementAndGet();
     }
 
     public long incrementWriteOnlyTransactionCount() {
-        return writeOnlyTransactionCount++;
+        return writeOnlyTransactionCount.incrementAndGet();
     }
 
     public long incrementReadWriteTransactionCount() {
-        return readWriteTransactionCount++;
+        return readWriteTransactionCount.incrementAndGet();
     }
 
     public long incrementFailedTransactionsCount() {
-        return failedTransactionsCount++;
+        return failedTransactionsCount.incrementAndGet();
     }
 
     public long incrementFailedReadTransactionsCount() {
-        return failedReadTransactionsCount++;
+        return failedReadTransactionsCount.incrementAndGet();
     }
 
-    public long incrementAbortTransactionsCount () { return abortTransactionsCount++;}
+    public long incrementAbortTransactionsCount ()
+    {
+        return abortTransactionsCount.incrementAndGet();
+    }
 
     public void setLeader(String leader) {
         this.leader = leader;
@@ -180,49 +224,50 @@ public class ShardStats extends AbstractBaseMBean implements ShardStatsMBean {
         this.lastApplied = lastApplied;
     }
 
-
-    public void setLastCommittedTransactionTime(
-        Date lastCommittedTransactionTime) {
+    public void setLastCommittedTransactionTime(long lastCommittedTransactionTime) {
         this.lastCommittedTransactionTime = lastCommittedTransactionTime;
     }
 
     @Override
-    protected String getMBeanName() {
-        return shardName;
+    public ThreadExecutorStats getDataStoreExecutorStats() {
+        return dataStoreExecutorStatsBean.toThreadExecutorStats();
+    }
+
+    @Override
+    public ThreadExecutorStats getNotificationMgrExecutorStats() {
+        return notificationExecutorStatsBean.toThreadExecutorStats();
     }
 
     @Override
-    protected String getMBeanType() {
-        return JMX_TYPE_DISTRIBUTED_DATASTORE;
+    public List<ListenerNotificationQueueStats> getCurrentNotificationMgrListenerQueueStats() {
+        return notificationManagerStatsBean.getCurrentListenerQueueStats();
     }
 
     @Override
-    protected String getMBeanCategory() {
-        return JMX_CATEGORY_SHARD;
+    public int getMaxNotificationMgrListenerQueueSize() {
+        return notificationManagerStatsBean.getMaxListenerQueueSize();
     }
 
     /**
      * resets the counters related to transactions
      */
-
+    @Override
     public void resetTransactionCounters(){
-        committedTransactionsCount = 0L;
+        committedTransactionsCount.set(0);
 
-        readOnlyTransactionCount = 0L;
+        readOnlyTransactionCount.set(0);
 
-        writeOnlyTransactionCount = 0L;
+        writeOnlyTransactionCount.set(0);
 
-        readWriteTransactionCount = 0L;
+        readWriteTransactionCount.set(0);
 
-        lastCommittedTransactionTime = new Date(0L);
+        lastCommittedTransactionTime = 0;
 
-        failedTransactionsCount = 0L;
+        failedTransactionsCount.set(0);
 
-        failedReadTransactionsCount = 0L;
+        failedReadTransactionsCount.set(0);
 
-        abortTransactionsCount = 0L;
+        abortTransactionsCount.set(0);
 
     }
-
-
 }
index c16f842..e69de29 100644 (file)
@@ -1,41 +0,0 @@
-package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
-
-/**
- * @author: syedbahm
- */
-public interface ShardStatsMBean {
-    String getShardName();
-
-    long getCommittedTransactionsCount();
-
-    String getLeader();
-
-    String getRaftState();
-
-    long getReadOnlyTransactionCount();
-
-    long getWriteOnlyTransactionCount();
-
-    long getReadWriteTransactionCount();
-
-    long getLastLogIndex();
-
-    long getLastLogTerm();
-
-    long getCurrentTerm();
-
-    long getCommitIndex();
-
-    long getLastApplied();
-
-    String getLastCommittedTransactionTime();
-
-    long getFailedTransactionsCount();
-
-    long getFailedReadTransactionsCount();
-
-    long getAbortTransactionsCount();
-
-    void resetTransactionCounters();
-
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java
new file mode 100644 (file)
index 0000000..8deb0ae
--- /dev/null
@@ -0,0 +1,54 @@
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+
+import java.util.List;
+
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
+import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
+
+/**
+ * @author: syedbahm
+ */
+public interface ShardStatsMXBean {
+
+   String getShardName();
+
+   long getCommittedTransactionsCount();
+
+   long getReadOnlyTransactionCount();
+
+   long getWriteOnlyTransactionCount();
+
+   long getReadWriteTransactionCount();
+
+   long getLastLogIndex();
+
+   long getLastLogTerm();
+
+   long getCurrentTerm();
+
+   long getCommitIndex();
+
+   long getLastApplied();
+
+   String getLastCommittedTransactionTime();
+
+   long getFailedTransactionsCount();
+
+   long getAbortTransactionsCount();
+
+   long getFailedReadTransactionsCount();
+
+   String getLeader();
+
+   String getRaftState();
+
+   ThreadExecutorStats getDataStoreExecutorStats();
+
+   ThreadExecutorStats getNotificationMgrExecutorStats();
+
+   List<ListenerNotificationQueueStats> getCurrentNotificationMgrListenerQueueStats();
+
+   int getMaxNotificationMgrListenerQueueSize();
+
+   void resetTransactionCounters();
+}
index 0c609b4..99c8daf 100644 (file)
@@ -8,44 +8,32 @@
 
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager;
 
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
-
 import java.util.List;
 
-public class ShardManagerInfo extends AbstractBaseMBean implements
-    ShardManagerInfoMBean {
-
-    private final String name;
-    private final List<String> localShards;
-
-    public ShardManagerInfo(String name, List<String> localShards) {
-        this.name = name;
-        this.localShards = localShards;
-    }
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 
+public class ShardManagerInfo extends AbstractMXBean implements ShardManagerInfoMBean {
 
-    @Override protected String getMBeanName() {
-        return name;
-    }
+    public static String JMX_CATEGORY_SHARD_MANAGER = "ShardManager";
 
-    @Override protected String getMBeanType() {
-        return JMX_TYPE_DISTRIBUTED_DATASTORE;
-    }
+    private final List<String> localShards;
 
-    @Override protected String getMBeanCategory() {
-        return JMX_CATEGORY_SHARD_MANAGER;
+    public ShardManagerInfo(String name, String mxBeanType, List<String> localShards) {
+        super(name, mxBeanType, JMX_CATEGORY_SHARD_MANAGER);
+        this.localShards = localShards;
     }
 
-    public static ShardManagerInfo createShardManagerMBean(String name, List<String> localShards){
-        ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name,
-            localShards);
+    public static ShardManagerInfo createShardManagerMBean(String name, String mxBeanType,
+            List<String> localShards){
+        ShardManagerInfo shardManagerInfo = new ShardManagerInfo(name, mxBeanType, localShards);
 
         shardManagerInfo.registerMBean();
 
         return shardManagerInfo;
     }
 
-    @Override public List<String> getLocalShards() {
+    @Override
+    public List<String> getLocalShards() {
         return localShards;
     }
 }
index e2fbacb..e7a7aab 100644 (file)
@@ -1,9 +1,14 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.osgi.framework.BundleContext;
 
+import scala.concurrent.duration.Duration;
+
 public class DistributedConfigDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
     private BundleContext bundleContext;
@@ -35,13 +40,18 @@ public class DistributedConfigDataStoreProviderModule extends
             props = new ConfigProperties();
         }
 
-        return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
-                new DistributedDataStoreProperties(
+        DatastoreContext datastoreContext = new DatastoreContext("DistributedConfigDatastore",
+                InMemoryDOMDataStoreConfigProperties.create(
                         props.getMaxShardDataChangeExecutorPoolSize().getValue(),
                         props.getMaxShardDataChangeExecutorQueueSize().getValue(),
                         props.getMaxShardDataChangeListenerQueueSize().getValue(),
-                        props.getShardTransactionIdleTimeoutInMinutes().getValue(),
-                        props.getOperationTimeoutInSeconds().getValue()), bundleContext);
+                        props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+                Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+                        TimeUnit.MINUTES),
+                props.getOperationTimeoutInSeconds().getValue());
+
+        return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
+                datastoreContext, bundleContext);
     }
 
     public void setBundleContext(BundleContext bundleContext) {
index c185e87..814e6f6 100644 (file)
@@ -1,9 +1,14 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.osgi.framework.BundleContext;
 
+import scala.concurrent.duration.Duration;
+
 public class DistributedOperationalDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
     private BundleContext bundleContext;
@@ -35,14 +40,18 @@ public class DistributedOperationalDataStoreProviderModule extends
             props = new OperationalProperties();
         }
 
-        return DistributedDataStoreFactory.createInstance("operational",
-                getOperationalSchemaServiceDependency(),
-                new DistributedDataStoreProperties(
+        DatastoreContext datastoreContext = new DatastoreContext("DistributedOperationalDatastore",
+                InMemoryDOMDataStoreConfigProperties.create(
                         props.getMaxShardDataChangeExecutorPoolSize().getValue(),
                         props.getMaxShardDataChangeExecutorQueueSize().getValue(),
                         props.getMaxShardDataChangeListenerQueueSize().getValue(),
-                        props.getShardTransactionIdleTimeoutInMinutes().getValue(),
-                        props.getOperationTimeoutInSeconds().getValue()), bundleContext);
+                        props.getMaxShardDataStoreExecutorQueueSize().getValue()),
+                Duration.create(props.getShardTransactionIdleTimeoutInMinutes().getValue(),
+                        TimeUnit.MINUTES),
+                props.getOperationTimeoutInSeconds().getValue());
+
+        return DistributedDataStoreFactory.createInstance("operational",
+                getOperationalSchemaServiceDependency(), datastoreContext, bundleContext);
     }
 
     public void setBundleContext(BundleContext bundleContext) {
index d50be2c..82bc5e2 100644 (file)
@@ -66,7 +66,13 @@ module distributed-datastore-provider {
             type non-zero-uint16-type;
             description "The maximum queue size for each shard's data store data change listeners.";
          }
-         
+
+         leaf max-shard-data-store-executor-queue-size {
+            default 5000;
+            type non-zero-uint16-type;
+            description "The maximum queue size for each shard's data store executor.";
+         }
+            
          leaf shard-transaction-idle-timeout-in-minutes {
             default 10;
             type non-zero-uint16-type;
index 21aa00e..8a7b50d 100644 (file)
@@ -78,7 +78,7 @@ public class DistributedDataStoreIntegrationTest {
                             final DistributedDataStore distributedDataStore =
                                 new DistributedDataStore(getSystem(), "config",
                                         new MockClusterWrapper(), configuration,
-                                        new DistributedDataStoreProperties());
+                                        new DatastoreContext());
 
                             distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
 
index cb473cb..aeb47de 100644 (file)
@@ -71,7 +71,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{
 
         new DistributedDataStore(actorSystem, "config",
             mock(ClusterWrapper.class), mock(Configuration.class),
-            new DistributedDataStoreProperties());
+            new DatastoreContext());
 
         verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
     }
index 71eb1f1..c5968c3 100644 (file)
@@ -9,6 +9,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -32,6 +33,8 @@ public class ShardTransactionChainTest extends AbstractActorTest {
 
     private static final String mockShardName = "mockShardName";
 
+    private final ShardStats shardStats = new ShardStats(mockShardName, "DataStore");
+
     @BeforeClass
     public static void staticSetup() {
         store.onGlobalContextUpdated(testSchemaContext);
@@ -41,7 +44,7 @@ public class ShardTransactionChainTest extends AbstractActorTest {
     public void testOnReceiveCreateTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
             final Props props = ShardTransactionChain.props(store.createTransactionChain(),
-                    testSchemaContext, DATA_STORE_CONTEXT, mockShardName);
+                    testSchemaContext, DATA_STORE_CONTEXT, shardStats);
             final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
 
             new Within(duration("1 seconds")) {
@@ -79,7 +82,7 @@ public class ShardTransactionChainTest extends AbstractActorTest {
     public void testOnReceiveCloseTransactionChain() throws Exception {
         new JavaTestKit(getSystem()) {{
             final Props props = ShardTransactionChain.props(store.createTransactionChain(),
-                    testSchemaContext, DATA_STORE_CONTEXT,mockShardName );
+                    testSchemaContext, DATA_STORE_CONTEXT, shardStats );
             final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
 
             new Within(duration("1 seconds")) {
index 4fe60f6..eea7352 100644 (file)
@@ -20,6 +20,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -59,6 +60,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
     private final DatastoreContext datastoreContext = new DatastoreContext();
 
+    private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
+
     @BeforeClass
     public static void staticSetup() {
         store.onGlobalContextUpdated(testSchemaContext);
@@ -71,7 +74,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard =
             getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -101,7 +104,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard =
             getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -131,7 +134,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard =
             getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -161,7 +164,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard =
             getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -194,7 +197,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard =
             getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -232,7 +235,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard =
             getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -265,7 +268,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         final ActorRef shard =
             getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
index ff2ee08..c779d7f 100644 (file)
@@ -13,6 +13,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@ -62,6 +63,8 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private DatastoreContext datastoreContext = new DatastoreContext();
 
+    private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
+
     @BeforeClass
     public static void staticSetup() {
         store.onGlobalContextUpdated(testSchemaContext);
@@ -73,7 +76,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
                     Collections.EMPTY_MAP, new DatastoreContext()));
             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject = getSystem().actorOf(props, "testReadData");
 
             new Within(duration("1 seconds")) {
@@ -116,7 +119,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
                     Collections.EMPTY_MAP, new DatastoreContext()));
             final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
 
             new Within(duration("1 seconds")) {
@@ -160,7 +163,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
                     Collections.EMPTY_MAP, new DatastoreContext()));
             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
 
             new Within(duration("1 seconds")) {
@@ -203,7 +206,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
                     Collections.EMPTY_MAP, new DatastoreContext()));
             final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
 
             new Within(duration("1 seconds")) {
@@ -281,7 +284,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
                     Collections.EMPTY_MAP, new DatastoreContext()));
             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject =
                 getSystem().actorOf(props, "testWriteData");
 
@@ -322,7 +325,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
                     Collections.EMPTY_MAP, new DatastoreContext()));
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject =
                 getSystem().actorOf(props, "testMergeData");
 
@@ -364,7 +367,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
                     Collections.EMPTY_MAP, new DatastoreContext()));
             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject =
                 getSystem().actorOf(props, "testDeleteData");
 
@@ -404,7 +407,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
                     Collections.EMPTY_MAP, new DatastoreContext()));
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject =
                 getSystem().actorOf(props, "testReadyTransaction");
 
@@ -443,7 +446,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
                     Collections.EMPTY_MAP, new DatastoreContext()));
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject =
                 getSystem().actorOf(props, "testCloseTransaction");
 
@@ -494,7 +497,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
                 Collections.EMPTY_MAP, new DatastoreContext()));
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                testSchemaContext, datastoreContext, shardStats);
         final TestActorRef subject = TestActorRef.apply(props,getSystem());
 
         subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
@@ -503,14 +506,15 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testShardTransactionInactivity() {
 
-        datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.getDefault(),
-                Duration.create(500, TimeUnit.MILLISECONDS));
+        datastoreContext = new DatastoreContext("Test",
+                InMemoryDOMDataStoreConfigProperties.getDefault(),
+                Duration.create(500, TimeUnit.MILLISECONDS), 5);
 
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
                     Collections.EMPTY_MAP, new DatastoreContext()));
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                    testSchemaContext, datastoreContext, shardStats);
             final ActorRef subject =
                 getSystem().actorOf(props, "testShardTransactionInactivity");
 
index e39b9ab..3cd0ad2 100644 (file)
@@ -23,6 +23,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
@@ -66,6 +67,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
 
     private final DatastoreContext datastoreContext = new DatastoreContext();
 
+    private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
 
     @BeforeClass
     public static void staticSetup() {
@@ -85,7 +87,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
         final CompositeModification mockComposite =
             Mockito.mock(CompositeModification.class);
         final Props props =
-            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
 
         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
             .create(getSystem(), props,
@@ -114,7 +116,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
         final CompositeModification mockComposite =
             Mockito.mock(CompositeModification.class);
         final Props props =
-            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
 
         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
             .create(getSystem(), props,
@@ -146,7 +148,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
         final CompositeModification mockComposite =
             Mockito.mock(CompositeModification.class);
         final Props props =
-            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
 
         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
             .create(getSystem(), props,
@@ -175,7 +177,7 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
 
         final ActorRef shardTransaction =
             getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
-                    testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()));
+                    testSchemaContext, datastoreContext, shardStats));
 
         ShardTransactionMessages.WriteData writeData =
             ShardTransactionMessages.WriteData.newBuilder()
index c4d0b85..e9df3ec 100644 (file)
@@ -11,10 +11,12 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.AbstractBaseMBean;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
+
+import java.lang.management.ManagementFactory;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
@@ -26,11 +28,11 @@ public class ShardStatsTest {
     @Before
     public void setUp() throws Exception {
 
-        shardStats = new ShardStats("shard-1");
+        shardStats = new ShardStats("shard-1", "DataStore");
         shardStats.registerMBean();
-        mbeanServer = shardStats.getMBeanServer();
+        mbeanServer = ManagementFactory.getPlatformMBeanServer();
         String objectName =
-            AbstractBaseMBean.BASE_JMX_PREFIX + "type=" + shardStats
+            AbstractMXBean.BASE_JMX_PREFIX + "type=" + shardStats
                 .getMBeanType() + ",Category=" +
                 shardStats.getMBeanCategory() + ",name=" +
                 shardStats.getMBeanName();
@@ -46,7 +48,7 @@ public class ShardStatsTest {
     public void testGetShardName() throws Exception {
 
         Object attribute = mbeanServer.getAttribute(testMBeanName, "ShardName");
-        Assert.assertEquals((String) attribute, "shard-1");
+        Assert.assertEquals(attribute, "shard-1");
 
     }
 
@@ -60,7 +62,7 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         Object attribute = mbeanServer.getAttribute(testMBeanName,
             "CommittedTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 3L);
+        Assert.assertEquals(attribute, 3L);
 
 
     }
@@ -71,13 +73,13 @@ public class ShardStatsTest {
         Assert.assertEquals(shardStats.getLastCommittedTransactionTime(),
             sdf.format(new Date(0L)));
         long millis = System.currentTimeMillis();
-        shardStats.setLastCommittedTransactionTime(new Date(millis));
+        shardStats.setLastCommittedTransactionTime(millis);
 
         //now let us get from MBeanServer what is the transaction count.
         Object attribute = mbeanServer.getAttribute(testMBeanName,
             "LastCommittedTransactionTime");
-        Assert.assertEquals((String) attribute, sdf.format(new Date(millis)));
-        Assert.assertNotEquals((String) attribute,
+        Assert.assertEquals(attribute, sdf.format(new Date(millis)));
+        Assert.assertNotEquals(attribute,
             sdf.format(new Date(millis - 1)));
 
     }
@@ -92,7 +94,7 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         Object attribute =
             mbeanServer.getAttribute(testMBeanName, "FailedTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 2L);
+        Assert.assertEquals(attribute, 2L);
     }
 
     @Test
@@ -105,7 +107,7 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         Object attribute =
             mbeanServer.getAttribute(testMBeanName, "AbortTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 2L);
+        Assert.assertEquals(attribute, 2L);
     }
 
     @Test
@@ -118,7 +120,7 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         Object attribute =
             mbeanServer.getAttribute(testMBeanName, "FailedReadTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 2L);
+        Assert.assertEquals(attribute, 2L);
     }
 
     @Test
@@ -132,7 +134,7 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         Object attribute = mbeanServer.getAttribute(testMBeanName,
             "CommittedTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 3L);
+        Assert.assertEquals(attribute, 3L);
 
         //let us increment FailedReadTransactions count and then check
         shardStats.incrementFailedReadTransactionsCount();
@@ -142,7 +144,7 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         attribute =
             mbeanServer.getAttribute(testMBeanName, "FailedReadTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 2L);
+        Assert.assertEquals(attribute, 2L);
 
 
         //here we will reset the counters and check the above ones are 0 after reset
@@ -151,11 +153,11 @@ public class ShardStatsTest {
         //now let us get from MBeanServer what is the transaction count.
         attribute = mbeanServer.getAttribute(testMBeanName,
             "CommittedTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 0L);
+        Assert.assertEquals(attribute, 0L);
 
         attribute =
             mbeanServer.getAttribute(testMBeanName, "FailedReadTransactionsCount");
-        Assert.assertEquals((Long) attribute, (Long) 0L);
+        Assert.assertEquals(attribute, 0L);
 
 
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.