import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.actor.Status.Failure;
import akka.dispatch.Dispatchers;
import akka.dispatch.OnComplete;
import akka.japi.Creator;
}
private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
- NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
- return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
+ NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady, int messagesSent) {
+ return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
}
private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
- YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
+ YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady,
+ int messagesSent) {
BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
batched.addModification(new WriteModification(path, data));
batched.setReady(ready);
batched.setDoCommitOnReady(doCommitOnReady);
+ batched.setTotalMessagesSent(messagesSent);
return batched;
}
// Send a BatchedModifications to start a transaction.
shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
expectMsgClass(duration, BatchedModificationsReply.class);
// Send a couple more BatchedModifications.
shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
expectMsgClass(duration, BatchedModificationsReply.class);
shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Send the CanCommitTransaction message.
// Send a BatchedModifications to start a transaction.
shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
expectMsgClass(duration, BatchedModificationsReply.class);
// Send a couple more BatchedModifications.
shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
expectMsgClass(duration, BatchedModificationsReply.class);
shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
}};
}
+ @Test(expected=IllegalStateException.class)
+ public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
+
+ waitUntilLeader(shard);
+
+ String transactionID = "tx1";
+ BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
+ batched.setReady(true);
+ batched.setTotalMessagesSent(2);
+
+ shard.tell(batched, getRef());
+
+ Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ if(failure != null) {
+ throw failure.cause();
+ }
+ }};
+ }
+
@SuppressWarnings("unchecked")
private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
YangInstanceIdentifier path = TestModel.TEST_PATH;
shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
- containerNode, true, false), getRef());
+ containerNode, true, false, 1), getRef());
expectMsgClass(duration, ReadyTransactionReply.class);
// Create a read Tx on the same chain.