Merge changes I114cbac1,I45c2e7cd
authorMoiz Raja <moraja@cisco.com>
Fri, 10 Apr 2015 15:21:28 +0000 (15:21 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 10 Apr 2015 15:21:29 +0000 (15:21 +0000)
* changes:
  Calculate replicated log data size on recovery
  Refactor snapshot message processing to RaftActorSnapshotMessageSupport

1  2 
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java

@@@ -45,7 -45,6 +45,6 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
  import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
  import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
- import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
  import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
  import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
  import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
@@@ -59,22 -58,18 +58,18 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
  import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
  import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
- import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
  import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
  import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
  import org.opendaylight.controller.cluster.raft.RaftActor;
  import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
  import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
  import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
  import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
  import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
  import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
  import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
- import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
- import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
- import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
- import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  import scala.concurrent.duration.Duration;
  import scala.concurrent.duration.FiniteDuration;
@@@ -87,8 -82,6 +82,6 @@@
   */
  public class Shard extends RaftActor {
  
-     private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
      private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
  
      @VisibleForTesting
  
      private DatastoreContext datastoreContext;
  
-     private SchemaContext schemaContext;
-     private int createSnapshotTransactionCounter;
      private final ShardCommitCoordinator commitCoordinator;
  
      private long transactionCommitTimeout;
      private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
              Serialization.serializedActorPath(getSelf()));
  
-     private final DOMTransactionFactory transactionFactory;
+     private final DOMTransactionFactory domTransactionFactory;
+     private final ShardTransactionActorFactory transactionActorFactory;
  
-     private final String txnDispatcherPath;
+     private final ShardSnapshotCohort snapshotCohort;
  
      private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
      private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
  
          this.name = name.toString();
          this.datastoreContext = datastoreContext;
-         this.schemaContext = schemaContext;
-         this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
-                 .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
  
          setPersistence(datastoreContext.isPersistent());
  
              getContext().become(new MeteringBehavior(this));
          }
  
-         transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
+         domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
  
-         commitCoordinator = new ShardCommitCoordinator(transactionFactory,
+         commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
                  TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
                  datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
  
  
          appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                  getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+         transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(
+                         Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
+         snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
      }
  
      private void setTransactionCommitTimeout() {
          //
          if(isLeader()) {
              try {
 -                BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
 -                sender().tell(reply, self());
 +                boolean ready = commitCoordinator.handleTransactionModifications(batched);
 +                if(ready) {
 +                    sender().tell(READY_TRANSACTION_REPLY, self());
 +                } else {
 +                    sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
 +                }
              } catch (Exception e) {
                  LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                          batched.getTransactionID(), e);
          // node. In that case, the subsequent 3-phase commit messages won't contain the
          // transactionId so to maintain backwards compatibility, we create a separate cohort actor
          // to provide the compatible behavior.
 -        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
 -            LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
 -            ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
 -                    ready.getTransactionID()));
 +        if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
 +            ActorRef replyActorPath = getSelf();
 +            if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
 +                LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
 +                replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
 +                        ready.getTransactionID()));
 +            }
  
              ReadyTransactionReply readyTransactionReply =
 -                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
 +                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
 +                            ready.getTxnClientVersion());
              getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
 -                    readyTransactionReply, getSelf());
 -
 +                readyTransactionReply, getSelf());
          } else {
 -
 -            getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
 -                    READY_TRANSACTION_REPLY, getSelf());
 +            getSender().tell(READY_TRANSACTION_REPLY, getSelf());
          }
      }
  
      }
  
      private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-         transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+         domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
      }
  
      private ActorRef createTypedTransactionActor(int transactionType,
              ShardTransactionIdentifier transactionId, String transactionChainId,
              short clientVersion ) {
  
-         DOMStoreTransaction transaction = transactionFactory.newTransaction(
-                 TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
-                 transactionChainId);
-         return createShardTransaction(transaction, transactionId, clientVersion);
-     }
-     private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
-                                             short clientVersion){
-         return getContext().actorOf(
-                 ShardTransaction.props(transaction, getSelf(),
-                         schemaContext, datastoreContext, shardMBean,
-                         transactionId.getRemoteTransactionId(), clientVersion)
-                         .withDispatcher(txnDispatcherPath),
-                 transactionId.toString());
+         return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
+                 transactionId, transactionChainId, clientVersion);
      }
  
      private void createTransaction(CreateTransaction createTransaction) {
          return transactionActor;
      }
  
-     private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
-         throws ExecutionException, InterruptedException {
-         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-         commitCohort.preCommit().get();
-         commitCohort.commit().get();
-     }
      private void commitWithNewTransaction(final Modification modification) {
          DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
          modification.apply(tx);
          try {
-             syncCommitTransaction(tx);
+             snapshotCohort.syncCommitTransaction(tx);
              shardMBean.incrementCommittedTransactionCount();
              shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
          } catch (InterruptedException | ExecutionException e) {
      }
  
      private void updateSchemaContext(final UpdateSchemaContext message) {
-         this.schemaContext = message.getSchemaContext();
          updateSchemaContext(message.getSchemaContext());
-         store.onGlobalContextUpdated(message.getSchemaContext());
      }
  
      @VisibleForTesting
          return config.isMetricCaptureEnabled();
      }
  
+     @Override
+     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+         return snapshotCohort;
+     }
      @Override
      @Nonnull
      protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
          }
      }
  
-     @Override
-     protected void createSnapshot() {
-         // Create a transaction actor. We are really going to treat the transaction as a worker
-         // so that this actor does not get block building the snapshot. THe transaction actor will
-         // after processing the CreateSnapshot message.
-         ActorRef createSnapshotTransaction = createTransaction(
-                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
-                 "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                 DataStoreVersions.CURRENT_VERSION);
-         createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
-     }
-     @VisibleForTesting
-     @Override
-     protected void applySnapshot(final byte[] snapshotBytes) {
-         // Since this will be done only on Recovery or when this actor is a Follower
-         // we can safely commit everything in here. We not need to worry about event notifications
-         // as they would have already been disabled on the follower
-         LOG.info("{}: Applying snapshot", persistenceId());
-         try {
-             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-             NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-             // delete everything first
-             transaction.delete(DATASTORE_ROOT);
-             // Add everything from the remote node back
-             transaction.write(DATASTORE_ROOT, node);
-             syncCommitTransaction(transaction);
-         } catch (InterruptedException | ExecutionException e) {
-             LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
-         } finally {
-             LOG.info("{}: Done applying snapshot", persistenceId());
-         }
-     }
      @Override
      protected void onStateChanged() {
          boolean isLeader = isLeader();
                      persistenceId(), getId());
              }
  
-             transactionFactory.closeAllTransactionChains();
+             domTransactionFactory.closeAllTransactionChains();
          }
      }
  
@@@ -1,6 -1,6 +1,6 @@@
  /*
 - *
   *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 + *  Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
   *
   *  This program and the accompanying materials are made available under the
   *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@@ -32,7 -32,6 +32,6 @@@ import org.opendaylight.controller.clus
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
- import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  
  /**
   * @author: syedbahm
  public class ShardWriteTransaction extends ShardTransaction {
  
      private final MutableCompositeModification compositeModification = new MutableCompositeModification();
 +    private int totalBatchedModificationsReceived;
 +    private Exception lastBatchedModificationsException;
      private final DOMStoreWriteTransaction transaction;
  
      public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
-             SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-             short clientTxVersion) {
-         super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+             ShardStats shardStats, String transactionID, short clientTxVersion) {
+         super(shardActor, shardStats, transactionID, clientTxVersion);
          this.transaction = transaction;
      }
  
                  modification.apply(transaction);
              }
  
 -            getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
 +            totalBatchedModificationsReceived++;
 +            if(batched.isReady()) {
 +                if(lastBatchedModificationsException != null) {
 +                    throw lastBatchedModificationsException;
 +                }
 +
 +                if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
 +                    throw new IllegalStateException(String.format(
 +                            "The total number of batched messages received %d does not match the number sent %d",
 +                            totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
 +                }
 +
 +                readyTransaction(transaction, false);
 +            } else {
 +                getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
 +            }
          } catch (Exception e) {
 +            lastBatchedModificationsException = e;
              getSender().tell(new akka.actor.Status.Failure(e), getSelf());
 +
 +            if(batched.isReady()) {
 +                getSelf().tell(PoisonPill.getInstance(), getSelf());
 +            }
          }
      }
  
@@@ -17,6 -17,7 +17,7 @@@ import akka.dispatch.Dispatchers
  import akka.dispatch.OnComplete;
  import akka.japi.Creator;
  import akka.pattern.Patterns;
+ import akka.persistence.SaveSnapshotSuccess;
  import akka.testkit.TestActorRef;
  import akka.util.Timeout;
  import com.google.common.base.Function;
@@@ -436,42 -437,42 +437,42 @@@ public class ShardTest extends Abstract
  
              waitUntilLeader(shard);
  
 -            final String transactionID1 = "tx1";
 -            final String transactionID2 = "tx2";
 -            final String transactionID3 = "tx3";
 +         // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
  
 -            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
 -            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
 -            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
 -            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
 -                @Override
 -                public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
 -                    if(transactionID.equals(transactionID1)) {
 -                        mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
 -                        return mockCohort1.get();
 -                    } else if(transactionID.equals(transactionID2)) {
 -                        mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
 -                        return mockCohort2.get();
 -                    } else {
 -                        mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
 -                        return mockCohort3.get();
 -                    }
 -                }
 -            };
 +            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
  
 -            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
 +            String transactionID1 = "tx1";
 +            MutableCompositeModification modification1 = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
 +                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 +
 +            String transactionID2 = "tx2";
 +            MutableCompositeModification modification2 = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
 +                    TestModel.OUTER_LIST_PATH,
 +                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
 +                    modification2);
 +
 +            String transactionID3 = "tx3";
 +            MutableCompositeModification modification3 = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
 +                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
 +                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
 +                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
 +                    modification3);
  
              long timeoutSec = 5;
              final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
              final Timeout timeout = new Timeout(duration);
  
 -            // Send a BatchedModifications message for the first transaction.
 +            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
 +            // by the ShardTransaction.
  
 -            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
 -                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 -            BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
 -            assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
 -            assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
 +            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
 +                    cohort1, modification1, true), getRef());
 +            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
 +                    expectMsgClass(duration, ReadyTransactionReply.class));
 +            assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
  
              // Send the CanCommitTransaction message for the first Tx.
  
                      expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
              assertEquals("Can commit", true, canCommitReply.getCanCommit());
  
 -            // Send BatchedModifications for the next 2 Tx's.
 +            // Send the ForwardedReadyTransaction for the next 2 Tx's.
  
 -            shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
 -                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
 +                    cohort2, modification2, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
 -            shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
 -                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
 -                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
 +                    cohort3, modification3, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
              // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
              // processed after the first Tx completes.
  
              assertEquals("Commits complete", true, done);
  
 -            InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
 -            inOrder.verify(mockCohort1.get()).canCommit();
 -            inOrder.verify(mockCohort1.get()).preCommit();
 -            inOrder.verify(mockCohort1.get()).commit();
 -            inOrder.verify(mockCohort2.get()).canCommit();
 -            inOrder.verify(mockCohort2.get()).preCommit();
 -            inOrder.verify(mockCohort2.get()).commit();
 -            inOrder.verify(mockCohort3.get()).canCommit();
 -            inOrder.verify(mockCohort3.get()).preCommit();
 -            inOrder.verify(mockCohort3.get()).commit();
 +            InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
 +            inOrder.verify(cohort1).canCommit();
 +            inOrder.verify(cohort1).preCommit();
 +            inOrder.verify(cohort1).commit();
 +            inOrder.verify(cohort2).canCommit();
 +            inOrder.verify(cohort2).preCommit();
 +            inOrder.verify(cohort2).commit();
 +            inOrder.verify(cohort3).canCommit();
 +            inOrder.verify(cohort3).preCommit();
 +            inOrder.verify(cohort3).commit();
  
              // Verify data in the data store.
  
              shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                      TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                      ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
              // Send the CanCommitTransaction message.
  
              YangInstanceIdentifier path = TestModel.TEST_PATH;
              shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
                      containerNode, true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
              // Create a read Tx on the same chain.
  
  
              waitUntilLeader(shard);
  
 +            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 +
 +            // Setup a simulated transactions with a mock cohort.
 +
              String transactionID = "tx";
 +            MutableCompositeModification modification = new MutableCompositeModification();
 +            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 +            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
 +                    TestModel.TEST_PATH, containerNode, modification);
 +
              FiniteDuration duration = duration("5 seconds");
  
 -            // Send a BatchedModifications to start a transaction.
 +            // Simulate the ForwardedReadyTransaction messages that would be sent
 +            // by the ShardTransaction.
  
 -            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 -            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
 +                    cohort, modification, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
              // Send the CanCommitTransaction message.
  
              shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
              expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
  
 +            InOrder inOrder = inOrder(cohort);
 +            inOrder.verify(cohort).canCommit();
 +            inOrder.verify(cohort).preCommit();
 +            inOrder.verify(cohort).commit();
 +
              NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
              assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
  
  
                  shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                          cohort, modification, true), getRef());
 -                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +                expectMsgClass(duration, ReadyTransactionReply.class);
  
                  // Send the CanCommitTransaction message.
  
  
                  shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                          cohort, modification, true), getRef());
 -                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 +                expectMsgClass(duration, ReadyTransactionReply.class);
  
                  // Send the CanCommitTransaction message.
  
  
              waitUntilLeader(shard);
  
 -            // Setup 2 mock cohorts. The first one fails in the commit phase.
 +         // Setup 2 simulated transactions with mock cohorts. The first one fails in the
 +            // commit phase.
  
 -            final String transactionID1 = "tx1";
 -            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
 +            String transactionID1 = "tx1";
 +            MutableCompositeModification modification1 = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
              doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
              doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
              doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
  
 -            final String transactionID2 = "tx2";
 -            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
 +            String transactionID2 = "tx2";
 +            MutableCompositeModification modification2 = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
              doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
  
 -            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
 -                @Override
 -                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
 -                        DOMStoreThreePhaseCommitCohort actual) {
 -                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
 -                }
 -            };
 -
 -            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
 -
              FiniteDuration duration = duration("5 seconds");
              final Timeout timeout = new Timeout(duration);
  
 -            // Send BatchedModifications to start and ready each transaction.
 +            // Simulate the ForwardedReadyTransaction messages that would be sent
 +            // by the ShardTransaction.
  
 -            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
 -                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
 +                    cohort1, modification1, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
 -            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
 -                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
 +                    cohort2, modification2, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
              // Send the CanCommitTransaction message for the first Tx.
  
              waitUntilLeader(shard);
  
              String transactionID = "tx1";
 -            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
 +            MutableCompositeModification modification = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
              doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
              doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
  
 -            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
 -                @Override
 -                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
 -                        DOMStoreThreePhaseCommitCohort actual) {
 -                    return cohort;
 -                }
 -            };
 -
 -            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
 -
              FiniteDuration duration = duration("5 seconds");
  
 -            // Send BatchedModifications to start and ready a transaction.
 +            // Simulate the ForwardedReadyTransaction messages that would be sent
 +            // by the ShardTransaction.
  
 -            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
 -                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
 +                    cohort, modification, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
              // Send the CanCommitTransaction message.
  
              final FiniteDuration duration = duration("5 seconds");
  
              String transactionID = "tx1";
 -            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
 +            MutableCompositeModification modification = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
              doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
  
 -            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
 -                @Override
 -                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
 -                        DOMStoreThreePhaseCommitCohort actual) {
 -                    return cohort;
 -                }
 -            };
 -
 -            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
 -
 -            // Send BatchedModifications to start and ready a transaction.
 +            // Simulate the ForwardedReadyTransaction messages that would be sent
 +            // by the ShardTransaction.
  
 -            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
 -                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
 +                    cohort, modification, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
              // Send the CanCommitTransaction message.
  
                  }
              };
  
 -            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
 -                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            MutableCompositeModification modification = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
 +                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
 +                    modification, preCommit);
 +
 +            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
 +                    cohort, modification, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
              shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
              CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
  
              final FiniteDuration duration = duration("5 seconds");
  
 +            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 +
              writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
              writeToStore(shard, TestModel.OUTER_LIST_PATH,
                      ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
  
 -            // Create and ready the 1st Tx - will timeout
 +            // Create 1st Tx - will timeout
  
              String transactionID1 = "tx1";
 -            shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
 -                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
 -                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            MutableCompositeModification modification1 = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
 +                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
 +                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
 +                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
 +                    modification1);
  
 -            // Create and ready the 2nd Tx
 +            // Create 2nd Tx
  
 -            String transactionID2 = "tx2";
 +            String transactionID2 = "tx3";
 +            MutableCompositeModification modification2 = new MutableCompositeModification();
              YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
 -                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
 -            shard.tell(newBatchedModifications(transactionID2, listNodePath,
 -                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +                .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
 +            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
 +                    listNodePath,
 +                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
 +                    modification2);
 +
 +            // Ready the Tx's
 +
 +            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
 +                    cohort1, modification1, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
 +
 +            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
 +                    cohort2, modification2, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
              // canCommit 1st Tx. We don't send the commit so it should timeout.
  
  
              final FiniteDuration duration = duration("5 seconds");
  
 +            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 +
              String transactionID1 = "tx1";
 +            MutableCompositeModification modification1 = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
 +                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 +
              String transactionID2 = "tx2";
 +            MutableCompositeModification modification2 = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
 +                    TestModel.OUTER_LIST_PATH,
 +                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
 +                    modification2);
 +
              String transactionID3 = "tx3";
 +            MutableCompositeModification modification3 = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
 +                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
  
 -            // Send a BatchedModifications to start transactions and ready them.
 +            // Ready the Tx's
  
 -            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
 -                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
 +                    cohort1, modification1, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
 -            shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
 -                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
 +                    cohort2, modification2, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
 -            shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
 -                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
 +                    cohort3, modification3, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
              // canCommit 1st Tx.
  
  
              // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
  
 -            final String transactionID1 = "tx1";
 -            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
 +            String transactionID1 = "tx1";
 +            MutableCompositeModification modification1 = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
              doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
              doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
  
 -            final String transactionID2 = "tx2";
 -            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
 +            String transactionID2 = "tx2";
 +            MutableCompositeModification modification2 = new MutableCompositeModification();
 +            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
              doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
  
              FiniteDuration duration = duration("5 seconds");
              final Timeout timeout = new Timeout(duration);
  
 -            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
 -                @Override
 -                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
 -                        DOMStoreThreePhaseCommitCohort actual) {
 -                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
 -                }
 -            };
 +            // Simulate the ForwardedReadyTransaction messages that would be sent
 +            // by the ShardTransaction.
  
 -            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
 -
 -            // Send BatchedModifications to start and ready each transaction.
 +            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
 +                    cohort1, modification1, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
 -            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
 -                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 -
 -            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
 -                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
 -            expectMsgClass(duration, BatchedModificationsReply.class);
 +            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
 +                    cohort2, modification2, true), getRef());
 +            expectMsgClass(duration, ReadyTransactionReply.class);
  
              // Send the CanCommitTransaction message for the first Tx.
  
      @SuppressWarnings("serial")
      public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
  
+         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
          final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
          class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
              TestPersistentDataProvider(DataPersistenceProvider delegate) {
          dataStoreContextBuilder.persistent(persistent);
  
          new ShardTestKit(getSystem()) {{
-             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
              class TestShard extends Shard {
  
                  protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
                  }
  
                  @Override
-                 protected void commitSnapshot(final long sequenceNumber) {
-                     super.commitSnapshot(sequenceNumber);
-                     latch.get().countDown();
+                 public void handleCommand(Object message) {
+                     super.handleCommand(message);
+                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+                         latch.get().countDown();
+                     }
                  }
  
                  @Override
@@@ -4,10 -4,8 +4,10 @@@ import static org.junit.Assert.assertEq
  import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertNotNull;
  import static org.junit.Assert.assertTrue;
 +import static org.mockito.Mockito.doThrow;
  import akka.actor.ActorRef;
  import akka.actor.Props;
 +import akka.actor.Status.Failure;
  import akka.actor.Terminated;
  import akka.testkit.JavaTestKit;
  import akka.testkit.TestActorRef;
@@@ -54,7 -52,6 +54,7 @@@ import org.opendaylight.controller.prot
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
  import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
  import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
  import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
  import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
  import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
@@@ -100,7 -97,7 +100,7 @@@ public class ShardTransactionTest exten
      private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
              short version) {
          Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
-                 testSchemaContext, datastoreContext, shardStats, "txn", version);
+                 datastoreContext, shardStats, "txn", version);
          return getSystem().actorOf(props, name);
      }
  
      }
  
      @Test
 -    public void testOnReceiveReadyTransaction() throws Exception {
 +    public void testOnReceiveBatchedModificationsReady() throws Exception {
 +        new JavaTestKit(getSystem()) {{
 +
 +            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
 +                    "testOnReceiveBatchedModificationsReady");
 +
 +            JavaTestKit watcher = new JavaTestKit(getSystem());
 +            watcher.watch(transaction);
 +
 +            YangInstanceIdentifier writePath = TestModel.TEST_PATH;
 +            NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
 +                    new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
 +                    withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 +
 +            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
 +            batched.addModification(new WriteModification(writePath, writeData));
 +
 +            transaction.tell(batched, getRef());
 +            BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
 +            assertEquals("getNumBatched", 1, reply.getNumBatched());
 +
 +            batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
 +            batched.setReady(true);
 +            batched.setTotalMessagesSent(2);
 +
 +            transaction.tell(batched, getRef());
 +            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
 +            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
 +        }};
 +    }
 +
 +    @Test(expected=TestException.class)
 +    public void testOnReceiveBatchedModificationsFailure() throws Throwable {
 +        new JavaTestKit(getSystem()) {{
 +
 +            DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
 +            final ActorRef transaction = newTransactionActor(mockWriteTx,
 +                    "testOnReceiveBatchedModificationsFailure");
 +
 +            JavaTestKit watcher = new JavaTestKit(getSystem());
 +            watcher.watch(transaction);
 +
 +            YangInstanceIdentifier path = TestModel.TEST_PATH;
 +            ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 +
 +            doThrow(new TestException()).when(mockWriteTx).write(path, node);
 +
 +            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
 +            batched.addModification(new WriteModification(path, node));
 +
 +            transaction.tell(batched, getRef());
 +            expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
 +
 +            batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
 +            batched.setReady(true);
 +            batched.setTotalMessagesSent(2);
 +
 +            transaction.tell(batched, getRef());
 +            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
 +            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
 +
 +            if(failure != null) {
 +                throw failure.cause();
 +            }
 +        }};
 +    }
 +
 +    @Test(expected=IllegalStateException.class)
 +    public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
 +        new JavaTestKit(getSystem()) {{
 +
 +            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
 +                    "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
 +
 +            JavaTestKit watcher = new JavaTestKit(getSystem());
 +            watcher.watch(transaction);
 +
 +            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
 +            batched.setReady(true);
 +            batched.setTotalMessagesSent(2);
 +
 +            transaction.tell(batched, getRef());
 +
 +            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
 +            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
 +
 +            if(failure != null) {
 +                throw failure.cause();
 +            }
 +        }};
 +    }
 +
 +    @Test
 +    public void testOnReceivePreLithiumReadyTransaction() throws Exception {
          new JavaTestKit(getSystem()) {{
              final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
 -                    "testReadyTransaction");
 +                    "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
  
 -            watch(transaction);
 +            JavaTestKit watcher = new JavaTestKit(getSystem());
 +            watcher.watch(transaction);
  
              transaction.tell(new ReadyTransaction().toSerializable(), getRef());
  
 -            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
 -                    Terminated.class);
 -            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
 -                    Terminated.class);
 +            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
 +            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
          }};
  
          // test
          new JavaTestKit(getSystem()) {{
              final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
 -                    "testReadyTransaction2");
 +                    "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
  
 -            watch(transaction);
 +            JavaTestKit watcher = new JavaTestKit(getSystem());
 +            watcher.watch(transaction);
  
              transaction.tell(new ReadyTransaction(), getRef());
  
 -            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
 -                    Terminated.class);
 -            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
 -                    Terminated.class);
 +            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
 +            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
          }};
      }
  
      public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
          final ActorRef shard = createShard();
          final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                 testSchemaContext, datastoreContext, shardStats, "txn",
-                 DataStoreVersions.CURRENT_VERSION);
+                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
          final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
  
          transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
              expectMsgClass(duration("3 seconds"), Terminated.class);
          }};
      }
 +
 +    public static class TestException extends RuntimeException {
 +        private static final long serialVersionUID = 1L;
 +    }
  }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.