Merge "Use BatchedModifications message in place of ReadyTransaction message"
authorMoiz Raja <moraja@cisco.com>
Fri, 10 Apr 2015 15:19:30 +0000 (15:19 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 10 Apr 2015 15:19:31 +0000 (15:19 +0000)
20 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java [new file with mode: 0644]

index 17f1abb92cc4ec7d4d0a7dffb818cec20b5cff0b..b2a86376941d4e8e609d8e3f976041b9f7e665b3 100644 (file)
@@ -439,8 +439,12 @@ public class Shard extends RaftActor {
         //
         if(isLeader()) {
             try {
-                BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
-                sender().tell(reply, self());
+                boolean ready = commitCoordinator.handleTransactionModifications(batched);
+                if(ready) {
+                    sender().tell(READY_TRANSACTION_REPLY, self());
+                } else {
+                    sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
+                }
             } catch (Exception e) {
                 LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                         batched.getTransactionID(), e);
@@ -480,20 +484,21 @@ public class Shard extends RaftActor {
         // node. In that case, the subsequent 3-phase commit messages won't contain the
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
-        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-            LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
-            ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
-                    ready.getTransactionID()));
+        if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+            ActorRef replyActorPath = getSelf();
+            if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+                LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
+                replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+                        ready.getTransactionID()));
+            }
 
             ReadyTransactionReply readyTransactionReply =
-                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+                            ready.getTxnClientVersion());
             getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
-                    readyTransactionReply, getSelf());
-
+                readyTransactionReply, getSelf());
         } else {
-
-            getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
-                    READY_TRANSACTION_REPLY, getSelf());
+            getSender().tell(READY_TRANSACTION_REPLY, getSelf());
         }
     }
 
index 54f15fcb4bd03115d97eccc72166b0b12efbca2f..b96e38d76a45aced0e1c326c051cb7fcbd1d82d1 100644 (file)
@@ -22,7 +22,6 @@ import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
@@ -119,7 +118,7 @@ public class ShardCommitCoordinator {
      *
      * @throws ExecutionException if an error occurs loading the cache
      */
-    public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched)
+    public boolean handleTransactionModifications(BatchedModifications batched)
             throws ExecutionException {
         CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
         if(cohortEntry == null) {
@@ -137,7 +136,6 @@ public class ShardCommitCoordinator {
 
         cohortEntry.applyModifications(batched.getModifications());
 
-        String cohortPath = null;
         if(batched.isReady()) {
             if(log.isDebugEnabled()) {
                 log.debug("{}: Readying Tx {}, client version {}", name,
@@ -145,10 +143,9 @@ public class ShardCommitCoordinator {
             }
 
             cohortEntry.ready(cohortDecorator);
-            cohortPath = shardActorPath;
         }
 
-        return new BatchedModificationsReply(batched.getModifications().size(), cohortPath);
+        return batched.isReady();
     }
 
     /**
index d5dcfde803a16bfcc6ea285473167d9b78e8c5cb..356891164628bdfd8434b620707452ffa89bf7d5 100644 (file)
@@ -88,7 +88,11 @@ public class ShardWriteTransaction extends ShardTransaction {
                 modification.apply(transaction);
             }
 
-            getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+            if(batched.isReady()) {
+                readyTransaction(transaction, false);
+            } else {
+                getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+            }
         } catch (Exception e) {
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
index b9900889b1125e8ac229f89af9b4f64551209b8b..f34c5a257125026f61f02fcbf18c945160ede2bc 100644 (file)
@@ -14,13 +14,11 @@ import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
@@ -93,15 +91,11 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
-        // Send the remaining batched modifications if any.
+        // Send the remaining batched modifications, if any, with the ready flag set.
 
-        sendBatchedModifications();
-
-        // Send the ReadyTransaction message to the Tx actor.
-
-        Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+        Future<Object> lastModificationsFuture = sendBatchedModifications(true);
 
-        return transformReadyReply(readyReplyFuture);
+        return transformReadyReply(lastModificationsFuture);
     }
 
     protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
@@ -113,33 +107,31 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             public ActorSelection checkedApply(Object serializedReadyReply) {
                 LOG.debug("Tx {} readyTransaction", getIdentifier());
 
-                // At this point the rwady operation succeeded and we need to extract the cohort
+                // At this point the ready operation succeeded and we need to extract the cohort
                 // actor path from the reply.
-                if (serializedReadyReply instanceof ReadyTransactionReply) {
-                    return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
-                } else if(serializedReadyReply instanceof BatchedModificationsReply) {
-                    return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath());
-                } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
-                    ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
-                    String cohortPath = deserializeCohortPath(reply.getCohortPath());
-                    return actorContext.actorSelection(cohortPath);
-                } else {
-                    // Throwing an exception here will fail the Future.
-                    throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
-                        getIdentifier(), serializedReadyReply.getClass()));
+                if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) {
+                    ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
+                    return actorContext.actorSelection(extractCohortPathFrom(readyTxReply));
                 }
+
+                // Throwing an exception here will fail the Future.
+                throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+                        getIdentifier(), serializedReadyReply.getClass()));
             }
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
-    protected String deserializeCohortPath(String cohortPath) {
-        return cohortPath;
+    protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
+        return readyTxReply.getCohortPath();
+    }
+
+    private BatchedModifications newBatchedModifications() {
+        return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId);
     }
 
     private void batchModification(Modification modification) {
         if(batchedModifications == null) {
-            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
-                    transactionChainId);
+            batchedModifications = newBatchedModifications();
         }
 
         batchedModifications.addModification(modification);
@@ -156,7 +148,11 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     protected Future<Object> sendBatchedModifications(boolean ready) {
         Future<Object> sent = null;
-        if(batchedModifications != null) {
+        if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
+            if(batchedModifications == null) {
+                batchedModifications = newBatchedModifications();
+            }
+
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
                         batchedModifications.getModifications().size(), ready);
@@ -165,8 +161,11 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             batchedModifications.setReady(ready);
             sent = executeOperationAsync(batchedModifications);
 
-            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
-                    transactionChainId);
+            if(ready) {
+                batchedModifications = null;
+            } else {
+                batchedModifications = newBatchedModifications();
+            }
         }
 
         return sent;
index 0fd37b9ecef937083f34cc40ce011c5e694a84ec..388dd9f4bda2fee3e82bba4660a137876cf9dbda 100644 (file)
@@ -726,10 +726,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
                 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
                         transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
                         operationCompleter);
-            } else if (transactionType == TransactionType.WRITE_ONLY &&
-                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
-                return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
-                    actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
             } else {
                 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
                         actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java
deleted file mode 100644 (file)
index b9fe90d..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSelection;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * Context for a write-only transaction.
- *
- * @author Thomas Pantelis
- */
-public class WriteOnlyTransactionContextImpl extends TransactionContextImpl {
-    private static final Logger LOG = LoggerFactory.getLogger(WriteOnlyTransactionContextImpl.class);
-
-    public WriteOnlyTransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
-            String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
-            short remoteTransactionVersion, OperationCompleter operationCompleter) {
-        super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
-                remoteTransactionVersion, operationCompleter);
-    }
-
-    @Override
-    public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called", getIdentifier());
-
-        // Send the remaining batched modifications if any.
-
-        Future<Object> lastModificationsFuture = sendBatchedModifications(true);
-
-        return transformReadyReply(lastModificationsFuture);
-    }
-}
index 9509b06ba7725c8a518223c9c146bd60bf6a4545..d17497c18c1acccb4f986be231e0adf3c7b39fab 100644 (file)
@@ -15,6 +15,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIden
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -70,7 +71,7 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
     }
 
     @Override
-    protected String deserializeCohortPath(String cohortPath) {
+    protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
         // In base Helium we used to return the local path of the actor which represented
         // a remote ThreePhaseCommitCohort. The local path would then be converted to
         // a remote path using this resolvePath method. To maintain compatibility with
@@ -79,9 +80,9 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
         // we could remove this code to resolvePath and just use the cohortPath as the
         // resolved cohortPath
         if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-            return getActorContext().resolvePath(transactionPath, cohortPath);
+            return getActorContext().resolvePath(transactionPath, readyTxReply.getCohortPath());
         }
 
-        return cohortPath;
+        return readyTxReply.getCohortPath();
     }
 }
index a10c6ac3fb1b6d673a9f8e5b3517e72cebdde9f5..895de3a62626da3d0d332a205a866b5997f2b320 100644 (file)
@@ -19,11 +19,7 @@ import java.io.ObjectOutput;
 public class BatchedModificationsReply extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
-    private static final byte COHORT_PATH_NOT_PRESENT = 0;
-    private static final byte COHORT_PATH_PRESENT = 1;
-
     private int numBatched;
-    private String cohortPath;
 
     public BatchedModificationsReply() {
     }
@@ -32,40 +28,20 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage {
         this.numBatched = numBatched;
     }
 
-    public BatchedModificationsReply(int numBatched, String cohortPath) {
-        this.numBatched = numBatched;
-        this.cohortPath = cohortPath;
-    }
-
     public int getNumBatched() {
         return numBatched;
     }
 
-    public String getCohortPath() {
-        return cohortPath;
-    }
-
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
         numBatched = in.readInt();
-
-        if(in.readByte() == COHORT_PATH_PRESENT) {
-            cohortPath = in.readUTF();
-        }
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
         super.writeExternal(out);
         out.writeInt(numBatched);
-
-        if(cohortPath != null) {
-            out.writeByte(COHORT_PATH_PRESENT);
-            out.writeUTF(cohortPath);
-        } else {
-            out.writeByte(COHORT_PATH_NOT_PRESENT);
-        }
     }
 
     @Override
@@ -76,8 +52,7 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage {
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append(", cohortPath=")
-                .append(cohortPath).append("]");
+        builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append("]");
         return builder.toString();
     }
 }
index 38886c9a583c500e4fc9b86f54422c001482e777..0f872430599d1d75869b2404e5f93052d2986cec 100644 (file)
@@ -20,9 +20,9 @@ public class ForwardedReadyTransaction {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final Modification modification;
     private final boolean returnSerialized;
-    private final int txnClientVersion;
+    private final short txnClientVersion;
 
-    public ForwardedReadyTransaction(String transactionID, int txnClientVersion,
+    public ForwardedReadyTransaction(String transactionID, short txnClientVersion,
             DOMStoreThreePhaseCommitCohort cohort, Modification modification,
             boolean returnSerialized) {
         this.transactionID = transactionID;
@@ -48,7 +48,7 @@ public class ForwardedReadyTransaction {
         return returnSerialized;
     }
 
-    public int getTxnClientVersion() {
+    public short getTxnClientVersion() {
         return txnClientVersion;
     }
 }
index 09617abde9b370351a45207d8ea42f60c4c6d5fc..8d617d0ba70d266aba9d9b033ae281351e999b88 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+@Deprecated
 public class ReadyTransaction implements SerializableMessage{
     public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
             ShardTransactionMessages.ReadyTransaction.class;
index 282e23ed3bab4458ef771b338d6bf03861536662..b25a5ddf296cb729265bb5f745977926c7bb313c 100644 (file)
@@ -8,15 +8,29 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class ReadyTransactionReply implements SerializableMessage {
+public class ReadyTransactionReply extends VersionedExternalizableMessage {
+    private static final long serialVersionUID = 1L;
+
     public static final Class<ShardTransactionMessages.ReadyTransactionReply> SERIALIZABLE_CLASS =
             ShardTransactionMessages.ReadyTransactionReply.class;
 
-    private final String cohortPath;
+    private String cohortPath;
+
+    public ReadyTransactionReply() {
+    }
 
     public ReadyTransactionReply(String cohortPath) {
+        this(cohortPath, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public ReadyTransactionReply(String cohortPath, short version) {
+        super(version);
         this.cohortPath = cohortPath;
     }
 
@@ -25,16 +39,38 @@ public class ReadyTransactionReply implements SerializableMessage {
     }
 
     @Override
-    public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
-        return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
-                .setActorPath(cohortPath)
-                .build();
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        cohortPath = in.readUTF();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        out.writeUTF(cohortPath);
+    }
+
+    @Override
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
+            return this;
+        } else {
+            return ShardTransactionMessages.ReadyTransactionReply.newBuilder().setActorPath(cohortPath).build();
+        }
     }
 
     public static ReadyTransactionReply fromSerializable(Object serializable) {
-        ShardTransactionMessages.ReadyTransactionReply o =
-                (ShardTransactionMessages.ReadyTransactionReply) serializable;
+        if(serializable instanceof ReadyTransactionReply) {
+            return (ReadyTransactionReply)serializable;
+        } else {
+            ShardTransactionMessages.ReadyTransactionReply o =
+                    (ShardTransactionMessages.ReadyTransactionReply) serializable;
+            return new ReadyTransactionReply(o.getActorPath(), DataStoreVersions.HELIUM_2_VERSION);
+        }
+    }
 
-        return new ReadyTransactionReply(o.getActorPath());
+    public static boolean isSerializedType(Object message) {
+        return message instanceof ReadyTransactionReply ||
+               message instanceof ShardTransactionMessages.ReadyTransactionReply;
     }
 }
index c6c5486ee38527407cfca218375d336ecb112f83..6a1e12a96b6e9cd2eb7a3a89204a87d21a64f1da 100644 (file)
@@ -50,7 +50,6 @@ import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
@@ -204,10 +203,6 @@ public abstract class AbstractTransactionProxyTest {
         return argThat(matcher);
     }
 
-    protected Future<Object> readySerializedTxReply(String path) {
-        return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
-    }
-
     protected Future<Object> readyTxReply(String path) {
         return Futures.successful((Object)new ReadyTransactionReply(path));
     }
@@ -250,10 +245,8 @@ public abstract class AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
-    protected void expectBatchedModificationsReady(ActorRef actorRef, int count) {
-        Future<BatchedModificationsReply> replyFuture = Futures.successful(
-                new BatchedModificationsReply(count, actorRef.path().toString()));
-        doReturn(replyFuture).when(mockActorContext).executeOperationAsync(
+    protected void expectBatchedModificationsReady(ActorRef actorRef) {
+        doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
@@ -267,11 +260,6 @@ public abstract class AbstractTransactionProxyTest {
                 any(ActorSelection.class), isA(BatchedModifications.class));
     }
 
-    protected void expectReadyTransaction(ActorRef actorRef) {
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
-    }
-
     protected void expectFailedBatchedModifications(ActorRef actorRef) {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
index e04c1a5d185ffbb5bc81b4d035a07545c02ef3a7..b3a0430f9325ae933c355edc4a18e304fa8dae05 100644 (file)
@@ -436,42 +436,42 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            final String transactionID1 = "tx1";
-            final String transactionID2 = "tx2";
-            final String transactionID3 = "tx3";
+         // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
 
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
-                    if(transactionID.equals(transactionID1)) {
-                        mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
-                        return mockCohort1.get();
-                    } else if(transactionID.equals(transactionID2)) {
-                        mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
-                        return mockCohort2.get();
-                    } else {
-                        mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
-                        return mockCohort3.get();
-                    }
-                }
-            };
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
+            String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification3);
 
             long timeoutSec = 5;
             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
             final Timeout timeout = new Timeout(duration);
 
-            // Send a BatchedModifications message for the first transaction.
+            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
-            assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
-            assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
+                    expectMsgClass(duration, ReadyTransactionReply.class));
+            assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -480,16 +480,15 @@ public class ShardTest extends AbstractShardTest {
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send BatchedModifications for the next 2 Tx's.
+            // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
-                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
             // processed after the first Tx completes.
@@ -582,16 +581,16 @@ public class ShardTest extends AbstractShardTest {
 
             assertEquals("Commits complete", true, done);
 
-            InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
-            inOrder.verify(mockCohort1.get()).canCommit();
-            inOrder.verify(mockCohort1.get()).preCommit();
-            inOrder.verify(mockCohort1.get()).commit();
-            inOrder.verify(mockCohort2.get()).canCommit();
-            inOrder.verify(mockCohort2.get()).preCommit();
-            inOrder.verify(mockCohort2.get()).commit();
-            inOrder.verify(mockCohort3.get()).canCommit();
-            inOrder.verify(mockCohort3.get()).preCommit();
-            inOrder.verify(mockCohort3.get()).commit();
+            InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
+            inOrder.verify(cohort1).canCommit();
+            inOrder.verify(cohort1).preCommit();
+            inOrder.verify(cohort1).commit();
+            inOrder.verify(cohort2).canCommit();
+            inOrder.verify(cohort2).preCommit();
+            inOrder.verify(cohort2).commit();
+            inOrder.verify(cohort3).canCommit();
+            inOrder.verify(cohort3).preCommit();
+            inOrder.verify(cohort3).commit();
 
             // Verify data in the data store.
 
@@ -669,7 +668,7 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -728,7 +727,7 @@ public class ShardTest extends AbstractShardTest {
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
                     containerNode, true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Create a read Tx on the same chain.
 
@@ -810,14 +809,24 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+            // Setup a simulated transactions with a mock cohort.
+
             String transactionID = "tx";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+                    TestModel.TEST_PATH, containerNode, modification);
+
             FiniteDuration duration = duration("5 seconds");
 
-            // Send a BatchedModifications to start a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -831,6 +840,11 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
+            InOrder inOrder = inOrder(cohort);
+            inOrder.verify(cohort).canCommit();
+            inOrder.verify(cohort).preCommit();
+            inOrder.verify(cohort).commit();
+
             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
 
@@ -864,7 +878,7 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                         cohort, modification, true), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
 
@@ -919,7 +933,7 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                         cohort, modification, true), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
 
@@ -958,40 +972,34 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            // Setup 2 mock cohorts. The first one fails in the commit phase.
+         // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+            // commit phase.
 
-            final String transactionID1 = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
 
-            final String transactionID2 = "tx2";
-            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Send BatchedModifications to start and ready each transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -1044,27 +1052,19 @@ public class ShardTest extends AbstractShardTest {
             waitUntilLeader(shard);
 
             String transactionID = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return cohort;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             FiniteDuration duration = duration("5 seconds");
 
-            // Send BatchedModifications to start and ready a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -1099,24 +1099,16 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
 
             String transactionID = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return cohort;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
-            // Send BatchedModifications to start and ready a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -1160,9 +1152,14 @@ public class ShardTest extends AbstractShardTest {
                 }
             };
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                    modification, preCommit);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
@@ -1196,26 +1193,42 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
             writeToStore(shard, TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
-            // Create and ready the 1st Tx - will timeout
+            // Create 1st Tx - will timeout
 
             String transactionID1 = "tx1";
-            shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
-                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification1);
 
-            // Create and ready the 2nd Tx
+            // Create 2nd Tx
 
-            String transactionID2 = "tx2";
+            String transactionID2 = "tx3";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
-            shard.tell(newBatchedModifications(transactionID2, listNodePath,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+                .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
+                    listNodePath,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
+                    modification2);
+
+            // Ready the Tx's
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
 
@@ -1252,23 +1265,38 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
             String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
             String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
             String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
 
-            // Send a BatchedModifications to start transactions and ready them.
+            // Ready the Tx's
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx.
 
@@ -1313,37 +1341,30 @@ public class ShardTest extends AbstractShardTest {
 
             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
 
-            final String transactionID1 = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
 
-            final String transactionID2 = "tx2";
-            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
-                }
-            };
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
-            // Send BatchedModifications to start and ready each transaction.
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
-
-            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
index e63ace3e2cc5abaff32acc4012f583cfbbdde6cc..9715f668e353fe71d527865d2ffd68a4759952b5 100644 (file)
@@ -409,34 +409,58 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveReadyTransaction() throws Exception {
+    public void testOnReceiveBatchedModificationsReady() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testOnReceiveBatchedModificationsReady");
+
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
+
+            YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+            NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                    withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.addModification(new WriteModification(writePath, writeData));
+
+            transaction.tell(batched, getRef());
+
+            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+        }};
+    }
+
+    @Test
+    public void testOnReceivePreLithiumReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
-                    "testReadyTransaction");
+                    "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
 
-            watch(transaction);
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
 
             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
 
-            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
-                    Terminated.class);
-            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
-                    Terminated.class);
+            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
         }};
 
         // test
         new JavaTestKit(getSystem()) {{
             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
-                    "testReadyTransaction2");
+                    "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
 
-            watch(transaction);
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
 
             transaction.tell(new ReadyTransaction(), getRef());
 
-            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
-                    Terminated.class);
-            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
-                    Terminated.class);
+            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
         }};
     }
 
index acba775445879d5a1305969cc099fc6fd35b2cb8..026b5490288c1bbdeb1ec44d37e998abddad813e 100644 (file)
@@ -30,8 +30,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -176,7 +174,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
         }
 
-        batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString()));
+        batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
 
         // Tx 2 should've proceeded to find the primary shard.
         verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
@@ -196,7 +194,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
         doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+                eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
 
         DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
 
@@ -205,7 +203,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         writeTx1.ready();
 
-        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false);
+        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
 
         String tx2MemberName = "tx2MemberName";
         doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
@@ -247,7 +245,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
         }
 
-        readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get());
+        readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
 
         verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
                 eqCreateTransaction(tx2MemberName, READ_WRITE));
index b95eaf64d75d2ab83bd5cc2ae1c94f7e2f50e2c4..6cfef194915bb81230b6b8b87c0809afcd15eb2e 100644 (file)
@@ -336,8 +336,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        expectBatchedModifications(actorRef, 1);
-        expectReadyTransaction(actorRef);
+        expectBatchedModificationsReady(actorRef);
 
         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -376,7 +375,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         // This sends the batched modification.
         transactionProxy.ready();
 
-        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
+        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
     }
 
     @Test(expected=IllegalStateException.class)
@@ -425,7 +424,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     }
 
     @Test
-    public void testReadyWithReadWrite() throws Exception {
+    public void testReadWrite() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
@@ -434,7 +433,34 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
         expectBatchedModifications(actorRef, 1);
-        expectReadyTransaction(actorRef);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), false,
+                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+    }
+
+    @Test
+    public void testReadyWithReadWrite() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
+
+        expectBatchedModificationsReady(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
@@ -452,9 +478,33 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
                 isA(BatchedModifications.class));
+    }
 
-        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
-                isA(ReadyTransaction.SERIALIZABLE_CLASS));
+    @Test
+    public void testReadyWithNoModifications() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
+
+        expectBatchedModificationsReady(actorRef);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), true);
     }
 
     @Test
@@ -465,7 +515,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        expectBatchedModificationsReady(actorRef, 1);
+        expectBatchedModificationsReady(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
@@ -496,7 +546,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        expectBatchedModificationsReady(actorRef, 1);
+        expectBatchedModificationsReady(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
@@ -691,18 +741,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
         doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
-        doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), isA(BatchedModifications.class));
+        expectBatchedModificationsReady(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-        // testing ready
-        doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-            eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
-
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
@@ -1130,8 +1175,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModifications(actorRef, shardBatchedModificationCount);
 
-        expectReadyTransaction(actorRef);
-
         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -1176,8 +1219,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
 
-        boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
-        verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
+        verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
                 new DeleteModification(deletePath2));
     }
 
index cc860eafc7cf46a7aebc374f3351ba1eed5e1f6a..9e1557ae3cf605255721e84bec9da1642c520c8c 100644 (file)
@@ -11,7 +11,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.inOrder;
-import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.dispatch.Dispatchers;
@@ -246,7 +246,7 @@ public class PreLithiumShardTest extends AbstractShardTest {
             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION,
                     cohort1, modification1, true), getRef());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
@@ -261,11 +261,11 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION,
                     cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION,
                     cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
index 2980f83564fa3b1cafc483cae580fef79838611f..4cf8b67ddbdebeef130b3932a8f77d640b5337ae 100644 (file)
@@ -33,6 +33,7 @@ import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -41,6 +42,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Future;
 
 /**
  * Unit tests for backwards compatibility with pre-Lithium versions.
@@ -93,6 +95,10 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         return argThat(matcher);
     }
 
+    private Future<Object> readySerializedTxReply(String path, short version) {
+        return Futures.successful(new ReadyTransactionReply(path, version).toSerializable());
+    }
+
     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version);
 
@@ -110,7 +116,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+        doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
@@ -170,7 +176,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+        doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
index c4027ad2a5f3ac1292c6213dedc7a8d0dedd6172..b302f527d6eb7019b68df59bac8bed656edb6127 100644 (file)
@@ -91,11 +91,5 @@ public class BatchedModificationsTest {
         BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
                 (Serializable) new BatchedModificationsReply(100).toSerializable());
         assertEquals("getNumBatched", 100, clone.getNumBatched());
-        assertEquals("getCohortPath", null, clone.getCohortPath());
-
-        clone = (BatchedModificationsReply) SerializationUtils.clone(
-                (Serializable) new BatchedModificationsReply(50, "cohort path").toSerializable());
-        assertEquals("getNumBatched", 50, clone.getNumBatched());
-        assertEquals("getCohortPath", "cohort path", clone.getCohortPath());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java
new file mode 100644 (file)
index 0000000..db525ea
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for ReadyTransactionReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadyTransactionReplyTest {
+
+    @Test
+    public void testSerialization() {
+        String cohortPath = "cohort path";
+        ReadyTransactionReply expected = new ReadyTransactionReply(cohortPath);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ReadyTransactionReply.class, serialized.getClass());
+
+        ReadyTransactionReply actual = ReadyTransactionReply.fromSerializable(SerializationUtils.clone(
+                (Serializable) serialized));
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+        assertEquals("getCohortPath", cohortPath, actual.getCohortPath());
+    }
+
+    @Test
+    public void testSerializationWithPreLithiumVersion() throws Exception {
+        String cohortPath = "cohort path";
+        ReadyTransactionReply expected = new ReadyTransactionReply(cohortPath, DataStoreVersions.HELIUM_2_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ShardTransactionMessages.ReadyTransactionReply.class, serialized.getClass());
+
+        ReadyTransactionReply actual = ReadyTransactionReply.fromSerializable(SerializationUtils.clone(
+                (Serializable) serialized));
+        assertEquals("getVersion", DataStoreVersions.HELIUM_2_VERSION, actual.getVersion());
+        assertEquals("getCohortPath", cohortPath, actual.getCohortPath());
+    }
+}