Merge "Bug 1446: Add JMX stats for clustered data store"
authorMoiz Raja <moraja@cisco.com>
Fri, 5 Sep 2014 02:42:24 +0000 (02:42 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 5 Sep 2014 02:42:24 +0000 (02:42 +0000)
1  2 
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java

index b40ccd08e8fe6fbeae52845604a2bb032dd4cc15,a3393a4c18e103486a79f2176474852870cd7259..7d570046d406feec976620f9215398475711a756
@@@ -10,22 -10,19 +10,22 @@@ package org.opendaylight.controller.clu
  
  import akka.actor.ActorRef;
  import akka.actor.ActorSelection;
 +import akka.actor.PoisonPill;
  import akka.actor.Props;
  import akka.event.Logging;
  import akka.event.LoggingAdapter;
  import akka.japi.Creator;
  import akka.persistence.RecoveryFailure;
  import akka.serialization.Serialization;
 -
 +import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Optional;
  import com.google.common.base.Preconditions;
 +import com.google.common.util.concurrent.CheckedFuture;
  import com.google.common.util.concurrent.FutureCallback;
  import com.google.common.util.concurrent.Futures;
  import com.google.common.util.concurrent.ListenableFuture;
  import com.google.protobuf.ByteString;
 +import com.google.protobuf.InvalidProtocolBufferException;
  import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
  import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
  import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@@ -38,36 -35,29 +38,35 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
  import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
  import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 +import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 +import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
  import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
  import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
  import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
  import org.opendaylight.controller.cluster.datastore.modification.Modification;
  import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 +import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
  import org.opendaylight.controller.cluster.raft.ConfigParams;
  import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
  import org.opendaylight.controller.cluster.raft.RaftActor;
  import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
  import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
  import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
  import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
  import org.opendaylight.yangtools.concepts.ListenerRegistration;
  import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 -
  import scala.concurrent.duration.FiniteDuration;
  
  import java.util.ArrayList;
- import java.util.Date;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
@@@ -111,15 -101,12 +110,15 @@@ public class Shard extends RaftActor 
  
      private SchemaContext schemaContext;
  
 +    private ActorRef createSnapshotTransaction;
 +
      private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
 -            DatastoreContext datastoreContext) {
 +            DatastoreContext datastoreContext, SchemaContext schemaContext) {
          super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
  
          this.name = name;
          this.datastoreContext = datastoreContext;
 +        this.schemaContext = schemaContext;
  
          String setting = System.getProperty("shard.persistent");
  
          store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                  datastoreContext.getDataStoreProperties());
  
-         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
 +        if(schemaContext != null) {
 +            store.onGlobalContextUpdated(schemaContext);
 +        }
 +
+         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
+                 datastoreContext.getDataStoreMXBeanType());
+         shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
+         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
  
      }
  
  
      public static Props props(final ShardIdentifier name,
          final Map<ShardIdentifier, String> peerAddresses,
 -        DatastoreContext datastoreContext) {
 +        DatastoreContext datastoreContext, SchemaContext schemaContext) {
          Preconditions.checkNotNull(name, "name should not be null");
          Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
 -        Preconditions.checkNotNull(datastoreContext, "shardContext should not be null");
 +        Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
 +        Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
  
 -        return Props.create(new ShardCreator(name, peerAddresses, datastoreContext));
 +        return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
      }
  
      @Override public void onReceiveRecover(Object message) {
              } else if (getLeader() != null) {
                  getLeader().forward(message, getContext());
              }
 +        } else if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
 +            // This must be for install snapshot. Don't want to open this up and trigger
 +            // deSerialization
 +            self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self());
 +
 +            // Send a PoisonPill instead of sending close transaction because we do not really need
 +            // a response
 +            getSender().tell(PoisonPill.getInstance(), self());
 +
          } else if (message instanceof RegisterChangeListener) {
              registerChangeListener((RegisterChangeListener) message);
          } else if (message instanceof UpdateSchemaContext) {
      }
  
      private ActorRef createTypedTransactionActor(
 -        CreateTransaction createTransaction,
 +        int transactionType,
          ShardTransactionIdentifier transactionId) {
 -        if (createTransaction.getTransactionType()
 +
 +        if(this.schemaContext == null){
 +            throw new NullPointerException("schemaContext should not be null");
 +        }
 +
 +        if (transactionType
              == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
  
              shardMBean.incrementReadOnlyTransactionCount();
  
              return getContext().actorOf(
                  ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
-                         schemaContext,datastoreContext, name.toString()), transactionId.toString());
+                         schemaContext,datastoreContext, shardMBean), transactionId.toString());
  
 -        } else if (createTransaction.getTransactionType()
 +        } else if (transactionType
              == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
  
              shardMBean.incrementReadWriteTransactionCount();
  
              return getContext().actorOf(
                  ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
-                         schemaContext, datastoreContext,name.toString()), transactionId.toString());
+                         schemaContext, datastoreContext, shardMBean), transactionId.toString());
  
  
 -        } else if (createTransaction.getTransactionType()
 +        } else if (transactionType
              == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
  
              shardMBean.incrementWriteOnlyTransactionCount();
  
              return getContext().actorOf(
                  ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
-                         schemaContext, datastoreContext, name.toString()), transactionId.toString());
+                         schemaContext, datastoreContext, shardMBean), transactionId.toString());
          } else {
              throw new IllegalArgumentException(
                  "Shard="+name + ":CreateTransaction message has unidentified transaction type="
 -                    + createTransaction.getTransactionType());
 +                    + transactionType);
          }
      }
  
      private void createTransaction(CreateTransaction createTransaction) {
 +        createTransaction(createTransaction.getTransactionType(),
 +            createTransaction.getTransactionId());
 +    }
 +
 +    private ActorRef createTransaction(int transactionType, String remoteTransactionId) {
  
          ShardTransactionIdentifier transactionId =
              ShardTransactionIdentifier.builder()
 -                .remoteTransactionId(createTransaction.getTransactionId())
 +                .remoteTransactionId(remoteTransactionId)
                  .build();
          LOG.debug("Creating transaction : {} ", transactionId);
          ActorRef transactionActor =
 -            createTypedTransactionActor(createTransaction, transactionId);
 +            createTypedTransactionActor(transactionType, transactionId);
  
          getSender()
              .tell(new CreateTransactionReply(
                      Serialization.serializedActorPath(transactionActor),
 -                    createTransaction.getTransactionId()).toSerializable(),
 -                getSelf()
 -            );
 +                    remoteTransactionId).toSerializable(),
 +                getSelf());
 +
 +        return transactionActor;
      }
  
 +    private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
 +        throws ExecutionException, InterruptedException {
 +        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
 +        commitCohort.preCommit().get();
 +        commitCohort.commit().get();
 +    }
 +
 +
      private void commit(final ActorRef sender, Object serialized) {
          Modification modification = MutableCompositeModification
              .fromSerializable(serialized, schemaContext);
              LOG.debug(
                  "Could not find cohort for modification : {}. Writing modification using a new transaction",
                  modification);
 -            DOMStoreReadWriteTransaction transaction =
 -                store.newReadWriteTransaction();
 +            DOMStoreWriteTransaction transaction =
 +                store.newWriteOnlyTransaction();
              modification.apply(transaction);
 -            DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
 -            ListenableFuture<Void> future =
 -                commitCohort.preCommit();
              try {
 -                future.get();
 -                future = commitCohort.commit();
 -                future.get();
 +                syncCommitTransaction(transaction);
              } catch (InterruptedException | ExecutionException e) {
                  shardMBean.incrementFailedTransactionsCount();
                  LOG.error("Failed to commit", e);
              public void onSuccess(Void v) {
                  sender.tell(new CommitTransactionReply().toSerializable(), self);
                  shardMBean.incrementCommittedTransactionCount();
-                 shardMBean.setLastCommittedTransactionTime(new Date());
+                 shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
              }
  
              @Override
  
      private void updateSchemaContext(UpdateSchemaContext message) {
          this.schemaContext = message.getSchemaContext();
 +        updateSchemaContext(message.getSchemaContext());
          store.onGlobalContextUpdated(message.getSchemaContext());
      }
  
 +    @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
 +        store.onGlobalContextUpdated(schemaContext);
 +    }
 +
      private void registerChangeListener(
          RegisterChangeListener registerChangeListener) {
  
      private void createTransactionChain() {
          DOMStoreTransactionChain chain = store.createTransactionChain();
          ActorRef transactionChain = getContext().actorOf(
-                 ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() ));
+                 ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
          getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
 -                getSelf());
 +            getSelf());
      }
  
      @Override protected void applyState(ActorRef clientActor, String identifier,
      }
  
      @Override protected void createSnapshot() {
 -        throw new UnsupportedOperationException("createSnapshot");
 +        if (createSnapshotTransaction == null) {
 +
 +            // Create a transaction. We are really going to treat the transaction as a worker
 +            // so that this actor does not get block building the snapshot
 +            createSnapshotTransaction = createTransaction(
 +                TransactionProxy.TransactionType.READ_ONLY.ordinal(),
 +                "createSnapshot");
 +
 +            createSnapshotTransaction.tell(
 +                new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
 +
 +        }
      }
  
 -    @Override protected void applySnapshot(ByteString snapshot) {
 -        throw new UnsupportedOperationException("applySnapshot");
 +    @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
 +        // Since this will be done only on Recovery or when this actor is a Follower
 +        // we can safely commit everything in here. We not need to worry about event notifications
 +        // as they would have already been disabled on the follower
 +        try {
 +            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
 +            NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
 +            NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
 +                .decode(YangInstanceIdentifier.builder().build(), serializedNode);
 +
 +            // delete everything first
 +            transaction.delete(YangInstanceIdentifier.builder().build());
 +
 +            // Add everything from the remote node back
 +            transaction.write(YangInstanceIdentifier.builder().build(), node);
 +            syncCommitTransaction(transaction);
 +        } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
 +            LOG.error(e, "An exception occurred when applying snapshot");
 +        }
      }
  
      @Override protected void onStateChanged() {
          final ShardIdentifier name;
          final Map<ShardIdentifier, String> peerAddresses;
          final DatastoreContext datastoreContext;
 +        final SchemaContext schemaContext;
  
          ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
 -                DatastoreContext datastoreContext) {
 +                DatastoreContext datastoreContext, SchemaContext schemaContext) {
              this.name = name;
              this.peerAddresses = peerAddresses;
              this.datastoreContext = datastoreContext;
 +            this.schemaContext = schemaContext;
          }
  
          @Override
          public Shard create() throws Exception {
 -            return new Shard(name, peerAddresses, datastoreContext);
 +            return new Shard(name, peerAddresses, datastoreContext, schemaContext);
          }
      }
 +
 +    @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
 +        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
 +
 +        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
 +            transaction.read(YangInstanceIdentifier.builder().build());
 +
 +        NormalizedNode<?, ?> node = future.get().get();
 +
 +        transaction.close();
 +
 +        return node;
 +    }
 +
 +    @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
 +        throws ExecutionException, InterruptedException {
 +        DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
 +
 +        transaction.write(id, node);
 +
 +        syncCommitTransaction(transaction);
 +    }
 +
  }
index adfda1ccbe116ced4679ec06551cd52508c0fadb,1e062f3944580d720dabbd1cebcf9f2c642a7e8b..58cdefe5371d2b58be6e7c9f5e461734f34acd07
@@@ -17,7 -17,9 +17,7 @@@ import akka.actor.SupervisorStrategy
  import akka.cluster.ClusterEvent;
  import akka.japi.Creator;
  import akka.japi.Function;
 -
  import com.google.common.base.Preconditions;
 -
  import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
  import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
  import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
@@@ -30,8 -32,8 +30,8 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
  import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
  import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 -
  import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 +import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  import scala.concurrent.duration.Duration;
  
  import java.util.ArrayList;
@@@ -87,7 -89,9 +87,7 @@@ public class ShardManager extends Abstr
          // Subscribe this actor to cluster member events
          cluster.subscribeToMemberEvents(getSelf());
  
 -        // Create all the local Shards and make them a child of the ShardManager
 -        // TODO: This may need to be initiated when we first get the schema context
 -        createLocalShards();
 +        //createLocalShards(null);
      }
  
      public static Props props(final String type,
       * @param message
       */
      private void updateSchemaContext(Object message) {
 -        for(ShardInformation info : localShards.values()){
 -            info.getActor().tell(message,getSelf());
 +        SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
 +
 +        if(localShards.size() == 0){
 +            createLocalShards(schemaContext);
 +        } else {
 +            for (ShardInformation info : localShards.values()) {
 +                info.getActor().tell(message, getSelf());
 +            }
          }
      }
  
       * runs
       *
       */
 -    private void createLocalShards() {
 +    private void createLocalShards(SchemaContext schemaContext) {
          String memberName = this.cluster.getCurrentMemberName();
          List<String> memberShardNames =
              this.configuration.getMemberShardNames(memberName);
              ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
              Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
              ActorRef actor = getContext()
 -                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext).
 +                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext).
                      withMailbox(ActorContext.MAILBOX), shardId.toString());
 -
              localShardActorNames.add(shardId.toString());
              localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
          }
  
-         mBean = ShardManagerInfo
-             .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
+         mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
      }
  
      /**
index 4e73c70b9db1b8798e4b715139293bfc00a5ffa0,eea73526f27d57d62b784509d3e99100c8a091f0..869f47578711ea313181e4011444dc82ca35feaa
@@@ -20,6 -20,7 +20,7 @@@ import com.google.common.util.concurren
  import org.junit.BeforeClass;
  import org.junit.Test;
  import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
  import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
  import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
  import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@@ -59,22 -60,21 +60,24 @@@ public class ShardTransactionFailureTes
  
      private final DatastoreContext datastoreContext = new DatastoreContext();
  
+     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
      @BeforeClass
      public static void staticSetup() {
          store.onGlobalContextUpdated(testSchemaContext);
      }
  
 +    private ActorRef createShard(){
 +        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext(), TestModel.createTestContext()));
 +    }
 +
      @Test(expected = ReadFailedException.class)
      public void testNegativeReadWithReadOnlyTransactionClosed()
          throws Throwable {
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props,
      public void testNegativeReadWithReadWriteTransactionClosed()
          throws Throwable {
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props,
      public void testNegativeExistsWithReadWriteTransactionClosed()
          throws Throwable {
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props,
      public void testNegativeWriteWithTransactionReady() throws Exception {
  
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props,
      public void testNegativeReadWriteWithTransactionReady() throws Exception {
  
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props,
      public void testNegativeMergeTransactionReady() throws Exception {
  
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props, "testNegativeMergeTransactionReady");
      public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
  
  
 -        final ActorRef shard =
 -            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
  
          final TestActorRef<ShardTransaction> subject = TestActorRef
              .create(getSystem(), props,
index c9e08f9cbb23559be2afa0d4a5aa54d102046a71,c779d7fe553473931a83418b7b1fe5b5524ea1c7..0beb00b435ebe622d888dd58b40d938877c4d612
@@@ -13,6 -13,7 +13,7 @@@ import org.junit.BeforeClass
  import org.junit.Test;
  import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
  import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
  import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
  import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
  import org.opendaylight.controller.cluster.datastore.messages.DataExists;
@@@ -62,22 -63,20 +63,24 @@@ public class ShardTransactionTest exten
  
      private DatastoreContext datastoreContext = new DatastoreContext();
  
+     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
      @BeforeClass
      public static void staticSetup() {
          store.onGlobalContextUpdated(testSchemaContext);
      }
  
 +    private ActorRef createShard(){
 +        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 +            Collections.EMPTY_MAP, new DatastoreContext(), TestModel.createTestContext()));
 +    }
 +
      @Test
      public void testOnReceiveReadData() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject = getSystem().actorOf(props, "testReadData");
  
              new Within(duration("1 seconds")) {
      @Test
      public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
  
              new Within(duration("1 seconds")) {
      @Test
      public void testOnReceiveDataExistsPositive() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
  
              new Within(duration("1 seconds")) {
      @Test
      public void testOnReceiveDataExistsNegative() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
  
              new Within(duration("1 seconds")) {
      @Test
      public void testOnReceiveWriteData() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject =
                  getSystem().actorOf(props, "testWriteData");
  
      @Test
      public void testOnReceiveMergeData() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject =
                  getSystem().actorOf(props, "testMergeData");
  
      @Test
      public void testOnReceiveDeleteData() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject =
                  getSystem().actorOf(props, "testDeleteData");
  
      @Test
      public void testOnReceiveReadyTransaction() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject =
                  getSystem().actorOf(props, "testReadyTransaction");
  
      @Test
      public void testOnReceiveCloseTransaction() throws Exception {
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject =
                  getSystem().actorOf(props, "testCloseTransaction");
  
  
      @Test(expected=UnknownMessageException.class)
      public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
 -        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                Collections.EMPTY_MAP, new DatastoreContext()));
 +        final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                 testSchemaContext, datastoreContext, shardStats);
          final TestActorRef subject = TestActorRef.apply(props,getSystem());
  
          subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
      @Test
      public void testShardTransactionInactivity() {
  
-         datastoreContext = new DatastoreContext(InMemoryDOMDataStoreConfigProperties.getDefault(),
-                 Duration.create(500, TimeUnit.MILLISECONDS));
+         datastoreContext = new DatastoreContext("Test",
+                 InMemoryDOMDataStoreConfigProperties.getDefault(),
+                 Duration.create(500, TimeUnit.MILLISECONDS), 5);
  
          new JavaTestKit(getSystem()) {{
 -            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                    Collections.EMPTY_MAP, new DatastoreContext()));
 +            final ActorRef shard = createShard();
              final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString());
+                     testSchemaContext, datastoreContext, shardStats);
              final ActorRef subject =
                  getSystem().actorOf(props, "testShardTransactionInactivity");
  
index a0e6b412dd3bd2e6abf3beef9d880d889576a591,3cd0ad2841628be0a20715c69d98d21f2816a428..4e4c34bcbc75fe0e74369ae407d83624c3438ae0
@@@ -23,6 -23,7 +23,7 @@@ import org.junit.BeforeClass
  import org.junit.Test;
  import org.mockito.Mockito;
  import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
  import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
  import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
  import org.opendaylight.controller.cluster.datastore.modification.Modification;
@@@ -66,6 -67,7 +67,7 @@@ public class ThreePhaseCommitCohortFail
  
      private final DatastoreContext datastoreContext = new DatastoreContext();
  
+     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
  
      @BeforeClass
      public static void staticSetup() {
  
      private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
  
 +    private ActorRef createShard(){
 +        return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
 +    }
  
      @Test(expected = TestException.class)
      public void testNegativeAbortResultsInException() throws Exception {
  
 -        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                Collections.EMPTY_MAP, datastoreContext));
 +        final ActorRef shard = createShard();
          final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
              .mock(DOMStoreThreePhaseCommitCohort.class);
          final CompositeModification mockComposite =
              Mockito.mock(CompositeModification.class);
          final Props props =
-             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
  
          final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
              .create(getSystem(), props,
      @Test(expected = OptimisticLockFailedException.class)
      public void testNegativeCanCommitResultsInException() throws Exception {
  
 -        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                Collections.EMPTY_MAP, datastoreContext));
 +        final ActorRef shard = createShard();
          final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
              .mock(DOMStoreThreePhaseCommitCohort.class);
          final CompositeModification mockComposite =
              Mockito.mock(CompositeModification.class);
          final Props props =
-             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
  
          final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
              .create(getSystem(), props,
      @Test(expected = TestException.class)
      public void testNegativePreCommitResultsInException() throws Exception {
  
 -        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
 -                Collections.EMPTY_MAP, datastoreContext));
 +        final ActorRef shard = createShard();
          final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
              .mock(DOMStoreThreePhaseCommitCohort.class);
          final CompositeModification mockComposite =
              Mockito.mock(CompositeModification.class);
          final Props props =
-             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
+             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
  
          final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
              .create(getSystem(), props,
      public void testNegativeCommitResultsInException() throws Exception {
  
          final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
 -                Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext),
 +                Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()),
                  "testNegativeCommitResultsInException");
  
          final ActorRef shardTransaction =
              getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
-                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()));
+                     testSchemaContext, datastoreContext, shardStats));
  
          ShardTransactionMessages.WriteData writeData =
              ShardTransactionMessages.WriteData.newBuilder()