+ waitUntilLeader(shard);
+
+ overrideLeaderCalls.set(true);
+
+ final BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
+
+ shard.tell(batched, ActorRef.noSender());
+
+ expectMsgEquals(batched);
+ }};
+ }
+
+ @Test
+ public void testTransactionMessagesWithNoLeader() {
+ new ShardTestKit(getSystem()) {{
+ dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
+ shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testTransactionMessagesWithNoLeader");
+
+ waitUntilNoLeader(shard);
+
+ final TransactionIdentifier txId = nextTransactionId();
+ shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), getRef());
+ Failure failure = expectMsgClass(Failure.class);
+ assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+
+ shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
+ failure = expectMsgClass(Failure.class);
+ assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+
+ shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef());
+ failure = expectMsgClass(Failure.class);
+ assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
+ }};
+ }
+
+ @Test
+ public void testReadyWithReadWriteImmediateCommit() throws Exception{
+ testReadyWithImmediateCommit(true);
+ }
+
+ @Test
+ public void testReadyWithWriteOnlyImmediateCommit() throws Exception{
+ testReadyWithImmediateCommit(false);
+ }
+
+ private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testReadyWithImmediateCommit-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final TransactionIdentifier transactionID = nextTransactionId();
+ final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ if(readWrite) {
+ shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
+ containerNode, true), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
+ }
+
+ expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
+ assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
+ }};
+ }
+
+ @Test
+ public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testReadyLocalTransactionWithImmediateCommit");
+
+ waitUntilLeader(shard);
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ final DataTreeModification modification = dataStore.newModification();
+
+ final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
+ final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+ new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
+
+ final TransactionIdentifier txId = nextTransactionId();
+ modification.ready();
+ final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+
+ shard.tell(readyMessage, getRef());
+
+ expectMsgClass(CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
+ assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
+ }};
+ }
+
+ @Test
+ public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testReadyLocalTransactionWithThreePhaseCommit");
+
+ waitUntilLeader(shard);
+
+ final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+ final DataTreeModification modification = dataStore.newModification();
+
+ final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
+ final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+ new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
+
+ final TransactionIdentifier txId = nextTransactionId();
+ modification.ready();
+ final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
+
+ shard.tell(readyMessage, getRef());
+
+ expectMsgClass(ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
+ assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
+ }};
+ }
+
+ @Test
+ public void testReadWriteCommitWithPersistenceDisabled() throws Throwable {
+ dataStoreContextBuilder.persistent(false);
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWithPersistenceDisabled");
+
+ waitUntilLeader(shard);
+
+ // Setup a simulated transactions with a mock cohort.
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ final TransactionIdentifier transactionID = nextTransactionId();
+ final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
+ assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
+ }};
+ }
+
+ @Test
+ public void testReadWriteCommitWhenTransactionHasNoModifications() {
+ testCommitWhenTransactionHasNoModifications(true);
+ }
+
+ @Test
+ public void testWriteOnlyCommitWhenTransactionHasNoModifications() {
+ testCommitWhenTransactionHasNoModifications(false);
+ }
+
+ private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
+ // Note that persistence is enabled which would normally result in the entry getting written to the journal
+ // but here that need not happen
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasNoModifications-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final TransactionIdentifier transactionID = nextTransactionId();
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ if(readWrite) {
+ ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
+ newReadWriteTransaction(transactionID);
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID, new MutableCompositeModification()), getRef());
+ }
+
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
+ final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
+
+ // Use MBean for verification
+ // Committed transaction count should increase as usual
+ assertEquals(1,shardStats.getCommittedTransactionsCount());
+
+ // Commit index should not advance because this does not go into the journal
+ assertEquals(-1, shardStats.getCommitIndex());
+ }};
+ }
+
+ @Test
+ public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
+ testCommitWhenTransactionHasModifications(true);
+ }
+
+ @Test
+ public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
+ testCommitWhenTransactionHasModifications(false);
+ }
+
+ private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
+ new ShardTestKit(getSystem()) {{
+ final TipProducingDataTree dataTree = createDelegatingMockDataTree();
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testCommitWhenTransactionHasModifications-" + readWrite);
+
+ waitUntilLeader(shard);
+
+ final FiniteDuration duration = duration("5 seconds");
+ final TransactionIdentifier transactionID = nextTransactionId();
+
+ if(readWrite) {
+ shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ } else {
+ shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ }
+
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message.
+
+ shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(duration, CanCommitTransactionReply.class));