Use BatchedModifications message in place of ReadyTransaction message 84/17884/5
authorTom Pantelis <tpanteli@brocade.com>
Tue, 7 Apr 2015 23:54:51 +0000 (19:54 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 10 Apr 2015 14:11:58 +0000 (10:11 -0400)
For the optimized write transactions (currently disabled), we used the
last BatchedModifications message with the ready flag set to elide
sending the ReadyTransaction message. We can do this in general for both
read-write and write-only.

In ShardWriteTransaction, if the BatchedModifications indicates ready,
it simply calls readyTransaction to send ForwardedReadyTransaction
message to the shard, same as with the ReadyTransacton message.

Instead of returning a BatchedModificationsReply with the cohort path,
I kept the ReadyTransactionReply so the Shard code mostly remains the
same - otherwise we'd have to introduce an equivalent
batched ForwardedReadyTransaction message.

I also made ReadyTransactionReply Externalizable so we don't have to
deal with sending it serialized or not (except for backwards
compatibility - had to keep the protobuff class).

TransactionContextImpl#readyTransaction is now the same as
WriteTransactionContextImpl#readyTransaction so
WriteTransactionContextImpl is no longer needed and was removed.

Change-Id: I5175c77ca08a1877af9593a28e7c4cb46f03287a
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
20 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java [new file with mode: 0644]

index 9cd52b219a442320bd0dc8bf12cdfc6b6c4a577b..65b6ac4bd008c95c666ce83a1f0047553104a64a 100644 (file)
@@ -447,8 +447,12 @@ public class Shard extends RaftActor {
         //
         if(isLeader()) {
             try {
         //
         if(isLeader()) {
             try {
-                BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
-                sender().tell(reply, self());
+                boolean ready = commitCoordinator.handleTransactionModifications(batched);
+                if(ready) {
+                    sender().tell(READY_TRANSACTION_REPLY, self());
+                } else {
+                    sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
+                }
             } catch (Exception e) {
                 LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                         batched.getTransactionID(), e);
             } catch (Exception e) {
                 LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                         batched.getTransactionID(), e);
@@ -488,20 +492,21 @@ public class Shard extends RaftActor {
         // node. In that case, the subsequent 3-phase commit messages won't contain the
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
         // node. In that case, the subsequent 3-phase commit messages won't contain the
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
-        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-            LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
-            ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
-                    ready.getTransactionID()));
+        if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+            ActorRef replyActorPath = getSelf();
+            if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+                LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
+                replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+                        ready.getTransactionID()));
+            }
 
             ReadyTransactionReply readyTransactionReply =
 
             ReadyTransactionReply readyTransactionReply =
-                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+                            ready.getTxnClientVersion());
             getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
             getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
-                    readyTransactionReply, getSelf());
-
+                readyTransactionReply, getSelf());
         } else {
         } else {
-
-            getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
-                    READY_TRANSACTION_REPLY, getSelf());
+            getSender().tell(READY_TRANSACTION_REPLY, getSelf());
         }
     }
 
         }
     }
 
index 54f15fcb4bd03115d97eccc72166b0b12efbca2f..b96e38d76a45aced0e1c326c051cb7fcbd1d82d1 100644 (file)
@@ -22,7 +22,6 @@ import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
@@ -119,7 +118,7 @@ public class ShardCommitCoordinator {
      *
      * @throws ExecutionException if an error occurs loading the cache
      */
      *
      * @throws ExecutionException if an error occurs loading the cache
      */
-    public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched)
+    public boolean handleTransactionModifications(BatchedModifications batched)
             throws ExecutionException {
         CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
         if(cohortEntry == null) {
             throws ExecutionException {
         CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
         if(cohortEntry == null) {
@@ -137,7 +136,6 @@ public class ShardCommitCoordinator {
 
         cohortEntry.applyModifications(batched.getModifications());
 
 
         cohortEntry.applyModifications(batched.getModifications());
 
-        String cohortPath = null;
         if(batched.isReady()) {
             if(log.isDebugEnabled()) {
                 log.debug("{}: Readying Tx {}, client version {}", name,
         if(batched.isReady()) {
             if(log.isDebugEnabled()) {
                 log.debug("{}: Readying Tx {}, client version {}", name,
@@ -145,10 +143,9 @@ public class ShardCommitCoordinator {
             }
 
             cohortEntry.ready(cohortDecorator);
             }
 
             cohortEntry.ready(cohortDecorator);
-            cohortPath = shardActorPath;
         }
 
         }
 
-        return new BatchedModificationsReply(batched.getModifications().size(), cohortPath);
+        return batched.isReady();
     }
 
     /**
     }
 
     /**
index d5dcfde803a16bfcc6ea285473167d9b78e8c5cb..356891164628bdfd8434b620707452ffa89bf7d5 100644 (file)
@@ -88,7 +88,11 @@ public class ShardWriteTransaction extends ShardTransaction {
                 modification.apply(transaction);
             }
 
                 modification.apply(transaction);
             }
 
-            getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+            if(batched.isReady()) {
+                readyTransaction(transaction, false);
+            } else {
+                getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+            }
         } catch (Exception e) {
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
         } catch (Exception e) {
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
index b9900889b1125e8ac229f89af9b4f64551209b8b..f34c5a257125026f61f02fcbf18c945160ede2bc 100644 (file)
@@ -14,13 +14,11 @@ import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
@@ -93,15 +91,11 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
-        // Send the remaining batched modifications if any.
+        // Send the remaining batched modifications, if any, with the ready flag set.
 
 
-        sendBatchedModifications();
-
-        // Send the ReadyTransaction message to the Tx actor.
-
-        Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+        Future<Object> lastModificationsFuture = sendBatchedModifications(true);
 
 
-        return transformReadyReply(readyReplyFuture);
+        return transformReadyReply(lastModificationsFuture);
     }
 
     protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
     }
 
     protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
@@ -113,33 +107,31 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             public ActorSelection checkedApply(Object serializedReadyReply) {
                 LOG.debug("Tx {} readyTransaction", getIdentifier());
 
             public ActorSelection checkedApply(Object serializedReadyReply) {
                 LOG.debug("Tx {} readyTransaction", getIdentifier());
 
-                // At this point the rwady operation succeeded and we need to extract the cohort
+                // At this point the ready operation succeeded and we need to extract the cohort
                 // actor path from the reply.
                 // actor path from the reply.
-                if (serializedReadyReply instanceof ReadyTransactionReply) {
-                    return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
-                } else if(serializedReadyReply instanceof BatchedModificationsReply) {
-                    return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath());
-                } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
-                    ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
-                    String cohortPath = deserializeCohortPath(reply.getCohortPath());
-                    return actorContext.actorSelection(cohortPath);
-                } else {
-                    // Throwing an exception here will fail the Future.
-                    throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
-                        getIdentifier(), serializedReadyReply.getClass()));
+                if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) {
+                    ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
+                    return actorContext.actorSelection(extractCohortPathFrom(readyTxReply));
                 }
                 }
+
+                // Throwing an exception here will fail the Future.
+                throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+                        getIdentifier(), serializedReadyReply.getClass()));
             }
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
             }
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
-    protected String deserializeCohortPath(String cohortPath) {
-        return cohortPath;
+    protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
+        return readyTxReply.getCohortPath();
+    }
+
+    private BatchedModifications newBatchedModifications() {
+        return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId);
     }
 
     private void batchModification(Modification modification) {
         if(batchedModifications == null) {
     }
 
     private void batchModification(Modification modification) {
         if(batchedModifications == null) {
-            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
-                    transactionChainId);
+            batchedModifications = newBatchedModifications();
         }
 
         batchedModifications.addModification(modification);
         }
 
         batchedModifications.addModification(modification);
@@ -156,7 +148,11 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     protected Future<Object> sendBatchedModifications(boolean ready) {
         Future<Object> sent = null;
 
     protected Future<Object> sendBatchedModifications(boolean ready) {
         Future<Object> sent = null;
-        if(batchedModifications != null) {
+        if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
+            if(batchedModifications == null) {
+                batchedModifications = newBatchedModifications();
+            }
+
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
                         batchedModifications.getModifications().size(), ready);
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
                         batchedModifications.getModifications().size(), ready);
@@ -165,8 +161,11 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             batchedModifications.setReady(ready);
             sent = executeOperationAsync(batchedModifications);
 
             batchedModifications.setReady(ready);
             sent = executeOperationAsync(batchedModifications);
 
-            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
-                    transactionChainId);
+            if(ready) {
+                batchedModifications = null;
+            } else {
+                batchedModifications = newBatchedModifications();
+            }
         }
 
         return sent;
         }
 
         return sent;
index 0fd37b9ecef937083f34cc40ce011c5e694a84ec..388dd9f4bda2fee3e82bba4660a137876cf9dbda 100644 (file)
@@ -726,10 +726,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
                 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
                         transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
                         operationCompleter);
                 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
                         transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
                         operationCompleter);
-            } else if (transactionType == TransactionType.WRITE_ONLY &&
-                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
-                return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
-                    actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
             } else {
                 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
                         actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
             } else {
                 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
                         actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java
deleted file mode 100644 (file)
index b9fe90d..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSelection;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * Context for a write-only transaction.
- *
- * @author Thomas Pantelis
- */
-public class WriteOnlyTransactionContextImpl extends TransactionContextImpl {
-    private static final Logger LOG = LoggerFactory.getLogger(WriteOnlyTransactionContextImpl.class);
-
-    public WriteOnlyTransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
-            String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
-            short remoteTransactionVersion, OperationCompleter operationCompleter) {
-        super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
-                remoteTransactionVersion, operationCompleter);
-    }
-
-    @Override
-    public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called", getIdentifier());
-
-        // Send the remaining batched modifications if any.
-
-        Future<Object> lastModificationsFuture = sendBatchedModifications(true);
-
-        return transformReadyReply(lastModificationsFuture);
-    }
-}
index 9509b06ba7725c8a518223c9c146bd60bf6a4545..d17497c18c1acccb4f986be231e0adf3c7b39fab 100644 (file)
@@ -15,6 +15,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIden
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -70,7 +71,7 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
     }
 
     @Override
     }
 
     @Override
-    protected String deserializeCohortPath(String cohortPath) {
+    protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
         // In base Helium we used to return the local path of the actor which represented
         // a remote ThreePhaseCommitCohort. The local path would then be converted to
         // a remote path using this resolvePath method. To maintain compatibility with
         // In base Helium we used to return the local path of the actor which represented
         // a remote ThreePhaseCommitCohort. The local path would then be converted to
         // a remote path using this resolvePath method. To maintain compatibility with
@@ -79,9 +80,9 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
         // we could remove this code to resolvePath and just use the cohortPath as the
         // resolved cohortPath
         if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
         // we could remove this code to resolvePath and just use the cohortPath as the
         // resolved cohortPath
         if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-            return getActorContext().resolvePath(transactionPath, cohortPath);
+            return getActorContext().resolvePath(transactionPath, readyTxReply.getCohortPath());
         }
 
         }
 
-        return cohortPath;
+        return readyTxReply.getCohortPath();
     }
 }
     }
 }
index a10c6ac3fb1b6d673a9f8e5b3517e72cebdde9f5..895de3a62626da3d0d332a205a866b5997f2b320 100644 (file)
@@ -19,11 +19,7 @@ import java.io.ObjectOutput;
 public class BatchedModificationsReply extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
 public class BatchedModificationsReply extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
-    private static final byte COHORT_PATH_NOT_PRESENT = 0;
-    private static final byte COHORT_PATH_PRESENT = 1;
-
     private int numBatched;
     private int numBatched;
-    private String cohortPath;
 
     public BatchedModificationsReply() {
     }
 
     public BatchedModificationsReply() {
     }
@@ -32,40 +28,20 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage {
         this.numBatched = numBatched;
     }
 
         this.numBatched = numBatched;
     }
 
-    public BatchedModificationsReply(int numBatched, String cohortPath) {
-        this.numBatched = numBatched;
-        this.cohortPath = cohortPath;
-    }
-
     public int getNumBatched() {
         return numBatched;
     }
 
     public int getNumBatched() {
         return numBatched;
     }
 
-    public String getCohortPath() {
-        return cohortPath;
-    }
-
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
         numBatched = in.readInt();
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
         numBatched = in.readInt();
-
-        if(in.readByte() == COHORT_PATH_PRESENT) {
-            cohortPath = in.readUTF();
-        }
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
         super.writeExternal(out);
         out.writeInt(numBatched);
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
         super.writeExternal(out);
         out.writeInt(numBatched);
-
-        if(cohortPath != null) {
-            out.writeByte(COHORT_PATH_PRESENT);
-            out.writeUTF(cohortPath);
-        } else {
-            out.writeByte(COHORT_PATH_NOT_PRESENT);
-        }
     }
 
     @Override
     }
 
     @Override
@@ -76,8 +52,7 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage {
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append(", cohortPath=")
-                .append(cohortPath).append("]");
+        builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append("]");
         return builder.toString();
     }
 }
         return builder.toString();
     }
 }
index 38886c9a583c500e4fc9b86f54422c001482e777..0f872430599d1d75869b2404e5f93052d2986cec 100644 (file)
@@ -20,9 +20,9 @@ public class ForwardedReadyTransaction {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final Modification modification;
     private final boolean returnSerialized;
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final Modification modification;
     private final boolean returnSerialized;
-    private final int txnClientVersion;
+    private final short txnClientVersion;
 
 
-    public ForwardedReadyTransaction(String transactionID, int txnClientVersion,
+    public ForwardedReadyTransaction(String transactionID, short txnClientVersion,
             DOMStoreThreePhaseCommitCohort cohort, Modification modification,
             boolean returnSerialized) {
         this.transactionID = transactionID;
             DOMStoreThreePhaseCommitCohort cohort, Modification modification,
             boolean returnSerialized) {
         this.transactionID = transactionID;
@@ -48,7 +48,7 @@ public class ForwardedReadyTransaction {
         return returnSerialized;
     }
 
         return returnSerialized;
     }
 
-    public int getTxnClientVersion() {
+    public short getTxnClientVersion() {
         return txnClientVersion;
     }
 }
         return txnClientVersion;
     }
 }
index 09617abde9b370351a45207d8ea42f60c4c6d5fc..8d617d0ba70d266aba9d9b033ae281351e999b88 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+@Deprecated
 public class ReadyTransaction implements SerializableMessage{
     public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
             ShardTransactionMessages.ReadyTransaction.class;
 public class ReadyTransaction implements SerializableMessage{
     public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
             ShardTransactionMessages.ReadyTransaction.class;
index 282e23ed3bab4458ef771b338d6bf03861536662..b25a5ddf296cb729265bb5f745977926c7bb313c 100644 (file)
@@ -8,15 +8,29 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class ReadyTransactionReply implements SerializableMessage {
+public class ReadyTransactionReply extends VersionedExternalizableMessage {
+    private static final long serialVersionUID = 1L;
+
     public static final Class<ShardTransactionMessages.ReadyTransactionReply> SERIALIZABLE_CLASS =
             ShardTransactionMessages.ReadyTransactionReply.class;
 
     public static final Class<ShardTransactionMessages.ReadyTransactionReply> SERIALIZABLE_CLASS =
             ShardTransactionMessages.ReadyTransactionReply.class;
 
-    private final String cohortPath;
+    private String cohortPath;
+
+    public ReadyTransactionReply() {
+    }
 
     public ReadyTransactionReply(String cohortPath) {
 
     public ReadyTransactionReply(String cohortPath) {
+        this(cohortPath, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public ReadyTransactionReply(String cohortPath, short version) {
+        super(version);
         this.cohortPath = cohortPath;
     }
 
         this.cohortPath = cohortPath;
     }
 
@@ -25,16 +39,38 @@ public class ReadyTransactionReply implements SerializableMessage {
     }
 
     @Override
     }
 
     @Override
-    public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
-        return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
-                .setActorPath(cohortPath)
-                .build();
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        cohortPath = in.readUTF();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        out.writeUTF(cohortPath);
+    }
+
+    @Override
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
+            return this;
+        } else {
+            return ShardTransactionMessages.ReadyTransactionReply.newBuilder().setActorPath(cohortPath).build();
+        }
     }
 
     public static ReadyTransactionReply fromSerializable(Object serializable) {
     }
 
     public static ReadyTransactionReply fromSerializable(Object serializable) {
-        ShardTransactionMessages.ReadyTransactionReply o =
-                (ShardTransactionMessages.ReadyTransactionReply) serializable;
+        if(serializable instanceof ReadyTransactionReply) {
+            return (ReadyTransactionReply)serializable;
+        } else {
+            ShardTransactionMessages.ReadyTransactionReply o =
+                    (ShardTransactionMessages.ReadyTransactionReply) serializable;
+            return new ReadyTransactionReply(o.getActorPath(), DataStoreVersions.HELIUM_2_VERSION);
+        }
+    }
 
 
-        return new ReadyTransactionReply(o.getActorPath());
+    public static boolean isSerializedType(Object message) {
+        return message instanceof ReadyTransactionReply ||
+               message instanceof ShardTransactionMessages.ReadyTransactionReply;
     }
 }
     }
 }
index c6c5486ee38527407cfca218375d336ecb112f83..6a1e12a96b6e9cd2eb7a3a89204a87d21a64f1da 100644 (file)
@@ -50,7 +50,6 @@ import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
@@ -204,10 +203,6 @@ public abstract class AbstractTransactionProxyTest {
         return argThat(matcher);
     }
 
         return argThat(matcher);
     }
 
-    protected Future<Object> readySerializedTxReply(String path) {
-        return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
-    }
-
     protected Future<Object> readyTxReply(String path) {
         return Futures.successful((Object)new ReadyTransactionReply(path));
     }
     protected Future<Object> readyTxReply(String path) {
         return Futures.successful((Object)new ReadyTransactionReply(path));
     }
@@ -250,10 +245,8 @@ public abstract class AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
-    protected void expectBatchedModificationsReady(ActorRef actorRef, int count) {
-        Future<BatchedModificationsReply> replyFuture = Futures.successful(
-                new BatchedModificationsReply(count, actorRef.path().toString()));
-        doReturn(replyFuture).when(mockActorContext).executeOperationAsync(
+    protected void expectBatchedModificationsReady(ActorRef actorRef) {
+        doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
@@ -267,11 +260,6 @@ public abstract class AbstractTransactionProxyTest {
                 any(ActorSelection.class), isA(BatchedModifications.class));
     }
 
                 any(ActorSelection.class), isA(BatchedModifications.class));
     }
 
-    protected void expectReadyTransaction(ActorRef actorRef) {
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
-    }
-
     protected void expectFailedBatchedModifications(ActorRef actorRef) {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     protected void expectFailedBatchedModifications(ActorRef actorRef) {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
index e04c1a5d185ffbb5bc81b4d035a07545c02ef3a7..b3a0430f9325ae933c355edc4a18e304fa8dae05 100644 (file)
@@ -436,42 +436,42 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
 
             waitUntilLeader(shard);
 
-            final String transactionID1 = "tx1";
-            final String transactionID2 = "tx2";
-            final String transactionID3 = "tx3";
+         // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
 
 
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
-                    if(transactionID.equals(transactionID1)) {
-                        mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
-                        return mockCohort1.get();
-                    } else if(transactionID.equals(transactionID2)) {
-                        mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
-                        return mockCohort2.get();
-                    } else {
-                        mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
-                        return mockCohort3.get();
-                    }
-                }
-            };
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
+            String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification3);
 
             long timeoutSec = 5;
             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
             final Timeout timeout = new Timeout(duration);
 
 
             long timeoutSec = 5;
             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
             final Timeout timeout = new Timeout(duration);
 
-            // Send a BatchedModifications message for the first transaction.
+            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
+            // by the ShardTransaction.
 
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
-            assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
-            assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
+                    expectMsgClass(duration, ReadyTransactionReply.class));
+            assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
 
             // Send the CanCommitTransaction message for the first Tx.
 
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -480,16 +480,15 @@ public class ShardTest extends AbstractShardTest {
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send BatchedModifications for the next 2 Tx's.
+            // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
 
-            shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
-                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
             // processed after the first Tx completes.
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
             // processed after the first Tx completes.
@@ -582,16 +581,16 @@ public class ShardTest extends AbstractShardTest {
 
             assertEquals("Commits complete", true, done);
 
 
             assertEquals("Commits complete", true, done);
 
-            InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
-            inOrder.verify(mockCohort1.get()).canCommit();
-            inOrder.verify(mockCohort1.get()).preCommit();
-            inOrder.verify(mockCohort1.get()).commit();
-            inOrder.verify(mockCohort2.get()).canCommit();
-            inOrder.verify(mockCohort2.get()).preCommit();
-            inOrder.verify(mockCohort2.get()).commit();
-            inOrder.verify(mockCohort3.get()).canCommit();
-            inOrder.verify(mockCohort3.get()).preCommit();
-            inOrder.verify(mockCohort3.get()).commit();
+            InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
+            inOrder.verify(cohort1).canCommit();
+            inOrder.verify(cohort1).preCommit();
+            inOrder.verify(cohort1).commit();
+            inOrder.verify(cohort2).canCommit();
+            inOrder.verify(cohort2).preCommit();
+            inOrder.verify(cohort2).commit();
+            inOrder.verify(cohort3).canCommit();
+            inOrder.verify(cohort3).preCommit();
+            inOrder.verify(cohort3).commit();
 
             // Verify data in the data store.
 
 
             // Verify data in the data store.
 
@@ -669,7 +668,7 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
 
             // Send the CanCommitTransaction message.
 
@@ -728,7 +727,7 @@ public class ShardTest extends AbstractShardTest {
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
                     containerNode, true), getRef());
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
                     containerNode, true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Create a read Tx on the same chain.
 
 
             // Create a read Tx on the same chain.
 
@@ -810,14 +809,24 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
 
             waitUntilLeader(shard);
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+            // Setup a simulated transactions with a mock cohort.
+
             String transactionID = "tx";
             String transactionID = "tx";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+                    TestModel.TEST_PATH, containerNode, modification);
+
             FiniteDuration duration = duration("5 seconds");
 
             FiniteDuration duration = duration("5 seconds");
 
-            // Send a BatchedModifications to start a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
 
-            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
 
             // Send the CanCommitTransaction message.
 
@@ -831,6 +840,11 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
+            InOrder inOrder = inOrder(cohort);
+            inOrder.verify(cohort).canCommit();
+            inOrder.verify(cohort).preCommit();
+            inOrder.verify(cohort).commit();
+
             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
 
             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
 
@@ -864,7 +878,7 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                         cohort, modification, true), getRef());
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                         cohort, modification, true), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
 
 
                 // Send the CanCommitTransaction message.
 
@@ -919,7 +933,7 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                         cohort, modification, true), getRef());
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                         cohort, modification, true), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
 
 
                 // Send the CanCommitTransaction message.
 
@@ -958,40 +972,34 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
 
             waitUntilLeader(shard);
 
-            // Setup 2 mock cohorts. The first one fails in the commit phase.
+         // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+            // commit phase.
 
 
-            final String transactionID1 = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
 
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
 
-            final String transactionID2 = "tx2";
-            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Send BatchedModifications to start and ready each transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -1044,27 +1052,19 @@ public class ShardTest extends AbstractShardTest {
             waitUntilLeader(shard);
 
             String transactionID = "tx1";
             waitUntilLeader(shard);
 
             String transactionID = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
 
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return cohort;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             FiniteDuration duration = duration("5 seconds");
 
             FiniteDuration duration = duration("5 seconds");
 
-            // Send BatchedModifications to start and ready a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
 
             // Send the CanCommitTransaction message.
 
@@ -1099,24 +1099,16 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
 
             String transactionID = "tx1";
             final FiniteDuration duration = duration("5 seconds");
 
             String transactionID = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return cohort;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
-            // Send BatchedModifications to start and ready a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
 
             // Send the CanCommitTransaction message.
 
@@ -1160,9 +1152,14 @@ public class ShardTest extends AbstractShardTest {
                 }
             };
 
                 }
             };
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                    modification, preCommit);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
@@ -1196,26 +1193,42 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
 
             final FiniteDuration duration = duration("5 seconds");
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
             writeToStore(shard, TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
             writeToStore(shard, TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
-            // Create and ready the 1st Tx - will timeout
+            // Create 1st Tx - will timeout
 
             String transactionID1 = "tx1";
 
             String transactionID1 = "tx1";
-            shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
-                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification1);
 
 
-            // Create and ready the 2nd Tx
+            // Create 2nd Tx
 
 
-            String transactionID2 = "tx2";
+            String transactionID2 = "tx3";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
-            shard.tell(newBatchedModifications(transactionID2, listNodePath,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+                .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
+                    listNodePath,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
+                    modification2);
+
+            // Ready the Tx's
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
 
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
 
@@ -1252,23 +1265,38 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
 
             final FiniteDuration duration = duration("5 seconds");
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
             String transactionID1 = "tx1";
             String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
             String transactionID2 = "tx2";
             String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
             String transactionID3 = "tx3";
             String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
 
 
-            // Send a BatchedModifications to start transactions and ready them.
+            // Ready the Tx's
 
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
 
-            shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
 
-            shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx.
 
 
             // canCommit 1st Tx.
 
@@ -1313,37 +1341,30 @@ public class ShardTest extends AbstractShardTest {
 
             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
 
 
             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
 
-            final String transactionID1 = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
 
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
 
-            final String transactionID2 = "tx2";
-            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
-                }
-            };
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
-            // Send BatchedModifications to start and ready each transaction.
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
-
-            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
 
             // Send the CanCommitTransaction message for the first Tx.
 
index e63ace3e2cc5abaff32acc4012f583cfbbdde6cc..9715f668e353fe71d527865d2ffd68a4759952b5 100644 (file)
@@ -409,34 +409,58 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
     }
 
     @Test
-    public void testOnReceiveReadyTransaction() throws Exception {
+    public void testOnReceiveBatchedModificationsReady() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testOnReceiveBatchedModificationsReady");
+
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
+
+            YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+            NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                    withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.addModification(new WriteModification(writePath, writeData));
+
+            transaction.tell(batched, getRef());
+
+            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+        }};
+    }
+
+    @Test
+    public void testOnReceivePreLithiumReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
         new JavaTestKit(getSystem()) {{
             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
-                    "testReadyTransaction");
+                    "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
 
 
-            watch(transaction);
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
 
             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
 
 
             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
 
-            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
-                    Terminated.class);
-            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
-                    Terminated.class);
+            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
         }};
 
         // test
         new JavaTestKit(getSystem()) {{
             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
         }};
 
         // test
         new JavaTestKit(getSystem()) {{
             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
-                    "testReadyTransaction2");
+                    "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
 
 
-            watch(transaction);
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
 
             transaction.tell(new ReadyTransaction(), getRef());
 
 
             transaction.tell(new ReadyTransaction(), getRef());
 
-            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
-                    Terminated.class);
-            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
-                    Terminated.class);
+            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
         }};
     }
 
         }};
     }
 
index acba775445879d5a1305969cc099fc6fd35b2cb8..026b5490288c1bbdeb1ec44d37e998abddad813e 100644 (file)
@@ -30,8 +30,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -176,7 +174,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
         }
 
             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
         }
 
-        batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString()));
+        batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
 
         // Tx 2 should've proceeded to find the primary shard.
         verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
         // Tx 2 should've proceeded to find the primary shard.
         verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
@@ -196,7 +194,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
         doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
 
         Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
         doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+                eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
 
         DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
 
 
         DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
 
@@ -205,7 +203,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         writeTx1.ready();
 
 
         writeTx1.ready();
 
-        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false);
+        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
 
         String tx2MemberName = "tx2MemberName";
         doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
 
         String tx2MemberName = "tx2MemberName";
         doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
@@ -247,7 +245,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
         }
 
             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
         }
 
-        readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get());
+        readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
 
         verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
                 eqCreateTransaction(tx2MemberName, READ_WRITE));
 
         verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
                 eqCreateTransaction(tx2MemberName, READ_WRITE));
index b95eaf64d75d2ab83bd5cc2ae1c94f7e2f50e2c4..6cfef194915bb81230b6b8b87c0809afcd15eb2e 100644 (file)
@@ -336,8 +336,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        expectBatchedModifications(actorRef, 1);
-        expectReadyTransaction(actorRef);
+        expectBatchedModificationsReady(actorRef);
 
         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
 
         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -376,7 +375,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         // This sends the batched modification.
         transactionProxy.ready();
 
         // This sends the batched modification.
         transactionProxy.ready();
 
-        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
+        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
     }
 
     @Test(expected=IllegalStateException.class)
     }
 
     @Test(expected=IllegalStateException.class)
@@ -425,7 +424,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     }
 
     @Test
     }
 
     @Test
-    public void testReadyWithReadWrite() throws Exception {
+    public void testReadWrite() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
@@ -434,7 +433,34 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
         expectBatchedModifications(actorRef, 1);
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
         expectBatchedModifications(actorRef, 1);
-        expectReadyTransaction(actorRef);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), false,
+                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+    }
+
+    @Test
+    public void testReadyWithReadWrite() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
+
+        expectBatchedModificationsReady(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
@@ -452,9 +478,33 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
                 isA(BatchedModifications.class));
 
         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
                 isA(BatchedModifications.class));
+    }
 
 
-        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
-                isA(ReadyTransaction.SERIALIZABLE_CLASS));
+    @Test
+    public void testReadyWithNoModifications() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
+
+        expectBatchedModificationsReady(actorRef);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), true);
     }
 
     @Test
     }
 
     @Test
@@ -465,7 +515,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        expectBatchedModificationsReady(actorRef, 1);
+        expectBatchedModificationsReady(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
@@ -496,7 +546,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        expectBatchedModificationsReady(actorRef, 1);
+        expectBatchedModificationsReady(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
@@ -691,18 +741,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
         doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
         doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
-        doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), isA(BatchedModifications.class));
+        expectBatchedModificationsReady(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-        // testing ready
-        doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-            eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
-
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
@@ -1130,8 +1175,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModifications(actorRef, shardBatchedModificationCount);
 
 
         expectBatchedModifications(actorRef, shardBatchedModificationCount);
 
-        expectReadyTransaction(actorRef);
-
         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -1176,8 +1219,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
 
         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
 
-        boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
-        verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
+        verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
                 new DeleteModification(deletePath2));
     }
 
                 new DeleteModification(deletePath2));
     }
 
index cc860eafc7cf46a7aebc374f3351ba1eed5e1f6a..9e1557ae3cf605255721e84bec9da1642c520c8c 100644 (file)
@@ -11,7 +11,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.inOrder;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.inOrder;
-import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.dispatch.Dispatchers;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.dispatch.Dispatchers;
@@ -246,7 +246,7 @@ public class PreLithiumShardTest extends AbstractShardTest {
             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
             // by the ShardTransaction.
 
             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION,
                     cohort1, modification1, true), getRef());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
                     cohort1, modification1, true), getRef());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
@@ -261,11 +261,11 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
 
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION,
                     cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
                     cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION,
                     cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
                     cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
index 2980f83564fa3b1cafc483cae580fef79838611f..4cf8b67ddbdebeef130b3932a8f77d640b5337ae 100644 (file)
@@ -33,6 +33,7 @@ import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -41,6 +42,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Future;
 
 /**
  * Unit tests for backwards compatibility with pre-Lithium versions.
 
 /**
  * Unit tests for backwards compatibility with pre-Lithium versions.
@@ -93,6 +95,10 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         return argThat(matcher);
     }
 
         return argThat(matcher);
     }
 
+    private Future<Object> readySerializedTxReply(String path, short version) {
+        return Futures.successful(new ReadyTransactionReply(path, version).toSerializable());
+    }
+
     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version);
 
     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version);
 
@@ -110,7 +116,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
 
         doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+        doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
@@ -170,7 +176,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
 
         doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+        doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
index c4027ad2a5f3ac1292c6213dedc7a8d0dedd6172..b302f527d6eb7019b68df59bac8bed656edb6127 100644 (file)
@@ -91,11 +91,5 @@ public class BatchedModificationsTest {
         BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
                 (Serializable) new BatchedModificationsReply(100).toSerializable());
         assertEquals("getNumBatched", 100, clone.getNumBatched());
         BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
                 (Serializable) new BatchedModificationsReply(100).toSerializable());
         assertEquals("getNumBatched", 100, clone.getNumBatched());
-        assertEquals("getCohortPath", null, clone.getCohortPath());
-
-        clone = (BatchedModificationsReply) SerializationUtils.clone(
-                (Serializable) new BatchedModificationsReply(50, "cohort path").toSerializable());
-        assertEquals("getNumBatched", 50, clone.getNumBatched());
-        assertEquals("getCohortPath", "cohort path", clone.getCohortPath());
     }
 }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java
new file mode 100644 (file)
index 0000000..db525ea
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for ReadyTransactionReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadyTransactionReplyTest {
+
+    @Test
+    public void testSerialization() {
+        String cohortPath = "cohort path";
+        ReadyTransactionReply expected = new ReadyTransactionReply(cohortPath);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ReadyTransactionReply.class, serialized.getClass());
+
+        ReadyTransactionReply actual = ReadyTransactionReply.fromSerializable(SerializationUtils.clone(
+                (Serializable) serialized));
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+        assertEquals("getCohortPath", cohortPath, actual.getCohortPath());
+    }
+
+    @Test
+    public void testSerializationWithPreLithiumVersion() throws Exception {
+        String cohortPath = "cohort path";
+        ReadyTransactionReply expected = new ReadyTransactionReply(cohortPath, DataStoreVersions.HELIUM_2_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ShardTransactionMessages.ReadyTransactionReply.class, serialized.getClass());
+
+        ReadyTransactionReply actual = ReadyTransactionReply.fromSerializable(SerializationUtils.clone(
+                (Serializable) serialized));
+        assertEquals("getVersion", DataStoreVersions.HELIUM_2_VERSION, actual.getVersion());
+        assertEquals("getCohortPath", cohortPath, actual.getCohortPath());
+    }
+}