Merge "Add more info to ShardStats JXM bean"
authorMoiz Raja <moraja@cisco.com>
Tue, 24 Mar 2015 17:03:46 +0000 (17:03 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 24 Mar 2015 17:03:46 +0000 (17:03 +0000)
1  2 
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index ea08ffa9da2dd455cf159b6547b0baee2d5227b7,f46a51ea66e76663838c014bad5962cc29b065d6..a63c62fa30740b5830676ab6f15f3de9c1988e7b
@@@ -134,7 -134,7 +134,7 @@@ public abstract class AbstractLeader ex
       *
       * @return Collection of follower IDs
       */
-     protected final Collection<String> getFollowerIds() {
+     public final Collection<String> getFollowerIds() {
          return followerToLog.keySet();
      }
  
          if (followerActor != null) {
              long followerNextIndex = followerLogInformation.getNextIndex();
              boolean isFollowerActive = followerLogInformation.isFollowerActive();
 +            boolean sendAppendEntries = false;
 +            List<ReplicatedLogEntry> entries = Collections.EMPTY_LIST;
  
              if (mapFollowerToSnapshot.get(followerId) != null) {
                  // if install snapshot is in process , then sent next chunk if possible
                      sendSnapshotChunk(followerActor, followerId);
                  } else if(sendHeartbeat) {
                      // we send a heartbeat even if we have not received a reply for the last chunk
 -                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
 -                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
 +                    sendAppendEntries = true;
                  }
              } else {
                  long leaderLastIndex = context.getReplicatedLog().lastIndex();
                              followerNextIndex, followerId);
  
                      // FIXME : Sending one entry at a time
 -                    final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
 -
 -                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
 -
 +                    if(followerLogInformation.okToReplicate()) {
 +                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
 +                        sendAppendEntries = true;
 +                    }
                  } else if (isFollowerActive && followerNextIndex >= 0 &&
                      leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
                      // if the followers next index is not present in the leaders log, and
                      }
  
                      // Send heartbeat to follower whenever install snapshot is initiated.
 -                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
 -                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
 -
 +                    sendAppendEntries = true;
                      initiateCaptureSnapshot(followerId, followerNextIndex);
  
                  } else if(sendHeartbeat) {
 -                    //we send an AppendEntries, even if the follower is inactive
 +                    // we send an AppendEntries, even if the follower is inactive
                      // in-order to update the followers timestamp, in case it becomes active again
 -                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
 -                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
 +                    sendAppendEntries = true;
                  }
  
              }
 +
 +            if(sendAppendEntries) {
 +                sendAppendEntriesToFollower(followerActor, followerNextIndex,
 +                        entries, followerId);
 +            }
          }
      }
  
index ff0f4592cba972261660c5ea1db2c3e415a02608,a8ae1818bd34f066f48a772698e89cafc5a50e37..9ec4f9cfdf027680bfa9a4041bc0b335356fb20c
@@@ -66,7 -66,6 +66,6 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
  import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
  import org.opendaylight.controller.cluster.raft.RaftActor;
- import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
  import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
  import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
  import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
@@@ -173,6 -172,7 +172,7 @@@ public class Shard extends RaftActor 
          shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                  datastoreContext.getDataStoreMXBeanType());
          shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
+         shardMBean.setShardActor(getSelf());
  
          if (isMetricsCaptureEnabled()) {
              getContext().become(new MeteringBehavior(this));
              // currently uses a same thread executor anyway.
              cohortEntry.getCohort().preCommit().get();
  
 -            // If we do not have any followers and we are not using persistence we can
 -            // apply modification to the state immediately
 -            if(!hasFollowers() && !persistence().isRecoveryApplicable()){
 +            // If we do not have any followers and we are not using persistence
 +            // or if cohortEntry has no modifications
 +            // we can apply modification to the state immediately
 +            if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
                  applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
              } else {
                  Shard.this.persistData(getSender(), transactionID,
  
          recoveryCoordinator = null;
          currentLogRecoveryBatch = null;
-         updateJournalStats();
  
          //notify shard manager
          getContext().parent().tell(new ActorInitialized(), getSelf());
                      persistenceId(), data, data.getClass().getClassLoader(),
                      CompositeModificationPayload.class.getClassLoader());
          }
-         updateJournalStats();
      }
  
      private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
          }
      }
  
-     private void updateJournalStats() {
-         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
-         if (lastLogEntry != null) {
-             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
-             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
-         }
-         shardMBean.setCommitIndex(getCommitIndex());
-         shardMBean.setLastApplied(getLastApplied());
-         shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
-     }
      @Override
      protected void createSnapshot() {
          // Create a transaction actor. We are really going to treat the transaction as a worker
              delayedListenerRegistrations.clear();
          }
  
-         shardMBean.setRaftState(getRaftState().name());
-         shardMBean.setCurrentTerm(getCurrentTerm());
          // If this actor is no longer the leader close all the transaction chains
          if(!isLeader){
              for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
          return dataPersistenceProvider;
      }
  
-     @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
-         shardMBean.setLeader(newLeader);
-     }
      @Override public String persistenceId() {
          return this.name.toString();
      }
index a87000136fc52729d4e760dc2ffa86af84346392,4c4fedb8b2d230995d29ec889ffdc6df66a161b1..d888d62cff4f849ab3ba662829da1a9f4c22cb68
@@@ -53,7 -53,6 +53,7 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
  import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
  import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
  import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
  import org.opendaylight.controller.cluster.datastore.modification.Modification;
  import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
@@@ -83,7 -82,6 +83,7 @@@ import org.opendaylight.controller.md.s
  import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
  import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
  import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 +import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
  import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
@@@ -595,7 -593,7 +595,7 @@@ public class ShardTest extends Abstract
              assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
              assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
  
-             verifyLastLogIndex(shard, 2);
+             verifyLastApplied(shard, 2);
  
              shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
          }};
          }};
      }
  
 +    @Test
 +    public void testCommitWhenTransactionHasNoModifications(){
 +        // Note that persistence is enabled which would normally result in the entry getting written to the journal
 +        // but here that need not happen
 +        new ShardTestKit(getSystem()) {
 +            {
 +                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
 +                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
 +                        "testCommitWhenTransactionHasNoModifications");
 +
 +                waitUntilLeader(shard);
 +
 +                String transactionID = "tx1";
 +                MutableCompositeModification modification = new MutableCompositeModification();
 +                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
 +                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 +                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
 +                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
 +
 +                FiniteDuration duration = duration("5 seconds");
 +
 +                // Simulate the ForwardedReadyTransaction messages that would be sent
 +                // by the ShardTransaction.
 +
 +                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
 +                        cohort, modification, true), getRef());
 +                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +
 +                // Send the CanCommitTransaction message.
 +
 +                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
 +                CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
 +                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
 +                assertEquals("Can commit", true, canCommitReply.getCanCommit());
 +
 +                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
 +                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
 +
 +                InOrder inOrder = inOrder(cohort);
 +                inOrder.verify(cohort).canCommit();
 +                inOrder.verify(cohort).preCommit();
 +                inOrder.verify(cohort).commit();
 +
 +                // Use MBean for verification
 +                // Committed transaction count should increase as usual
 +                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
 +
 +                // Commit index should not advance because this does not go into the journal
 +                assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
 +
 +                shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
 +
 +            }
 +        };
 +    }
 +
 +    @Test
 +    public void testCommitWhenTransactionHasModifications(){
 +        new ShardTestKit(getSystem()) {
 +            {
 +                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
 +                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
 +                        "testCommitWhenTransactionHasModifications");
 +
 +                waitUntilLeader(shard);
 +
 +                String transactionID = "tx1";
 +                MutableCompositeModification modification = new MutableCompositeModification();
 +                modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
 +                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
 +                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 +                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
 +                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
 +
 +                FiniteDuration duration = duration("5 seconds");
 +
 +                // Simulate the ForwardedReadyTransaction messages that would be sent
 +                // by the ShardTransaction.
 +
 +                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
 +                        cohort, modification, true), getRef());
 +                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +
 +                // Send the CanCommitTransaction message.
 +
 +                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
 +                CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
 +                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
 +                assertEquals("Can commit", true, canCommitReply.getCanCommit());
 +
 +                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
 +                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
 +
 +                InOrder inOrder = inOrder(cohort);
 +                inOrder.verify(cohort).canCommit();
 +                inOrder.verify(cohort).preCommit();
 +                inOrder.verify(cohort).commit();
 +
 +                // Use MBean for verification
 +                // Committed transaction count should increase as usual
 +                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
 +
 +                // Commit index should advance as we do not have an empty modification
 +                assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
 +
 +                shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
 +
 +            }
 +        };
 +    }
 +
      @Test
      public void testCommitPhaseFailure() throws Throwable {
          new ShardTestKit(getSystem()) {{