From: Tom Pantelis Date: Tue, 7 Apr 2015 23:54:51 +0000 (-0400) Subject: Use BatchedModifications message in place of ReadyTransaction message X-Git-Tag: release/lithium~289^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=7cda871930ba64f8916aceb7751403481703b790 Use BatchedModifications message in place of ReadyTransaction message For the optimized write transactions (currently disabled), we used the last BatchedModifications message with the ready flag set to elide sending the ReadyTransaction message. We can do this in general for both read-write and write-only. In ShardWriteTransaction, if the BatchedModifications indicates ready, it simply calls readyTransaction to send ForwardedReadyTransaction message to the shard, same as with the ReadyTransacton message. Instead of returning a BatchedModificationsReply with the cohort path, I kept the ReadyTransactionReply so the Shard code mostly remains the same - otherwise we'd have to introduce an equivalent batched ForwardedReadyTransaction message. I also made ReadyTransactionReply Externalizable so we don't have to deal with sending it serialized or not (except for backwards compatibility - had to keep the protobuff class). TransactionContextImpl#readyTransaction is now the same as WriteTransactionContextImpl#readyTransaction so WriteTransactionContextImpl is no longer needed and was removed. Change-Id: I5175c77ca08a1877af9593a28e7c4cb46f03287a Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 9cd52b219a..65b6ac4bd0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -447,8 +447,12 @@ public class Shard extends RaftActor { // if(isLeader()) { try { - BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched); - sender().tell(reply, self()); + boolean ready = commitCoordinator.handleTransactionModifications(batched); + if(ready) { + sender().tell(READY_TRANSACTION_REPLY, self()); + } else { + sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self()); + } } catch (Exception e) { LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(), batched.getTransactionID(), e); @@ -488,20 +492,21 @@ public class Shard extends RaftActor { // node. In that case, the subsequent 3-phase commit messages won't contain the // transactionId so to maintain backwards compatibility, we create a separate cohort actor // to provide the compatible behavior. - if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { - LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId()); - ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( - ready.getTransactionID())); + if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) { + ActorRef replyActorPath = getSelf(); + if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { + LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId()); + replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( + ready.getTransactionID())); + } ReadyTransactionReply readyTransactionReply = - new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath)); + new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath), + ready.getTxnClientVersion()); getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : - readyTransactionReply, getSelf()); - + readyTransactionReply, getSelf()); } else { - - getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() : - READY_TRANSACTION_REPLY, getSelf()); + getSender().tell(READY_TRANSACTION_REPLY, getSelf()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 54f15fcb4b..b96e38d76a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -22,7 +22,6 @@ import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.Modification; @@ -119,7 +118,7 @@ public class ShardCommitCoordinator { * * @throws ExecutionException if an error occurs loading the cache */ - public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched) + public boolean handleTransactionModifications(BatchedModifications batched) throws ExecutionException { CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID()); if(cohortEntry == null) { @@ -137,7 +136,6 @@ public class ShardCommitCoordinator { cohortEntry.applyModifications(batched.getModifications()); - String cohortPath = null; if(batched.isReady()) { if(log.isDebugEnabled()) { log.debug("{}: Readying Tx {}, client version {}", name, @@ -145,10 +143,9 @@ public class ShardCommitCoordinator { } cohortEntry.ready(cohortDecorator); - cohortPath = shardActorPath; } - return new BatchedModificationsReply(batched.getModifications().size(), cohortPath); + return batched.isReady(); } /** diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index d5dcfde803..3568911646 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -88,7 +88,11 @@ public class ShardWriteTransaction extends ShardTransaction { modification.apply(transaction); } - getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf()); + if(batched.isReady()) { + readyTransaction(transaction, false); + } else { + getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf()); + } } catch (Exception e) { getSender().tell(new akka.actor.Status.Failure(e), getSelf()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java index b9900889b1..f34c5a2571 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java @@ -14,13 +14,11 @@ import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; @@ -93,15 +91,11 @@ public class TransactionContextImpl extends AbstractTransactionContext { public Future readyTransaction() { LOG.debug("Tx {} readyTransaction called", getIdentifier()); - // Send the remaining batched modifications if any. + // Send the remaining batched modifications, if any, with the ready flag set. - sendBatchedModifications(); - - // Send the ReadyTransaction message to the Tx actor. - - Future readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE); + Future lastModificationsFuture = sendBatchedModifications(true); - return transformReadyReply(readyReplyFuture); + return transformReadyReply(lastModificationsFuture); } protected Future transformReadyReply(final Future readyReplyFuture) { @@ -113,33 +107,31 @@ public class TransactionContextImpl extends AbstractTransactionContext { public ActorSelection checkedApply(Object serializedReadyReply) { LOG.debug("Tx {} readyTransaction", getIdentifier()); - // At this point the rwady operation succeeded and we need to extract the cohort + // At this point the ready operation succeeded and we need to extract the cohort // actor path from the reply. - if (serializedReadyReply instanceof ReadyTransactionReply) { - return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath()); - } else if(serializedReadyReply instanceof BatchedModificationsReply) { - return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath()); - } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { - ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply); - String cohortPath = deserializeCohortPath(reply.getCohortPath()); - return actorContext.actorSelection(cohortPath); - } else { - // Throwing an exception here will fail the Future. - throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", - getIdentifier(), serializedReadyReply.getClass())); + if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) { + ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply); + return actorContext.actorSelection(extractCohortPathFrom(readyTxReply)); } + + // Throwing an exception here will fail the Future. + throw new IllegalArgumentException(String.format("%s: Invalid reply type %s", + getIdentifier(), serializedReadyReply.getClass())); } }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher()); } - protected String deserializeCohortPath(String cohortPath) { - return cohortPath; + protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) { + return readyTxReply.getCohortPath(); + } + + private BatchedModifications newBatchedModifications() { + return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId); } private void batchModification(Modification modification) { if(batchedModifications == null) { - batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, - transactionChainId); + batchedModifications = newBatchedModifications(); } batchedModifications.addModification(modification); @@ -156,7 +148,11 @@ public class TransactionContextImpl extends AbstractTransactionContext { protected Future sendBatchedModifications(boolean ready) { Future sent = null; - if(batchedModifications != null) { + if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) { + if(batchedModifications == null) { + batchedModifications = newBatchedModifications(); + } + if(LOG.isDebugEnabled()) { LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(), batchedModifications.getModifications().size(), ready); @@ -165,8 +161,11 @@ public class TransactionContextImpl extends AbstractTransactionContext { batchedModifications.setReady(ready); sent = executeOperationAsync(batchedModifications); - batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, - transactionChainId); + if(ready) { + batchedModifications = null; + } else { + batchedModifications = newBatchedModifications(); + } } return sent; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 0fd37b9ece..388dd9f4bd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -726,10 +726,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction readyTransaction() { - LOG.debug("Tx {} readyTransaction called", getIdentifier()); - - // Send the remaining batched modifications if any. - - Future lastModificationsFuture = sendBatchedModifications(true); - - return transformReadyReply(lastModificationsFuture); - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java index 9509b06ba7..d17497c18c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java @@ -15,6 +15,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIden import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -70,7 +71,7 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl { } @Override - protected String deserializeCohortPath(String cohortPath) { + protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) { // In base Helium we used to return the local path of the actor which represented // a remote ThreePhaseCommitCohort. The local path would then be converted to // a remote path using this resolvePath method. To maintain compatibility with @@ -79,9 +80,9 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl { // we could remove this code to resolvePath and just use the cohortPath as the // resolved cohortPath if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) { - return getActorContext().resolvePath(transactionPath, cohortPath); + return getActorContext().resolvePath(transactionPath, readyTxReply.getCohortPath()); } - return cohortPath; + return readyTxReply.getCohortPath(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java index a10c6ac3fb..895de3a626 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java @@ -19,11 +19,7 @@ import java.io.ObjectOutput; public class BatchedModificationsReply extends VersionedExternalizableMessage { private static final long serialVersionUID = 1L; - private static final byte COHORT_PATH_NOT_PRESENT = 0; - private static final byte COHORT_PATH_PRESENT = 1; - private int numBatched; - private String cohortPath; public BatchedModificationsReply() { } @@ -32,40 +28,20 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage { this.numBatched = numBatched; } - public BatchedModificationsReply(int numBatched, String cohortPath) { - this.numBatched = numBatched; - this.cohortPath = cohortPath; - } - public int getNumBatched() { return numBatched; } - public String getCohortPath() { - return cohortPath; - } - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); numBatched = in.readInt(); - - if(in.readByte() == COHORT_PATH_PRESENT) { - cohortPath = in.readUTF(); - } } @Override public void writeExternal(ObjectOutput out) throws IOException { super.writeExternal(out); out.writeInt(numBatched); - - if(cohortPath != null) { - out.writeByte(COHORT_PATH_PRESENT); - out.writeUTF(cohortPath); - } else { - out.writeByte(COHORT_PATH_NOT_PRESENT); - } } @Override @@ -76,8 +52,7 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage { @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append(", cohortPath=") - .append(cohortPath).append("]"); + builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append("]"); return builder.toString(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java index 38886c9a58..0f87243059 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java @@ -20,9 +20,9 @@ public class ForwardedReadyTransaction { private final DOMStoreThreePhaseCommitCohort cohort; private final Modification modification; private final boolean returnSerialized; - private final int txnClientVersion; + private final short txnClientVersion; - public ForwardedReadyTransaction(String transactionID, int txnClientVersion, + public ForwardedReadyTransaction(String transactionID, short txnClientVersion, DOMStoreThreePhaseCommitCohort cohort, Modification modification, boolean returnSerialized) { this.transactionID = transactionID; @@ -48,7 +48,7 @@ public class ForwardedReadyTransaction { return returnSerialized; } - public int getTxnClientVersion() { + public short getTxnClientVersion() { return txnClientVersion; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java index 09617abde9..8d617d0ba7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.messages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +@Deprecated public class ReadyTransaction implements SerializableMessage{ public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadyTransaction.class; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java index 282e23ed3b..b25a5ddf29 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java @@ -8,15 +8,29 @@ package org.opendaylight.controller.cluster.datastore.messages; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; -public class ReadyTransactionReply implements SerializableMessage { +public class ReadyTransactionReply extends VersionedExternalizableMessage { + private static final long serialVersionUID = 1L; + public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadyTransactionReply.class; - private final String cohortPath; + private String cohortPath; + + public ReadyTransactionReply() { + } public ReadyTransactionReply(String cohortPath) { + this(cohortPath, DataStoreVersions.CURRENT_VERSION); + } + + public ReadyTransactionReply(String cohortPath, short version) { + super(version); this.cohortPath = cohortPath; } @@ -25,16 +39,38 @@ public class ReadyTransactionReply implements SerializableMessage { } @Override - public ShardTransactionMessages.ReadyTransactionReply toSerializable() { - return ShardTransactionMessages.ReadyTransactionReply.newBuilder() - .setActorPath(cohortPath) - .build(); + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + cohortPath = in.readUTF(); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + out.writeUTF(cohortPath); + } + + @Override + public Object toSerializable() { + if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) { + return this; + } else { + return ShardTransactionMessages.ReadyTransactionReply.newBuilder().setActorPath(cohortPath).build(); + } } public static ReadyTransactionReply fromSerializable(Object serializable) { - ShardTransactionMessages.ReadyTransactionReply o = - (ShardTransactionMessages.ReadyTransactionReply) serializable; + if(serializable instanceof ReadyTransactionReply) { + return (ReadyTransactionReply)serializable; + } else { + ShardTransactionMessages.ReadyTransactionReply o = + (ShardTransactionMessages.ReadyTransactionReply) serializable; + return new ReadyTransactionReply(o.getActorPath(), DataStoreVersions.HELIUM_2_VERSION); + } + } - return new ReadyTransactionReply(o.getActorPath()); + public static boolean isSerializedType(Object message) { + return message instanceof ReadyTransactionReply || + message instanceof ShardTransactionMessages.ReadyTransactionReply; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java index c6c5486ee3..6a1e12a96b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java @@ -50,7 +50,6 @@ import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; @@ -204,10 +203,6 @@ public abstract class AbstractTransactionProxyTest { return argThat(matcher); } - protected Future readySerializedTxReply(String path) { - return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable()); - } - protected Future readyTxReply(String path) { return Futures.successful((Object)new ReadyTransactionReply(path)); } @@ -250,10 +245,8 @@ public abstract class AbstractTransactionProxyTest { eq(actorSelection(actorRef)), isA(BatchedModifications.class)); } - protected void expectBatchedModificationsReady(ActorRef actorRef, int count) { - Future replyFuture = Futures.successful( - new BatchedModificationsReply(count, actorRef.path().toString())); - doReturn(replyFuture).when(mockActorContext).executeOperationAsync( + protected void expectBatchedModificationsReady(ActorRef actorRef) { + doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(BatchedModifications.class)); } @@ -267,11 +260,6 @@ public abstract class AbstractTransactionProxyTest { any(ActorSelection.class), isA(BatchedModifications.class)); } - protected void expectReadyTransaction(ActorRef actorRef) { - doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - } - protected void expectFailedBatchedModifications(ActorRef actorRef) { doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(BatchedModifications.class)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index e04c1a5d18..b3a0430f93 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -436,42 +436,42 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - final String transactionID1 = "tx1"; - final String transactionID2 = "tx2"; - final String transactionID3 = "tx3"; + // Setup 3 simulated transactions with mock cohorts backed by real cohorts. - final AtomicReference mockCohort1 = new AtomicReference<>(); - final AtomicReference mockCohort2 = new AtomicReference<>(); - final AtomicReference mockCohort3 = new AtomicReference<>(); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) { - if(transactionID.equals(transactionID1)) { - mockCohort1.set(createDelegatingMockCohort("cohort1", actual)); - return mockCohort1.get(); - } else if(transactionID.equals(transactionID2)) { - mockCohort2.set(createDelegatingMockCohort("cohort2", actual)); - return mockCohort2.get(); - } else { - mockCohort3.set(createDelegatingMockCohort("cohort3", actual)); - return mockCohort3.get(); - } - } - }; + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), + modification2); + + String transactionID3 = "tx3"; + MutableCompositeModification modification3 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), + modification3); long timeoutSec = 5; final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); final Timeout timeout = new Timeout(duration); - // Send a BatchedModifications message for the first transaction. + // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent + // by the ShardTransaction. - shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class); - assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath()); - assertEquals("getNumBatched", 1, batchedReply.getNumBatched()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( + expectMsgClass(duration, ReadyTransactionReply.class)); + assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); // Send the CanCommitTransaction message for the first Tx. @@ -480,16 +480,15 @@ public class ShardTest extends AbstractShardTest { expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - // Send BatchedModifications for the next 2 Tx's. + // Send the ForwardedReadyTransaction for the next 2 Tx's. - shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder( - TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + cohort3, modification3, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and // processed after the first Tx completes. @@ -582,16 +581,16 @@ public class ShardTest extends AbstractShardTest { assertEquals("Commits complete", true, done); - InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get()); - inOrder.verify(mockCohort1.get()).canCommit(); - inOrder.verify(mockCohort1.get()).preCommit(); - inOrder.verify(mockCohort1.get()).commit(); - inOrder.verify(mockCohort2.get()).canCommit(); - inOrder.verify(mockCohort2.get()).preCommit(); - inOrder.verify(mockCohort2.get()).commit(); - inOrder.verify(mockCohort3.get()).canCommit(); - inOrder.verify(mockCohort3.get()).preCommit(); - inOrder.verify(mockCohort3.get()).commit(); + InOrder inOrder = inOrder(cohort1, cohort2, cohort3); + inOrder.verify(cohort1).canCommit(); + inOrder.verify(cohort1).preCommit(); + inOrder.verify(cohort1).commit(); + inOrder.verify(cohort2).canCommit(); + inOrder.verify(cohort2).preCommit(); + inOrder.verify(cohort2).commit(); + inOrder.verify(cohort3).canCommit(); + inOrder.verify(cohort3).preCommit(); + inOrder.verify(cohort3).commit(); // Verify data in the data store. @@ -669,7 +668,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder( TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -728,7 +727,7 @@ public class ShardTest extends AbstractShardTest { YangInstanceIdentifier path = TestModel.TEST_PATH; shard.tell(newBatchedModifications(transactionID1, transactionChainID, path, containerNode, true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + expectMsgClass(duration, ReadyTransactionReply.class); // Create a read Tx on the same chain. @@ -810,14 +809,24 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + + // Setup a simulated transactions with a mock cohort. + String transactionID = "tx"; + MutableCompositeModification modification = new MutableCompositeModification(); + NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + TestModel.TEST_PATH, containerNode, modification); + FiniteDuration duration = duration("5 seconds"); - // Send a BatchedModifications to start a transaction. + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -831,6 +840,11 @@ public class ShardTest extends AbstractShardTest { shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + InOrder inOrder = inOrder(cohort); + inOrder.verify(cohort).canCommit(); + inOrder.verify(cohort).preCommit(); + inOrder.verify(cohort).commit(); + NormalizedNode actualNode = readStore(shard, TestModel.TEST_PATH); assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode); @@ -864,7 +878,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -919,7 +933,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, cohort, modification, true), getRef()); - expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -958,40 +972,34 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - // Setup 2 mock cohorts. The first one fails in the commit phase. + // Setup 2 simulated transactions with mock cohorts. The first one fails in the + // commit phase. - final String transactionID1 = "tx1"; - final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit(); - final String transactionID2 = "tx2"; - final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, - DOMStoreThreePhaseCommitCohort actual) { - return transactionID1.equals(transactionID) ? cohort1 : cohort2; - } - }; - - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - // Send BatchedModifications to start and ready each transaction. + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. @@ -1044,27 +1052,19 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); String transactionID = "tx1"; - final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit(); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, - DOMStoreThreePhaseCommitCohort actual) { - return cohort; - } - }; - - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - FiniteDuration duration = duration("5 seconds"); - // Send BatchedModifications to start and ready a transaction. + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1099,24 +1099,16 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); String transactionID = "tx1"; - final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, - DOMStoreThreePhaseCommitCohort actual) { - return cohort; - } - }; - - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - - // Send BatchedModifications to start and ready a transaction. + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message. @@ -1160,9 +1152,14 @@ public class ShardTest extends AbstractShardTest { } }; - shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + MutableCompositeModification modification = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), + modification, preCommit); + + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( @@ -1196,26 +1193,42 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); writeToStore(shard, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - // Create and ready the 1st Tx - will timeout + // Create 1st Tx - will timeout String transactionID1 = "tx1"; - shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder( - TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), + modification1); - // Create and ready the 2nd Tx + // Create 2nd Tx - String transactionID2 = "tx2"; + String transactionID2 = "tx3"; + MutableCompositeModification modification2 = new MutableCompositeModification(); YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); - shard.tell(newBatchedModifications(transactionID2, listNodePath, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); + DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, + listNodePath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), + modification2); + + // Ready the Tx's + + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); + + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -1252,23 +1265,38 @@ public class ShardTest extends AbstractShardTest { final FiniteDuration duration = duration("5 seconds"); + InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore(); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), + modification2); + String transactionID3 = "tx3"; + MutableCompositeModification modification3 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3); - // Send a BatchedModifications to start transactions and ready them. + // Ready the Tx's - shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + cohort3, modification3, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // canCommit 1st Tx. @@ -1313,37 +1341,30 @@ public class ShardTest extends AbstractShardTest { // Setup 2 simulated transactions with mock cohorts. The first one will be aborted. - final String transactionID1 = "tx1"; - final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + String transactionID1 = "tx1"; + MutableCompositeModification modification1 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).abort(); - final String transactionID2 = "tx2"; - final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + String transactionID2 = "tx2"; + MutableCompositeModification modification2 = new MutableCompositeModification(); + DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); final Timeout timeout = new Timeout(duration); - ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { - @Override - public DOMStoreThreePhaseCommitCohort decorate(String transactionID, - DOMStoreThreePhaseCommitCohort actual) { - return transactionID1.equals(transactionID) ? cohort1 : cohort2; - } - }; + // Simulate the ForwardedReadyTransaction messages that would be sent + // by the ShardTransaction. - shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator); - - // Send BatchedModifications to start and ready each transaction. + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); - - shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef()); - expectMsgClass(duration, BatchedModificationsReply.class); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); + expectMsgClass(duration, ReadyTransactionReply.class); // Send the CanCommitTransaction message for the first Tx. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index e63ace3e2c..9715f668e3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -409,34 +409,58 @@ public class ShardTransactionTest extends AbstractActorTest { } @Test - public void testOnReceiveReadyTransaction() throws Exception { + public void testOnReceiveBatchedModificationsReady() throws Exception { + new JavaTestKit(getSystem()) {{ + + final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + "testOnReceiveBatchedModificationsReady"); + + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); + + YangInstanceIdentifier writePath = TestModel.TEST_PATH; + NormalizedNode writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + + BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null); + batched.setReady(true); + batched.addModification(new WriteModification(writePath, writeData)); + + transaction.tell(batched, getRef()); + + expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); + }}; + } + + @Test + public void testOnReceivePreLithiumReadyTransaction() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), - "testReadyTransaction"); + "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION); - watch(transaction); + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); transaction.tell(new ReadyTransaction().toSerializable(), getRef()); - expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS, - Terminated.class); - expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS, - Terminated.class); + expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); }}; // test new JavaTestKit(getSystem()) {{ final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), - "testReadyTransaction2"); + "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION); - watch(transaction); + JavaTestKit watcher = new JavaTestKit(getSystem()); + watcher.watch(transaction); transaction.tell(new ReadyTransaction(), getRef()); - expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class, - Terminated.class); - expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class, - Terminated.class); + expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class); + watcher.expectMsgClass(duration("5 seconds"), Terminated.class); }}; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java index acba775445..026b549028 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java @@ -30,8 +30,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -176,7 +174,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { fail("Tx 2 should not have initiated until the Tx 1's ready future completed"); } - batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString())); + batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get()); // Tx 2 should've proceeded to find the primary shard. verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); @@ -196,7 +194,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { Promise readyReplyPromise1 = akka.dispatch.Futures.promise(); doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync( - eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + eq(actorSelection(txActorRef1)), isA(BatchedModifications.class)); DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction(); @@ -205,7 +203,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { writeTx1.ready(); - verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false); + verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true); String tx2MemberName = "tx2MemberName"; doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName(); @@ -247,7 +245,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest { fail("Tx 2 should not have initiated until the Tx 1's ready future completed"); } - readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get()); + readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get()); verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())), eqCreateTransaction(tx2MemberName, READ_WRITE)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index b95eaf64d7..6cfef19491 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -336,8 +336,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); + expectBatchedModificationsReady(actorRef); final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -376,7 +375,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { // This sends the batched modification. transactionProxy.ready(); - verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false); + verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true); } @Test(expected=IllegalStateException.class) @@ -425,7 +424,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { } @Test - public void testReadyWithReadWrite() throws Exception { + public void testReadWrite() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -434,7 +433,34 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { eq(actorSelection(actorRef)), eqSerializedReadData()); expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.read(TestModel.TEST_PATH); + + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), false, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + } + + @Test + public void testReadyWithReadWrite() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + expectBatchedModificationsReady(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -452,9 +478,33 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), isA(BatchedModifications.class)); + } - verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS)); + @Test + public void testReadyWithNoModifications() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + expectBatchedModificationsReady(actorRef); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + transactionProxy.read(TestModel.TEST_PATH); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), true); } @Test @@ -465,7 +515,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModificationsReady(actorRef, 1); + expectBatchedModificationsReady(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -496,7 +546,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModificationsReady(actorRef, 1); + expectBatchedModificationsReady(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -691,18 +741,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); doReturn(true).when(mockActorContext).isPathLocal(anyString()); - doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(BatchedModifications.class)); + expectBatchedModificationsReady(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - // testing ready - doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.class)); - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); assertTrue(ready instanceof ThreePhaseCommitCohortProxy); @@ -1130,8 +1175,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModifications(actorRef, shardBatchedModificationCount); - expectReadyTransaction(actorRef); - YangInstanceIdentifier writePath1 = TestModel.TEST_PATH; NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -1176,8 +1219,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1), new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3)); - boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled(); - verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3), + verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2)); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index cc860eafc7..9e1557ae3c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -11,7 +11,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.inOrder; -import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION; +import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.dispatch.Dispatchers; @@ -246,7 +246,7 @@ public class PreLithiumShardTest extends AbstractShardTest { // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION, cohort1, modification1, true), getRef()); ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); @@ -261,11 +261,11 @@ public class PreLithiumShardTest extends AbstractShardTest { // Send the ForwardedReadyTransaction for the next 2 Tx's. - shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION, cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION, cohort3, modification3, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java index 2980f83564..4cf8b67ddb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java @@ -33,6 +33,7 @@ import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -41,6 +42,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import scala.concurrent.Future; /** * Unit tests for backwards compatibility with pre-Lithium versions. @@ -93,6 +95,10 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest return argThat(matcher); } + private Future readySerializedTxReply(String path, short version) { + return Futures.successful(new ReadyTransactionReply(path, version).toSerializable()); + } + private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version); @@ -110,7 +116,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH)); - doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), @@ -170,7 +176,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext). executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode)); - doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java index c4027ad2a5..b302f527d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java @@ -91,11 +91,5 @@ public class BatchedModificationsTest { BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone( (Serializable) new BatchedModificationsReply(100).toSerializable()); assertEquals("getNumBatched", 100, clone.getNumBatched()); - assertEquals("getCohortPath", null, clone.getCohortPath()); - - clone = (BatchedModificationsReply) SerializationUtils.clone( - (Serializable) new BatchedModificationsReply(50, "cohort path").toSerializable()); - assertEquals("getNumBatched", 50, clone.getNumBatched()); - assertEquals("getCohortPath", "cohort path", clone.getCohortPath()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java new file mode 100644 index 0000000000..db525eafbe --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import static org.junit.Assert.assertEquals; +import java.io.Serializable; +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; + +/** + * Unit tests for ReadyTransactionReply. + * + * @author Thomas Pantelis + */ +public class ReadyTransactionReplyTest { + + @Test + public void testSerialization() { + String cohortPath = "cohort path"; + ReadyTransactionReply expected = new ReadyTransactionReply(cohortPath); + + Object serialized = expected.toSerializable(); + assertEquals("Serialized type", ReadyTransactionReply.class, serialized.getClass()); + + ReadyTransactionReply actual = ReadyTransactionReply.fromSerializable(SerializationUtils.clone( + (Serializable) serialized)); + assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion()); + assertEquals("getCohortPath", cohortPath, actual.getCohortPath()); + } + + @Test + public void testSerializationWithPreLithiumVersion() throws Exception { + String cohortPath = "cohort path"; + ReadyTransactionReply expected = new ReadyTransactionReply(cohortPath, DataStoreVersions.HELIUM_2_VERSION); + + Object serialized = expected.toSerializable(); + assertEquals("Serialized type", ShardTransactionMessages.ReadyTransactionReply.class, serialized.getClass()); + + ReadyTransactionReply actual = ReadyTransactionReply.fromSerializable(SerializationUtils.clone( + (Serializable) serialized)); + assertEquals("getVersion", DataStoreVersions.HELIUM_2_VERSION, actual.getVersion()); + assertEquals("getCohortPath", cohortPath, actual.getCohortPath()); + } +}