X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardTest.java;h=c31acdad9361c2dd07053da257bbcf66eb11e3a5;hb=9412c00daf7c8c3f108d1284a6288753ba67f6ac;hp=23a200f6bcda05b11afcd24b248ed8145f400122;hpb=38909c073135e7b36759f2627c16673c2561926b;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 23a200f6bc..c31acdad93 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -58,6 +58,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; @@ -97,9 +98,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; -import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; @@ -119,8 +117,6 @@ import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; public class ShardTest extends AbstractShardTest { - private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars"); - private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1"; @Test @@ -356,13 +352,13 @@ public class ShardTest extends AbstractShardTest { shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - shard.tell(new CreateTransaction("txn-1", - TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef()); + shard.tell(new CreateTransaction("txn-1", TransactionType.READ_ONLY.ordinal(), null, + DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef()); final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class); - final String path = reply.getTransactionActorPath().toString(); + final String path = reply.getTransactionPath().toString(); assertTrue("Unexpected transaction path " + path, path.contains("akka://test/user/testCreateTransaction/shard-txn-1")); }}; @@ -375,14 +371,13 @@ public class ShardTest extends AbstractShardTest { waitUntilLeader(shard); - shard.tell(new CreateTransaction("txn-1", - TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(), - getRef()); + shard.tell(new CreateTransaction("txn-1",TransactionType.READ_ONLY.ordinal(), "foobar", + DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef()); final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class); - final String path = reply.getTransactionActorPath().toString(); + final String path = reply.getTransactionPath().toString(); assertTrue("Unexpected transaction path " + path, path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1")); }}; @@ -564,9 +559,9 @@ public class ShardTest extends AbstractShardTest { // Send the CanCommitTransaction message for the first Tx. - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef()); @@ -579,16 +574,16 @@ public class ShardTest extends AbstractShardTest { // processed after the first Tx completes. final Future canCommitFuture1 = Patterns.ask(shard, - new CanCommitTransaction(transactionID2).toSerializable(), timeout); + new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout); final Future canCommitFuture2 = Patterns.ask(shard, - new CanCommitTransaction(transactionID3).toSerializable(), timeout); + new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout); // Send the CommitTransaction message for the first Tx. After it completes, it should // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd. - shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); // Wait for the next 2 Tx's to complete. @@ -622,7 +617,7 @@ public class ShardTest extends AbstractShardTest { class OnCommitFutureComplete extends OnFutureComplete { OnCommitFutureComplete() { - super(CommitTransactionReply.SERIALIZABLE_CLASS); + super(CommitTransactionReply.class); } @Override @@ -636,7 +631,7 @@ public class ShardTest extends AbstractShardTest { private final String transactionID; OnCanCommitFutureComplete(final String transactionID) { - super(CanCommitTransactionReply.SERIALIZABLE_CLASS); + super(CanCommitTransactionReply.class); this.transactionID = transactionID; } @@ -647,7 +642,7 @@ public class ShardTest extends AbstractShardTest { assertEquals("Can commit", true, canCommitReply.getCanCommit()); final Future commitFuture = Patterns.ask(shard, - new CommitTransaction(transactionID).toSerializable(), timeout); + new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout); commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher()); } } @@ -730,15 +725,15 @@ public class ShardTest extends AbstractShardTest { // Send the CanCommitTransaction message. - shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); // Send the CanCommitTransaction message. - shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); final InOrder inOrder = inOrder(mockCohort.get()); inOrder.verify(mockCohort.get()).canCommit(); @@ -793,7 +788,7 @@ public class ShardTest extends AbstractShardTest { TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + expectMsgClass(duration, CommitTransactionReply.class); final InOrder inOrder = inOrder(mockCohort.get()); inOrder.verify(mockCohort.get()).canCommit(); @@ -891,24 +886,25 @@ public class ShardTest extends AbstractShardTest { // Create a read Tx on the same chain. - shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() , - transactionChainID).toSerializable(), getRef()); + shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(), + transactionChainID, DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef()); final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class); - getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef()); + getSystem().actorSelection(createReply.getTransactionPath()).tell( + new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef()); final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class); assertEquals("Read node", containerNode, readReply.getNormalizedNode()); // Commit the write transaction. - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); // Verify data in the data store. @@ -1008,7 +1004,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef()); - expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class); + expectMsgClass(duration, CommitTransactionReply.class); final InOrder inOrder = inOrder(cohort); inOrder.verify(cohort).canCommit(); @@ -1044,7 +1040,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(readyMessage, getRef()); - expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS); + expectMsgClass(CommitTransactionReply.class); final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH); assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode); @@ -1079,15 +1075,15 @@ public class ShardTest extends AbstractShardTest { // Send the CanCommitTransaction message. - shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); // Send the CanCommitTransaction message. - shard.tell(new CommitTransaction(txId).toSerializable(), getRef()); - expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(CommitTransactionReply.class); final NormalizedNode actualNode = readStore(shard, TestModel.OUTER_LIST_PATH); assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode); @@ -1126,15 +1122,15 @@ public class ShardTest extends AbstractShardTest { // Send the CanCommitTransaction message. - shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); // Send the CanCommitTransaction message. - shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); final InOrder inOrder = inOrder(cohort); inOrder.verify(cohort).canCommit(); @@ -1178,13 +1174,13 @@ public class ShardTest extends AbstractShardTest { // Send the CanCommitTransaction message. - shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); - expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class); + shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); final InOrder inOrder = inOrder(cohort); inOrder.verify(cohort).canCommit(); @@ -1235,13 +1231,13 @@ public class ShardTest extends AbstractShardTest { // Send the CanCommitTransaction message. - shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); - expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class); + shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); final InOrder inOrder = inOrder(cohort); inOrder.verify(cohort).canCommit(); @@ -1302,21 +1298,21 @@ public class ShardTest extends AbstractShardTest { // Send the CanCommitTransaction message for the first Tx. - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and // processed after the first Tx completes. final Future canCommitFuture = Patterns.ask(shard, - new CanCommitTransaction(transactionID2).toSerializable(), timeout); + new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout); // Send the CommitTransaction message for the first Tx. This should send back an error // and trigger the 2nd Tx to proceed. - shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); + shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); // Wait for the 2nd Tx to complete the canCommit phase. @@ -1375,21 +1371,21 @@ public class ShardTest extends AbstractShardTest { // Send the CanCommitTransaction message for the first Tx. - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and // processed after the first Tx completes. final Future canCommitFuture = Patterns.ask(shard, - new CanCommitTransaction(transactionID2).toSerializable(), timeout); + new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout); // Send the CommitTransaction message for the first Tx. This should send back an error // and trigger the 2nd Tx to proceed. - shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); + shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); // Wait for the 2nd Tx to complete the canCommit phase. @@ -1437,7 +1433,7 @@ public class ShardTest extends AbstractShardTest { // Send the CanCommitTransaction message. - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); // Send another can commit to ensure the failed one got cleaned up. @@ -1450,9 +1446,9 @@ public class ShardTest extends AbstractShardTest { shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef()); final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(CanCommitTransactionReply.class)); assertEquals("getCanCommit", true, reply.getCanCommit()); }}; } @@ -1483,9 +1479,9 @@ public class ShardTest extends AbstractShardTest { // Send the CanCommitTransaction message. - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(CanCommitTransactionReply.class)); assertEquals("getCanCommit", false, reply.getCanCommit()); // Send another can commit to ensure the failed one got cleaned up. @@ -1498,9 +1494,9 @@ public class ShardTest extends AbstractShardTest { shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef()); reply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(CanCommitTransactionReply.class)); assertEquals("getCanCommit", true, reply.getCanCommit()); }}; } @@ -1546,7 +1542,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + expectMsgClass(duration, CommitTransactionReply.class); }}; } @@ -1591,7 +1587,7 @@ public class ShardTest extends AbstractShardTest { shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + expectMsgClass(duration, CommitTransactionReply.class); }}; } @@ -1641,13 +1637,13 @@ public class ShardTest extends AbstractShardTest { shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef()); expectMsgClass(duration, ReadyTransactionReply.class); - shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); - shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); final NormalizedNode node = readStore(shard, TestModel.TEST_PATH); @@ -1713,23 +1709,23 @@ public class ShardTest extends AbstractShardTest { // canCommit 1st Tx. We don't send the commit so it should timeout. - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.class); // canCommit the 2nd Tx - it should complete after the 1st Tx times out. - shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.class); // Try to commit the 1st Tx - should fail as it's not the current Tx. - shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); + shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); // Commit the 2nd Tx. - shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); final NormalizedNode node = readStore(shard, listNodePath); assertNotNull(listNodePath + " not found", node); @@ -1783,16 +1779,16 @@ public class ShardTest extends AbstractShardTest { // canCommit 1st Tx. - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.class); // canCommit the 2nd Tx - it should get queued. - shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef()); // canCommit the 3rd Tx - should exceed queue capacity and fail. - shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef()); expectMsgClass(duration, akka.actor.Status.Failure.class); }}; } @@ -1839,8 +1835,8 @@ public class ShardTest extends AbstractShardTest { // All Tx's are readied. We'll send canCommit for the last one but not the others. The others // should expire from the queue and the last one should be processed. - shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef()); - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.class); }}; } @@ -1869,8 +1865,8 @@ public class ShardTest extends AbstractShardTest { // CanCommit the first one so it's the current in-progress CohortEntry. - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CanCommitTransactionReply.class); // Ready the second Tx. @@ -1896,12 +1892,12 @@ public class ShardTest extends AbstractShardTest { // Commit the first Tx. After completing, the second should expire from the queue and the third // Tx committed. - shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, CommitTransactionReply.class); // Expect commit reply from the third Tx. - expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + expectMsgClass(duration, CommitTransactionReply.class); final NormalizedNode node = readStore(shard, TestModel.TEST2_PATH); assertNotNull(TestModel.TEST2_PATH + " not found", node); @@ -1915,7 +1911,7 @@ public class ShardTest extends AbstractShardTest { newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testCanCommitBeforeReadyFailure"); - shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef()); + shard.tell(new CanCommitTransaction("tx", CURRENT_VERSION).toSerializable(), getRef()); expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class); }}; } @@ -1958,22 +1954,22 @@ public class ShardTest extends AbstractShardTest { // Send the CanCommitTransaction message for the first Tx. - shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( - expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); + expectMsgClass(duration, CanCommitTransactionReply.class)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and // processed after the first Tx completes. final Future canCommitFuture = Patterns.ask(shard, - new CanCommitTransaction(transactionID2).toSerializable(), timeout); + new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout); // Send the AbortTransaction message for the first Tx. This should trigger the 2nd // Tx to proceed. - shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef()); - expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, AbortTransactionReply.class); // Wait for the 2nd Tx to complete the canCommit phase. @@ -2036,8 +2032,8 @@ public class ShardTest extends AbstractShardTest { // Send the AbortTransaction message. - shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef()); - expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS); + shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); + expectMsgClass(duration, AbortTransactionReply.class); verify(cohort).abort(); @@ -2051,7 +2047,7 @@ public class ShardTest extends AbstractShardTest { // Now send CanCommitTransaction - should fail. - shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); + shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef()); Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause(); assertTrue("Failure type", failure instanceof IllegalStateException);