Merge "Do not use ActorSystem.actorFor as it is deprecated"
authorTom Pantelis <tpanteli@brocade.com>
Wed, 25 Mar 2015 18:43:11 +0000 (18:43 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 25 Mar 2015 18:43:12 +0000 (18:43 +0000)
1  2 
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

@@@ -43,8 -43,6 +43,8 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
  import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
  import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
  import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
  import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
  import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
@@@ -68,6 -66,7 +68,6 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
  import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
  import org.opendaylight.controller.cluster.raft.RaftActor;
 -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
  import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
  import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
  import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
@@@ -78,6 -77,8 +78,6 @@@ import org.opendaylight.controller.md.s
  import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
  import org.opendaylight.yangtools.concepts.ListenerRegistration;
  import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@@ -105,7 -106,7 +105,7 @@@ public class Shard extends RaftActor 
      private final InMemoryDOMDataStore store;
  
      /// The name of this shard
 -    private final ShardIdentifier name;
 +    private final String name;
  
      private final ShardStats shardMBean;
  
      private ShardRecoveryCoordinator recoveryCoordinator;
      private List<Object> currentLogRecoveryBatch;
  
 -    private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
 +    private final DOMTransactionFactory transactionFactory;
  
      private final String txnDispatcherPath;
  
 -    protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
 +    protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
              final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
 -        super(name.toString(), mapPeerAddresses(peerAddresses),
 -                Optional.of(datastoreContext.getShardRaftConfig()));
 +        super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
  
 -        this.name = name;
 +        this.name = name.toString();
          this.datastoreContext = datastoreContext;
          this.schemaContext = schemaContext;
          this.dataPersistenceProvider = (datastoreContext.isPersistent())
          shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                  datastoreContext.getDataStoreMXBeanType());
          shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
 +        shardMBean.setShardActor(getSelf());
  
          if (isMetricsCaptureEnabled()) {
              getContext().become(new MeteringBehavior(this));
          }
  
 -        commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
 -                datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
 +        transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
 +
 +        commitCoordinator = new ShardCommitCoordinator(transactionFactory,
 +                TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
 +                datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
  
          setTransactionCommitTimeout();
  
                  datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
      }
  
 -    private static Map<String, String> mapPeerAddresses(
 -        final Map<ShardIdentifier, String> peerAddresses) {
 -        Map<String, String> map = new HashMap<>();
 -
 -        for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
 -            .entrySet()) {
 -            map.put(entry.getKey().toString(), entry.getValue());
 -        }
 -
 -        return map;
 -    }
 -
      public static Props props(final ShardIdentifier name,
 -        final Map<ShardIdentifier, String> peerAddresses,
 +        final Map<String, String> peerAddresses,
          final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
          Preconditions.checkNotNull(name, "name should not be null");
          Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
          try {
              if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                  handleCreateTransaction(message);
 +            } else if (BatchedModifications.class.isInstance(message)) {
 +                handleBatchedModifications((BatchedModifications)message);
              } else if (message instanceof ForwardedReadyTransaction) {
                  handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
              } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
              // currently uses a same thread executor anyway.
              cohortEntry.getCohort().preCommit().get();
  
 -            // If we do not have any followers and we are not using persistence we can
 -            // apply modification to the state immediately
 -            if(!hasFollowers() && !persistence().isRecoveryApplicable()){
 +            // If we do not have any followers and we are not using persistence
 +            // or if cohortEntry has no modifications
 +            // we can apply modification to the state immediately
 +            if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
                  applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
              } else {
                  Shard.this.persistData(getSender(), transactionID,
          commitCoordinator.handleCanCommit(canCommit, getSender(), self());
      }
  
 +    private void handleBatchedModifications(BatchedModifications batched) {
 +        // This message is sent to prepare the modificationsa transaction directly on the Shard as an
 +        // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
 +        // BatchedModifications message, the caller sets the ready flag in the message indicating
 +        // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
 +        // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
 +        // ReadyTransaction message.
 +
 +        // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
 +        // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
 +        // the primary/leader shard. However with timing and caching on the front-end, there's a small
 +        // window where it could have a stale leader during leadership transitions.
 +        //
 +        if(isLeader()) {
 +            try {
 +                BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
 +                sender().tell(reply, self());
 +            } catch (Exception e) {
 +                LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
 +                        batched.getTransactionID(), e);
 +                getSender().tell(new akka.actor.Status.Failure(e), getSelf());
 +            }
 +        } else {
 +            ActorSelection leader = getLeader();
 +            if(leader != null) {
 +                // TODO: what if this is not the first batch and leadership changed in between batched messages?
 +                // We could check if the commitCoordinator already has a cached entry and forward all the previous
 +                // batched modifications.
 +                LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
 +                leader.forward(batched, getContext());
 +            } else {
 +                // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
 +                // it more resilient in case we're in the process of electing a new leader.
 +                getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
 +                    "Could not find the leader for shard %s. This typically happens" +
 +                    " when the system is coming up or recovering and a leader is being elected. Try again" +
 +                    " later.", persistenceId()))), getSelf());
 +            }
 +        }
 +    }
 +
      private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
          LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
                  ready.getTransactionID(), ready.getTxnClientVersion());
          // commitCoordinator in preparation for the subsequent three phase commit initiated by
          // the front-end.
          commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
 -                ready.getModification());
 +                (MutableCompositeModification) ready.getModification());
  
          // Return our actor path as we'll handle the three phase commit, except if the Tx client
          // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
      }
  
      private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
 -        DOMStoreTransactionChain chain =
 -            transactionChains.remove(closeTransactionChain.getTransactionChainId());
 -
 -        if(chain != null) {
 -            chain.close();
 -        }
 +        transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
      }
  
      private ActorRef createTypedTransactionActor(int transactionType,
              ShardTransactionIdentifier transactionId, String transactionChainId,
              short clientVersion ) {
  
 -        DOMStoreTransactionFactory factory = store;
 -
 -        if(!transactionChainId.isEmpty()) {
 -            factory = transactionChains.get(transactionChainId);
 -            if(factory == null){
 -                DOMStoreTransactionChain transactionChain = store.createTransactionChain();
 -                transactionChains.put(transactionChainId, transactionChain);
 -                factory = transactionChain;
 -            }
 -        }
 -
 -        if(this.schemaContext == null) {
 -            throw new IllegalStateException("SchemaContext is not set");
 -        }
 -
 -        if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
 -
 -            shardMBean.incrementWriteOnlyTransactionCount();
 +        DOMStoreTransaction transaction = transactionFactory.newTransaction(
 +                TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
 +                transactionChainId);
  
 -            return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
 -
 -        } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
 -
 -            shardMBean.incrementReadWriteTransactionCount();
 -
 -            return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
 -
 -        } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
 -
 -            shardMBean.incrementReadOnlyTransactionCount();
 -
 -            return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
 -
 -        } else {
 -            throw new IllegalArgumentException(
 -                "Shard="+name + ":CreateTransaction message has unidentified transaction type="
 -                    + transactionType);
 -        }
 +        return createShardTransaction(transaction, transactionId, clientVersion);
      }
  
      private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
          LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
                  persistenceId(), listenerRegistration.path());
  
-         getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
+         getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf());
      }
  
      private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
  
          recoveryCoordinator = null;
          currentLogRecoveryBatch = null;
 -        updateJournalStats();
  
          //notify shard manager
          getContext().parent().tell(new ActorInitialized(), getSelf());
                      persistenceId(), data, data.getClass().getClassLoader(),
                      CompositeModificationPayload.class.getClassLoader());
          }
 -
 -        updateJournalStats();
 -
      }
  
      private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
          }
      }
  
 -    private void updateJournalStats() {
 -        ReplicatedLogEntry lastLogEntry = getLastLogEntry();
 -
 -        if (lastLogEntry != null) {
 -            shardMBean.setLastLogIndex(lastLogEntry.getIndex());
 -            shardMBean.setLastLogTerm(lastLogEntry.getTerm());
 -        }
 -
 -        shardMBean.setCommitIndex(getCommitIndex());
 -        shardMBean.setLastApplied(getLastApplied());
 -        shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
 -    }
 -
      @Override
      protected void createSnapshot() {
          // Create a transaction actor. We are really going to treat the transaction as a worker
              delayedListenerRegistrations.clear();
          }
  
 -        shardMBean.setRaftState(getRaftState().name());
 -        shardMBean.setCurrentTerm(getCurrentTerm());
 -
          // If this actor is no longer the leader close all the transaction chains
 -        if(!isLeader){
 -            for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
 -                if(LOG.isDebugEnabled()) {
 -                    LOG.debug(
 -                        "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
 -                        persistenceId(), entry.getKey(), getId());
 -                }
 -                entry.getValue().close();
 +        if(!isLeader) {
 +            if(LOG.isDebugEnabled()) {
 +                LOG.debug(
 +                    "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader",
 +                    persistenceId(), getId());
              }
  
 -            transactionChains.clear();
 +            transactionFactory.closeAllTransactionChains();
          }
      }
  
          return dataPersistenceProvider;
      }
  
 -    @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
 -        shardMBean.setLeader(newLeader);
 -    }
 -
      @Override public String persistenceId() {
 -        return this.name.toString();
 +        return this.name;
      }
  
      @VisibleForTesting
          return dataPersistenceProvider;
      }
  
 +    @VisibleForTesting
 +    ShardCommitCoordinator getCommitCoordinator() {
 +        return commitCoordinator;
 +    }
 +
 +
      private static class ShardCreator implements Creator<Shard> {
  
          private static final long serialVersionUID = 1L;
  
          final ShardIdentifier name;
 -        final Map<ShardIdentifier, String> peerAddresses;
 +        final Map<String, String> peerAddresses;
          final DatastoreContext datastoreContext;
          final SchemaContext schemaContext;
  
 -        ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
 +        ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
                  final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
              this.name = name;
              this.peerAddresses = peerAddresses;
@@@ -10,7 -10,6 +10,7 @@@ import static org.mockito.Mockito.inOrd
  import static org.mockito.Mockito.mock;
  import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
  import akka.actor.ActorRef;
 +import akka.actor.ActorSelection;
  import akka.actor.PoisonPill;
  import akka.actor.Props;
  import akka.dispatch.Dispatchers;
@@@ -36,15 -35,13 +36,15 @@@ import java.util.Set
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicReference;
  import org.junit.Test;
  import org.mockito.InOrder;
  import org.opendaylight.controller.cluster.DataPersistenceProvider;
 -import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
  import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
  import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 +import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
  import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
  import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
  import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
@@@ -52,13 -49,10 +52,13 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
  import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
  import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 +import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 +import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
  import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
  import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
  import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
  import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
  import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
  import org.opendaylight.controller.cluster.datastore.modification.Modification;
  import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
@@@ -88,13 -82,11 +88,13 @@@ import org.opendaylight.controller.md.s
  import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
  import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
  import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 +import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
  import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
  import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
  import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
  import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
  import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@@ -104,7 -96,6 +104,7 @@@ import scala.concurrent.Future
  import scala.concurrent.duration.FiniteDuration;
  
  public class ShardTest extends AbstractShardTest {
 +
      @Test
      public void testRegisterChangeListener() throws Exception {
          new ShardTestKit(getSystem()) {{
                      "testRegisterChangeListener-DataChangeListener");
  
              shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
-                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
+                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
  
              RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                      RegisterChangeListenerReply.class);
  
                  @Override
                  public Shard create() throws Exception {
 -                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
 +                    return new Shard(shardID, Collections.<String,String>emptyMap(),
                              newDatastoreContext(), SCHEMA_CONTEXT) {
                          @Override
                          public void onReceiveCommand(final Object message) throws Exception {
                      onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
  
              // Now send the RegisterChangeListener and wait for the reply.
-             shard.tell(new RegisterChangeListener(path, dclActor.path(),
+             shard.tell(new RegisterChangeListener(path, dclActor,
                      AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
  
              RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
              final CountDownLatch recoveryComplete = new CountDownLatch(1);
              class TestShard extends Shard {
                  TestShard() {
 -                    super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
 +                    super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
                              newDatastoreContext(), SCHEMA_CONTEXT);
                  }
  
                      Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
  
              String address = "akka://foobar";
 -            shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
 +            shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
  
              assertEquals("getPeerAddresses", address,
                      ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
  
              waitUntilLeader(shard);
  
 -            // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
 -
 -            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 +            final String transactionID1 = "tx1";
 +            final String transactionID2 = "tx2";
 +            final String transactionID3 = "tx3";
  
 -            String transactionID1 = "tx1";
 -            MutableCompositeModification modification1 = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
 -                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 -
 -            String transactionID2 = "tx2";
 -            MutableCompositeModification modification2 = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
 -                    TestModel.OUTER_LIST_PATH,
 -                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
 -                    modification2);
 +            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
 +            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
 +            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
 +            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
 +                @Override
 +                public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
 +                    if(transactionID.equals(transactionID1)) {
 +                        mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
 +                        return mockCohort1.get();
 +                    } else if(transactionID.equals(transactionID2)) {
 +                        mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
 +                        return mockCohort2.get();
 +                    } else {
 +                        mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
 +                        return mockCohort3.get();
 +                    }
 +                }
 +            };
  
 -            String transactionID3 = "tx3";
 -            MutableCompositeModification modification3 = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
 -                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
 -                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
 -                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
 -                    modification3);
 +            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
  
              long timeoutSec = 5;
              final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
              final Timeout timeout = new Timeout(duration);
  
 -            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
 -            // by the ShardTransaction.
 +            // Send a BatchedModifications message for the first transaction.
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
 -                    cohort1, modification1, true), getRef());
 -            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
 -                    expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
 -            assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
 +            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
 +                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 +            BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
 +            assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
 +            assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
  
              // Send the CanCommitTransaction message for the first Tx.
  
                      expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
              assertEquals("Can commit", true, canCommitReply.getCanCommit());
  
 -            // Send the ForwardedReadyTransaction for the next 2 Tx's.
 +            // Send BatchedModifications for the next 2 Tx's.
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
 -                    cohort2, modification2, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
 +                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
 -                    cohort3, modification3, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
 +                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
 +                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
              // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
              // processed after the first Tx completes.
  
              assertEquals("Commits complete", true, done);
  
 -            InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
 -            inOrder.verify(cohort1).canCommit();
 -            inOrder.verify(cohort1).preCommit();
 -            inOrder.verify(cohort1).commit();
 -            inOrder.verify(cohort2).canCommit();
 -            inOrder.verify(cohort2).preCommit();
 -            inOrder.verify(cohort2).commit();
 -            inOrder.verify(cohort3).canCommit();
 -            inOrder.verify(cohort3).preCommit();
 -            inOrder.verify(cohort3).commit();
 +            InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
 +            inOrder.verify(mockCohort1.get()).canCommit();
 +            inOrder.verify(mockCohort1.get()).preCommit();
 +            inOrder.verify(mockCohort1.get()).commit();
 +            inOrder.verify(mockCohort2.get()).canCommit();
 +            inOrder.verify(mockCohort2.get()).preCommit();
 +            inOrder.verify(mockCohort2.get()).commit();
 +            inOrder.verify(mockCohort3.get()).canCommit();
 +            inOrder.verify(mockCohort3.get()).preCommit();
 +            inOrder.verify(mockCohort3.get()).commit();
  
              // Verify data in the data store.
  
              assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
              assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
  
 -            verifyLastLogIndex(shard, 2);
 +            verifyLastApplied(shard, 2);
  
              shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
          }};
      }
  
 +    private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
 +            NormalizedNode<?, ?> data, boolean ready) {
 +        return newBatchedModifications(transactionID, null, path, data, ready);
 +    }
 +
 +    private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
 +            YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready) {
 +        BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
 +        batched.addModification(new WriteModification(path, data));
 +        batched.setReady(ready);
 +        return batched;
 +    }
 +
 +    @SuppressWarnings("unchecked")
      @Test
 -    public void testCommitWithPersistenceDisabled() throws Throwable {
 -        dataStoreContextBuilder.persistent(false);
 +    public void testMultipleBatchedModifications() throws Throwable {
          new ShardTestKit(getSystem()) {{
              final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
                      newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
 -                    "testCommitPhaseFailure");
 +                    "testMultipleBatchedModifications");
  
              waitUntilLeader(shard);
  
 -            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 +            final String transactionID = "tx";
 +            FiniteDuration duration = duration("5 seconds");
 +
 +            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
 +            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
 +                @Override
 +                public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
 +                    if(mockCohort.get() == null) {
 +                        mockCohort.set(createDelegatingMockCohort("cohort", actual));
 +                    }
  
 -            // Setup a simulated transactions with a mock cohort.
 +                    return mockCohort.get();
 +                }
 +            };
  
 -            String transactionID = "tx";
 -            MutableCompositeModification modification = new MutableCompositeModification();
 -            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 -            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
 -                    TestModel.TEST_PATH, containerNode, modification);
 +            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
  
 -            FiniteDuration duration = duration("5 seconds");
 +            // Send a BatchedModifications to start a transaction.
 +
 +            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
 +                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
 -            // Simulate the ForwardedReadyTransaction messages that would be sent
 -            // by the ShardTransaction.
 +            // Send a couple more BatchedModifications.
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
 -                    cohort, modification, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
 +                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
 +
 +            shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
 +                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
 +                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
              // Send the CanCommitTransaction message.
  
              shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
              expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
  
 -            InOrder inOrder = inOrder(cohort);
 -            inOrder.verify(cohort).canCommit();
 -            inOrder.verify(cohort).preCommit();
 -            inOrder.verify(cohort).commit();
 +            InOrder inOrder = inOrder(mockCohort.get());
 +            inOrder.verify(mockCohort.get()).canCommit();
 +            inOrder.verify(mockCohort.get()).preCommit();
 +            inOrder.verify(mockCohort.get()).commit();
 +
 +            // Verify data in the data store.
 +
 +            NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
 +            assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
 +            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
 +                    outerList.getValue() instanceof Iterable);
 +            Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
 +            assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
 +                       entry instanceof MapEntryNode);
 +            MapEntryNode mapEntry = (MapEntryNode)entry;
 +            Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
 +                    mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
 +            assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
 +            assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
 +
 +            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
 +        }};
 +    }
 +
 +    @Test
 +    public void testBatchedModificationsOnTransactionChain() throws Throwable {
 +        new ShardTestKit(getSystem()) {{
 +            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
 +                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
 +                    "testBatchedModificationsOnTransactionChain");
 +
 +            waitUntilLeader(shard);
 +
 +            String transactionChainID = "txChain";
 +            String transactionID1 = "tx1";
 +            String transactionID2 = "tx2";
 +
 +            FiniteDuration duration = duration("5 seconds");
 +
 +            // Send a BatchedModifications to start a chained write transaction and ready it.
 +
 +            ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 +            YangInstanceIdentifier path = TestModel.TEST_PATH;
 +            shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
 +                    containerNode, true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
 +
 +            // Create a read Tx on the same chain.
 +
 +            shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
 +                    transactionChainID).toSerializable(), getRef());
 +
 +            CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
 +
 +            getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
 +            ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
 +            assertEquals("Read node", containerNode, readReply.getNormalizedNode());
 +
 +            // Commit the write transaction.
 +
 +            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
 +            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
 +                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
 +            assertEquals("Can commit", true, canCommitReply.getCanCommit());
 +
 +            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
 +            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 +
 +            // Verify data in the data store.
 +
 +            NormalizedNode<?, ?> actualNode = readStore(shard, path);
 +            assertEquals("Stored node", containerNode, actualNode);
 +
 +            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
 +        }};
 +    }
 +
 +    @Test
 +    public void testOnBatchedModificationsWhenNotLeader() {
 +        final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
 +        new ShardTestKit(getSystem()) {{
 +            Creator<Shard> creator = new Creator<Shard>() {
 +                @Override
 +                public Shard create() throws Exception {
 +                    return new Shard(shardID, Collections.<String,String>emptyMap(),
 +                            newDatastoreContext(), SCHEMA_CONTEXT) {
 +                        @Override
 +                        protected boolean isLeader() {
 +                            return overrideLeaderCalls.get() ? false : super.isLeader();
 +                        }
 +
 +                        @Override
 +                        protected ActorSelection getLeader() {
 +                            return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
 +                                super.getLeader();
 +                        }
 +                    };
 +                }
 +            };
 +
 +            TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
 +                    Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
 +
 +            waitUntilLeader(shard);
 +
 +            overrideLeaderCalls.set(true);
 +
 +            BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
 +
 +            shard.tell(batched, ActorRef.noSender());
 +
 +            expectMsgEquals(batched);
 +
 +            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
 +        }};
 +    }
 +
 +    @Test
 +    public void testCommitWithPersistenceDisabled() throws Throwable {
 +        dataStoreContextBuilder.persistent(false);
 +        new ShardTestKit(getSystem()) {{
 +            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
 +                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
 +                    "testCommitWithPersistenceDisabled");
 +
 +            waitUntilLeader(shard);
 +
 +            String transactionID = "tx";
 +            FiniteDuration duration = duration("5 seconds");
 +
 +            // Send a BatchedModifications to start a transaction.
 +
 +            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 +            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
 +
 +            // Send the CanCommitTransaction message.
 +
 +            shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
 +            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
 +                    expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
 +            assertEquals("Can commit", true, canCommitReply.getCanCommit());
 +
 +            // Send the CanCommitTransaction message.
 +
 +            shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
 +            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
  
              NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
              assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
          }};
      }
  
 +    @Test
 +    public void testCommitWhenTransactionHasNoModifications(){
 +        // Note that persistence is enabled which would normally result in the entry getting written to the journal
 +        // but here that need not happen
 +        new ShardTestKit(getSystem()) {
 +            {
 +                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
 +                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
 +                        "testCommitWhenTransactionHasNoModifications");
 +
 +                waitUntilLeader(shard);
 +
 +                String transactionID = "tx1";
 +                MutableCompositeModification modification = new MutableCompositeModification();
 +                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
 +                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 +                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
 +                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
 +
 +                FiniteDuration duration = duration("5 seconds");
 +
 +                // Simulate the ForwardedReadyTransaction messages that would be sent
 +                // by the ShardTransaction.
 +
 +                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
 +                        cohort, modification, true), getRef());
 +                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +
 +                // Send the CanCommitTransaction message.
 +
 +                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
 +                CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
 +                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
 +                assertEquals("Can commit", true, canCommitReply.getCanCommit());
 +
 +                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
 +                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
 +
 +                InOrder inOrder = inOrder(cohort);
 +                inOrder.verify(cohort).canCommit();
 +                inOrder.verify(cohort).preCommit();
 +                inOrder.verify(cohort).commit();
 +
 +                // Use MBean for verification
 +                // Committed transaction count should increase as usual
 +                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
 +
 +                // Commit index should not advance because this does not go into the journal
 +                assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
 +
 +                shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
 +
 +            }
 +        };
 +    }
 +
 +    @Test
 +    public void testCommitWhenTransactionHasModifications(){
 +        new ShardTestKit(getSystem()) {
 +            {
 +                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
 +                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
 +                        "testCommitWhenTransactionHasModifications");
 +
 +                waitUntilLeader(shard);
 +
 +                String transactionID = "tx1";
 +                MutableCompositeModification modification = new MutableCompositeModification();
 +                modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
 +                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
 +                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
 +                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
 +                doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
 +
 +                FiniteDuration duration = duration("5 seconds");
 +
 +                // Simulate the ForwardedReadyTransaction messages that would be sent
 +                // by the ShardTransaction.
 +
 +                shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
 +                        cohort, modification, true), getRef());
 +                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +
 +                // Send the CanCommitTransaction message.
 +
 +                shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
 +                CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
 +                        expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
 +                assertEquals("Can commit", true, canCommitReply.getCanCommit());
 +
 +                shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
 +                expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
 +
 +                InOrder inOrder = inOrder(cohort);
 +                inOrder.verify(cohort).canCommit();
 +                inOrder.verify(cohort).preCommit();
 +                inOrder.verify(cohort).commit();
 +
 +                // Use MBean for verification
 +                // Committed transaction count should increase as usual
 +                assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
 +
 +                // Commit index should advance as we do not have an empty modification
 +                assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
 +
 +                shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
 +
 +            }
 +        };
 +    }
 +
      @Test
      public void testCommitPhaseFailure() throws Throwable {
          new ShardTestKit(getSystem()) {{
  
              waitUntilLeader(shard);
  
 -            // Setup 2 simulated transactions with mock cohorts. The first one fails in the
 -            // commit phase.
 +            // Setup 2 mock cohorts. The first one fails in the commit phase.
  
 -            String transactionID1 = "tx1";
 -            MutableCompositeModification modification1 = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
 +            final String transactionID1 = "tx1";
 +            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
              doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
              doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
              doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
  
 -            String transactionID2 = "tx2";
 -            MutableCompositeModification modification2 = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
 +            final String transactionID2 = "tx2";
 +            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
              doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
  
 +            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
 +                @Override
 +                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
 +                        DOMStoreThreePhaseCommitCohort actual) {
 +                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
 +                }
 +            };
 +
 +            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
 +
              FiniteDuration duration = duration("5 seconds");
              final Timeout timeout = new Timeout(duration);
  
 -            // Simulate the ForwardedReadyTransaction messages that would be sent
 -            // by the ShardTransaction.
 +            // Send BatchedModifications to start and ready each transaction.
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
 -                    cohort1, modification1, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
 +                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
 -                    cohort2, modification2, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
 +                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
              // Send the CanCommitTransaction message for the first Tx.
  
              waitUntilLeader(shard);
  
              String transactionID = "tx1";
 -            MutableCompositeModification modification = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
 +            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
              doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
              doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
  
 +            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
 +                @Override
 +                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
 +                        DOMStoreThreePhaseCommitCohort actual) {
 +                    return cohort;
 +                }
 +            };
 +
 +            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
 +
              FiniteDuration duration = duration("5 seconds");
  
 -            // Simulate the ForwardedReadyTransaction messages that would be sent
 -            // by the ShardTransaction.
 +            // Send BatchedModifications to start and ready a transaction.
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
 -                    cohort, modification, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
 +                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
              // Send the CanCommitTransaction message.
  
              final FiniteDuration duration = duration("5 seconds");
  
              String transactionID = "tx1";
 -            MutableCompositeModification modification = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
 +            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
              doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
  
 -            // Simulate the ForwardedReadyTransaction messages that would be sent
 -            // by the ShardTransaction.
 +            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
 +                @Override
 +                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
 +                        DOMStoreThreePhaseCommitCohort actual) {
 +                    return cohort;
 +                }
 +            };
 +
 +            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
 -                    cohort, modification, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            // Send BatchedModifications to start and ready a transaction.
 +
 +            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
 +                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
              // Send the CanCommitTransaction message.
  
                  }
              };
  
 -            MutableCompositeModification modification = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
 -                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
 -                    modification, preCommit);
 -
 -            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
 -                    cohort, modification, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
 +                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
              shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
              CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
  
              final FiniteDuration duration = duration("5 seconds");
  
 -            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 -
              writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
              writeToStore(shard, TestModel.OUTER_LIST_PATH,
                      ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
  
 -            // Create 1st Tx - will timeout
 +            // Create and ready the 1st Tx - will timeout
  
              String transactionID1 = "tx1";
 -            MutableCompositeModification modification1 = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
 -                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
 -                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
 -                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
 -                    modification1);
 +            shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
 +                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
 +                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
 -            // Create 2nd Tx
 +            // Create and ready the 2nd Tx
  
 -            String transactionID2 = "tx3";
 -            MutableCompositeModification modification2 = new MutableCompositeModification();
 +            String transactionID2 = "tx2";
              YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
 -                .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
 -            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
 -                    listNodePath,
 -                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
 -                    modification2);
 -
 -            // Ready the Tx's
 -
 -            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
 -                    cohort1, modification1, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 -
 -            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
 -                    cohort2, modification2, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
 +            shard.tell(newBatchedModifications(transactionID2, listNodePath,
 +                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
              // canCommit 1st Tx. We don't send the commit so it should timeout.
  
  
              final FiniteDuration duration = duration("5 seconds");
  
 -            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 -
              String transactionID1 = "tx1";
 -            MutableCompositeModification modification1 = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
 -                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 -
              String transactionID2 = "tx2";
 -            MutableCompositeModification modification2 = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
 -                    TestModel.OUTER_LIST_PATH,
 -                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
 -                    modification2);
 -
              String transactionID3 = "tx3";
 -            MutableCompositeModification modification3 = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
 -                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
  
 -            // Ready the Tx's
 +            // Send a BatchedModifications to start transactions and ready them.
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
 -                    cohort1, modification1, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
 +                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
 -                    cohort2, modification2, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
 +                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
 -                    cohort3, modification3, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
 +                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
              // canCommit 1st Tx.
  
  
              // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
  
 -            String transactionID1 = "tx1";
 -            MutableCompositeModification modification1 = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
 +            final String transactionID1 = "tx1";
 +            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
              doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
              doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
  
 -            String transactionID2 = "tx2";
 -            MutableCompositeModification modification2 = new MutableCompositeModification();
 -            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
 +            final String transactionID2 = "tx2";
 +            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
              doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
  
              FiniteDuration duration = duration("5 seconds");
              final Timeout timeout = new Timeout(duration);
  
 -            // Simulate the ForwardedReadyTransaction messages that would be sent
 -            // by the ShardTransaction.
 +            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
 +                @Override
 +                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
 +                        DOMStoreThreePhaseCommitCohort actual) {
 +                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
 +                }
 +            };
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
 -                    cohort1, modification1, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
  
 -            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
 -                    cohort2, modification2, true), getRef());
 -            expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            // Send BatchedModifications to start and ready each transaction.
 +
 +            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
 +                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
 +
 +            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
 +                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 +            expectMsgClass(duration, BatchedModificationsReply.class);
  
              // Send the CanCommitTransaction message for the first Tx.
  
              Creator<Shard> creator = new Creator<Shard>() {
                  @Override
                  public Shard create() throws Exception {
 -                    return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
 +                    return new Shard(shardID, Collections.<String,String>emptyMap(),
                              newDatastoreContext(), SCHEMA_CONTEXT) {
  
                          DelegatingPersistentDataProvider delegating;
          final DatastoreContext persistentContext = DatastoreContext.newBuilder().
                  shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
  
 -        final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
 +        final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
                  persistentContext, SCHEMA_CONTEXT);
  
          final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
                  shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
  
 -        final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
 +        final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
                  nonPersistentContext, SCHEMA_CONTEXT);
  
          new ShardTestKit(getSystem()) {{
              shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
  
          }};
 +
      }
  
      @Test