Merge " Expose more information about a Shard via JMX"
authorEd Warnicke <eaw@cisco.com>
Mon, 11 Aug 2014 16:04:41 +0000 (16:04 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 11 Aug 2014 16:04:41 +0000 (16:04 +0000)
1  2 
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java

index 8f058a34c2a6a47497cee03b806b21f102c4c478,0717598f0c09d83b5c2d3b17a04462ed1f3a4149..27744fcb8de8e5cde22c16090908850f231bfec8
@@@ -17,6 -17,8 +17,6 @@@ import akka.japi.Creator
  import akka.serialization.Serialization;
  import com.google.common.base.Optional;
  import com.google.common.util.concurrent.ListenableFuture;
 -import com.google.common.util.concurrent.ListeningExecutorService;
 -import com.google.common.util.concurrent.MoreExecutors;
  import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
  import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
  import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@@ -35,9 -37,9 +35,10 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.raft.ConfigParams;
  import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
  import org.opendaylight.controller.cluster.raft.RaftActor;
+ import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
  import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
  import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
@@@ -51,6 -53,7 +52,6 @@@ import java.util.HashMap
  import java.util.List;
  import java.util.Map;
  import java.util.concurrent.ExecutionException;
 -import java.util.concurrent.Executors;
  import java.util.concurrent.TimeUnit;
  
  /**
@@@ -65,6 -68,9 +66,6 @@@ public class Shard extends RaftActor 
  
      public static final String DEFAULT_NAME = "default";
  
 -    private final ListeningExecutorService storeExecutor =
 -        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
 -
      private final InMemoryDOMDataStore store;
  
      private final Map<Object, DOMStoreThreePhaseCommitCohort>
  
          LOG.info("Creating shard : {} persistent : {}", name, persistent);
  
 -        store = new InMemoryDOMDataStore(name, storeExecutor);
 +        store = InMemoryDOMDataStoreFactory.create(name, null);
  
          shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
  
      }
  
-     public static Props props(final String name, final Map<String, String> peerAddresses) {
+     public static Props props(final String name,
+         final Map<String, String> peerAddresses) {
          return Props.create(new Creator<Shard>() {
  
              @Override
      }
  
  
-     @Override public void onReceiveCommand(Object message){
-         LOG.debug("Received message {} from {}", message.getClass().toString(), getSender());
+     @Override public void onReceiveCommand(Object message) {
+         LOG.debug("Received message {} from {}", message.getClass().toString(),
+             getSender());
  
-         if (message.getClass().equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
-             if(isLeader()) {
+         if (message.getClass()
+             .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
+             if (isLeader()) {
                  createTransactionChain();
-             } else if(getLeader() != null){
+             } else if (getLeader() != null) {
                  getLeader().forward(message, getContext());
              }
          } else if (message instanceof RegisterChangeListener) {
              updateSchemaContext((UpdateSchemaContext) message);
          } else if (message instanceof ForwardedCommitTransaction) {
              handleForwardedCommit((ForwardedCommitTransaction) message);
-         } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
-             if(isLeader()) {
+         } else if (message.getClass()
+             .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+             if (isLeader()) {
                  createTransaction(CreateTransaction.fromSerializable(message));
-             } else if(getLeader() != null){
+             } else if (getLeader() != null) {
                  getLeader().forward(message, getContext());
              }
-         } else if (message instanceof PeerAddressResolved){
+         } else if (message instanceof PeerAddressResolved) {
              PeerAddressResolved resolved = (PeerAddressResolved) message;
              setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
          } else {
-           super.onReceiveCommand(message);
+             super.onReceiveCommand(message);
          }
      }
  
-    private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){
-       if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){
-         return getContext().actorOf(
-             ShardTransaction.props( store.newReadOnlyTransaction(), getSelf(), schemaContext), transactionId);
-       }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){
-         return getContext().actorOf(
-             ShardTransaction.props( store.newReadWriteTransaction(), getSelf(), schemaContext), transactionId);
-       }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){
-         return getContext().actorOf(
-             ShardTransaction.props( store.newWriteOnlyTransaction(), getSelf(), schemaContext), transactionId);
-       }else{
-         throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ;
-       }
-    }
+     private ActorRef createTypedTransactionActor(
+         CreateTransaction createTransaction, String transactionId) {
+         if (createTransaction.getTransactionType()
+             == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+             shardMBean.incrementReadOnlyTransactionCount();
+             return getContext().actorOf(
+                 ShardTransaction
+                     .props(store.newReadOnlyTransaction(), getSelf(),
+                         schemaContext), transactionId);
+         } else if (createTransaction.getTransactionType()
+             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
+             shardMBean.incrementReadWriteTransactionCount();
+             return getContext().actorOf(
+                 ShardTransaction
+                     .props(store.newReadWriteTransaction(), getSelf(),
+                         schemaContext), transactionId);
+         } else if (createTransaction.getTransactionType()
+             == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+             shardMBean.incrementWriteOnlyTransactionCount();
+             return getContext().actorOf(
+                 ShardTransaction
+                     .props(store.newWriteOnlyTransaction(), getSelf(),
+                         schemaContext), transactionId);
+         } else {
+             throw new IllegalArgumentException(
+                 "CreateTransaction message has unidentified transaction type="
+                     + createTransaction.getTransactionType());
+         }
+     }
  
      private void createTransaction(CreateTransaction createTransaction) {
  
          String transactionId = "shard-" + createTransaction.getTransactionId();
-         LOG.info("Creating transaction : {} " , transactionId);
-         ActorRef transactionActor = createTypedTransactionActor(createTransaction,transactionId);
+         LOG.info("Creating transaction : {} ", transactionId);
+         ActorRef transactionActor =
+             createTypedTransactionActor(createTransaction, transactionId);
  
          getSender()
-             .tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), createTransaction.getTransactionId()).toSerializable(),
-                 getSelf());
+             .tell(new CreateTransactionReply(
+                     Serialization.serializedActorPath(transactionActor),
+                     createTransaction.getTransactionId()).toSerializable(),
+                 getSelf()
+             );
      }
  
      private void commit(final ActorRef sender, Object serialized) {
-         Modification modification = MutableCompositeModification.fromSerializable(serialized, schemaContext);
+         Modification modification =
+             MutableCompositeModification.fromSerializable(
+                 serialized, schemaContext);
          DOMStoreThreePhaseCommitCohort cohort =
              modificationToCohort.remove(serialized);
          if (cohort == null) {
                  try {
                      future.get();
  
-                     if(sender != null) {
+                     if (sender != null) {
                          sender
                              .tell(new CommitTransactionReply().toSerializable(),
                                  self);
      }
  
      private void handleForwardedCommit(ForwardedCommitTransaction message) {
-         Object serializedModification = message.getModification().toSerializable();
+         Object serializedModification =
+             message.getModification().toSerializable();
  
          modificationToCohort
-             .put(serializedModification , message.getCohort());
+             .put(serializedModification, message.getCohort());
  
-         if(persistent) {
-             this.persistData(getSender(), "identifier", new CompositeModificationPayload(serializedModification));
+         if (persistent) {
+             this.persistData(getSender(), "identifier",
+                 new CompositeModificationPayload(serializedModification));
          } else {
              this.commit(getSender(), serializedModification);
          }
      private void registerChangeListener(
          RegisterChangeListener registerChangeListener) {
  
-         LOG.debug("registerDataChangeListener for " + registerChangeListener.getPath());
+         LOG.debug("registerDataChangeListener for " + registerChangeListener
+             .getPath());
  
  
          ActorSelection dataChangeListenerPath = getContext()
          // Notify the listener if notifications should be enabled or not
          // If this shard is the leader then it will enable notifications else
          // it will not
-         dataChangeListenerPath.tell(new EnableNotification(isLeader()), getSelf());
+         dataChangeListenerPath
+             .tell(new EnableNotification(isLeader()), getSelf());
  
          // Now store a reference to the data change listener so it can be notified
          // at a later point if notifications should be enabled or disabled
          dataChangeListeners.add(dataChangeListenerPath);
  
          AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
-             listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
+             listener =
+             new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
  
          org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
              registration =
              getContext().actorOf(
                  DataChangeListenerRegistration.props(registration));
  
-         LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString());
+         LOG.debug(
+             "registerDataChangeListener sending reply, listenerRegistrationPath = "
+                 + listenerRegistration.path().toString());
  
          getSender()
              .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
                  ShardTransactionChain.props(chain, schemaContext));
          getSender()
              .tell(new CreateTransactionChainReply(transactionChain.path())
-                 .toSerializable(),
-                 getSelf());
+                     .toSerializable(),
+                 getSelf()
+             );
      }
  
      @Override protected void applyState(ActorRef clientActor, String identifier,
          Object data) {
  
-         if(data instanceof CompositeModificationPayload){
+         if (data instanceof CompositeModificationPayload) {
              Object modification =
                  ((CompositeModificationPayload) data).getModification();
  
-             if(modification != null){
+             if (modification != null) {
                  commit(clientActor, modification);
              } else {
                  LOG.error("modification is null - this is very unexpected");
              LOG.error("Unknown state received {}", data);
          }
  
+         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
+         if(lastLogEntry != null){
+             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
+             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
+         }
+         shardMBean.setCommitIndex(getCommitIndex());
+         shardMBean.setLastApplied(getLastApplied());
      }
  
      @Override protected Object createSnapshot() {
      }
  
      @Override protected void onStateChanged() {
-         for(ActorSelection dataChangeListener : dataChangeListeners){
-             dataChangeListener.tell(new EnableNotification(isLeader()), getSelf());
+         for (ActorSelection dataChangeListener : dataChangeListeners) {
+             dataChangeListener
+                 .tell(new EnableNotification(isLeader()), getSelf());
          }
  
-         if(getLeaderId() != null){
+         if (getLeaderId() != null) {
              shardMBean.setLeader(getLeaderId());
          }
  
          shardMBean.setRaftState(getRaftState().name());
+         shardMBean.setCurrentTerm(getCurrentTerm());
      }
  
      @Override public String persistenceId() {