2 * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertNotNull;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.ID_QNAME;
13 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.INNER_LIST_QNAME;
14 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.NAME_QNAME;
15 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_PATH;
16 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_QNAME;
17 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
18 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_QNAME;
19 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerEntryPath;
20 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerMapPath;
21 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerNode;
22 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryPath;
23 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerMapNode;
24 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
26 import akka.dispatch.Dispatchers;
27 import akka.testkit.TestActorRef;
28 import akka.testkit.javadsl.TestKit;
29 import com.google.common.collect.ImmutableSortedSet;
30 import java.time.Duration;
31 import java.util.SortedSet;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.access.concepts.MemberName;
34 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
35 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
36 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * Unit tests for various 3PC coordination scenarios.
50 * @author Thomas Pantelis
52 public class ShardCommitCoordinationTest extends AbstractShardTest {
53 private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinationTest.class);
56 * Test 2 tx's accessing the same shards.
58 * tx1 -> shard A, shard B
59 * tx2 -> shard A, shard B
61 * The tx's are readied such that the pendingTransactions queues are as follows:
63 * Queue for shard A -> tx1, tx2
64 * Queue for shard B -> tx2, tx1
66 * This is a potential deadlock scenario (ABBA) which should be avoided by allowing tx1 to proceed on shard B
67 * even though it isn't at the head of the queues.
70 public void testTwoTransactionsWithSameTwoParticipatingShards() {
// Scenario: tx1 and tx2 are each readied on both shard A and shard B (in opposite order on B) and are then
// driven through CanCommit/Commit, asserting which TestKit probe receives a reply at each step.
71 final String testName = "testTwoTransactionsWithSameTwoParticipatingShards";
72 LOG.info("{} starting", testName);
// One probe per transaction: kit1 is the sender for all tx1 messages, kit2 for all tx2 messages.
74 final TestKit kit1 = new TestKit(getSystem());
75 final TestKit kit2 = new TestKit(getSystem());
77 final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
78 final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
80 final TestActorRef<Shard> shardA = actorFactory.createTestActor(
81 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
82 ShardTestKit.waitUntilLeader(shardA);
84 final TestActorRef<Shard> shardB = actorFactory.createTestActor(
85 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
86 ShardTestKit.waitUntilLeader(shardB);
88 final TransactionIdentifier txId1 = nextTransactionId();
89 final TransactionIdentifier txId2 = nextTransactionId();
// Both transactions are readied with the same participating shard names (those of shard A and shard B).
91 SortedSet<String> participatingShardNames = ImmutableSortedSet.of(shardAId.getShardName(),
92 shardBId.getShardName());
94 // Ready [tx1, tx2] on shard A.
96 shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
97 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef());
98 kit1.expectMsgClass(ReadyTransactionReply.class);
100 shardA.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
101 participatingShardNames), kit2.getRef());
102 kit2.expectMsgClass(ReadyTransactionReply.class);
104 // Ready [tx2, tx1] on shard B.
106 shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
107 participatingShardNames), kit2.getRef());
108 kit2.expectMsgClass(ReadyTransactionReply.class);
110 shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH,
111 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef());
112 kit1.expectMsgClass(ReadyTransactionReply.class);
114 // Send tx2 CanCommit to A - tx1 is at the head of the queue so tx2 should not proceed as A is the first shard
115 // in the participating shard list.
117 shardA.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
118 kit2.expectNoMessage(Duration.ofMillis(100));
120 // Send tx1 CanCommit to A - it's at the head of the queue so should proceed.
122 shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
123 kit1.expectMsgClass(CanCommitTransactionReply.class);
125 // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
126 // shard list [A] match those of tx2 [A] so tx1 should be de-queued and allowed to proceed.
128 shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
129 kit1.expectMsgClass(CanCommitTransactionReply.class);
131 // Send tx2 CanCommit to B - tx1 should now be at the head of the queue.
133 shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
134 kit2.expectNoMessage(Duration.ofMillis(100));
136 // Finish commit of tx1.
138 shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
139 kit1.expectMsgClass(CommitTransactionReply.class);
141 shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
142 kit1.expectMsgClass(CommitTransactionReply.class);
144 // Finish commit of tx2.
// The two CanCommit requests for tx2 (to A and to B) were not answered within 100 ms above; two
// CanCommitTransactionReply messages are expected on kit2 now (presumably one from each shard).
146 kit2.expectMsgClass(CanCommitTransactionReply.class);
147 kit2.expectMsgClass(CanCommitTransactionReply.class);
149 shardA.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
150 kit2.expectMsgClass(CommitTransactionReply.class);
152 shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
153 kit2.expectMsgClass(CommitTransactionReply.class);
155 // Verify data in the data store.
157 verifyOuterListEntry(shardA, 1);
158 verifyOuterListEntry(shardB, 1);
160 LOG.info("{} ending", testName);
164 * Test multiple tx's accessing a mix of same and differing shards.
166 * tx1 -> shard X, shard B
167 * tx2 -> shard X, shard B
168 * tx3 -> shard A, shard B
169 * tx4 -> shard A, shard B
170 * tx5 -> shard A, shard B
172 * The tx's are readied such that the pendingTransactions queues are as follows:
174 * Queue for shard A -> tx3, tx4, tx5
175 * Queue for shard B -> tx1, tx2, tx5, tx4, tx3
177 * Note: shard X means any other shard which isn't relevant for the test.
178 * This is a potential deadlock scenario (ABBA) which should be avoided by moving tx3 ahead of tx5 on shard B when
179 * CanCommit is requested.
182 public void testMultipleTransactionsWithMixedParticipatingShards() {
// Scenario: tx3, tx4 and tx5 are readied on shard A and all five transactions on shard B (tx3..tx5 in reverse
// order there); the test then drives CanCommit/Commit and asserts which probe receives a reply at each step.
183 final String testName = "testMultipleTransactionsWithMixedParticipatingShards";
184 LOG.info("{} starting", testName);
// One probe per transaction: kitN is the sender for all messages of txN.
186 final TestKit kit1 = new TestKit(getSystem());
187 final TestKit kit2 = new TestKit(getSystem());
188 final TestKit kit3 = new TestKit(getSystem());
189 final TestKit kit4 = new TestKit(getSystem());
190 final TestKit kit5 = new TestKit(getSystem());
192 final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
193 final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
195 final TestActorRef<Shard> shardA = actorFactory.createTestActor(
196 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
197 ShardTestKit.waitUntilLeader(shardA);
199 final TestActorRef<Shard> shardB = actorFactory.createTestActor(
200 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
201 ShardTestKit.waitUntilLeader(shardB);
203 final TransactionIdentifier txId1 = nextTransactionId();
204 final TransactionIdentifier txId2 = nextTransactionId();
205 final TransactionIdentifier txId3 = nextTransactionId();
206 final TransactionIdentifier txId4 = nextTransactionId();
207 final TransactionIdentifier txId5 = nextTransactionId();
// participatingShardNames1 (names of shard A and shard B) is used below for tx3, tx4 and tx5;
// participatingShardNames2 ("shardX" plus the name of shard B) is used for tx1 and tx2.
209 final SortedSet<String> participatingShardNames1 = ImmutableSortedSet.of(shardAId.getShardName(),
210 shardBId.getShardName());
211 final SortedSet<String> participatingShardNames2 = ImmutableSortedSet.of("shardX", shardBId.getShardName());
213 // Ready [tx3, tx4, tx5] on shard A.
215 shardA.tell(newReadyBatchedModifications(txId3, TEST_PATH,
216 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit3.getRef());
217 kit3.expectMsgClass(ReadyTransactionReply.class);
219 shardA.tell(newReadyBatchedModifications(txId4, OUTER_LIST_PATH, outerMapNode(),
220 participatingShardNames1), kit4.getRef());
221 kit4.expectMsgClass(ReadyTransactionReply.class);
223 shardA.tell(newReadyBatchedModifications(txId5, outerEntryPath(1),
224 ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit5.getRef());
225 kit5.expectMsgClass(ReadyTransactionReply.class);
227 // Ready [tx1, tx2, tx5, tx4, tx3] on shard B.
229 shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH,
230 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit1.getRef());
231 kit1.expectMsgClass(ReadyTransactionReply.class);
233 shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerMapNode(),
234 participatingShardNames2), kit2.getRef());
235 kit2.expectMsgClass(ReadyTransactionReply.class);
237 shardB.tell(newReadyBatchedModifications(txId5, innerEntryPath(1, "one"),
238 ImmutableNodes.mapEntry(INNER_LIST_QNAME, NAME_QNAME, "one"), participatingShardNames1), kit5.getRef());
239 kit5.expectMsgClass(ReadyTransactionReply.class);
241 shardB.tell(newReadyBatchedModifications(txId4, innerMapPath(1), innerNode(),
242 participatingShardNames1), kit4.getRef());
243 kit4.expectMsgClass(ReadyTransactionReply.class);
245 shardB.tell(newReadyBatchedModifications(txId3, outerEntryPath(1),
246 ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit3.getRef());
247 kit3.expectMsgClass(ReadyTransactionReply.class);
249 // Send tx3 CanCommit to A - it's at the head of the queue so should proceed.
251 shardA.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
252 kit3.expectMsgClass(CanCommitTransactionReply.class);
254 // Send tx1 CanCommit to B - it's at the head of the queue so should proceed.
256 shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
257 kit1.expectMsgClass(CanCommitTransactionReply.class);
259 // Send tx3 CanCommit to B - tx1 is at the head of the queue but the preceding shards in tx3's participating
260 // shard list [A] match those of tx5 so tx3 should be moved ahead of tx5 in the queue.
262 shardB.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
263 kit3.expectNoMessage(Duration.ofMillis(100));
265 // Send tx4 CanCommit to B - tx4's participating shard list [A] matches that of tx3 and tx5 - so tx4 should
266 // be moved ahead of tx5 in the queue but not tx3 since it should be in the CAN_COMMIT_PENDING state.
268 shardB.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
269 kit4.expectNoMessage(Duration.ofMillis(100));
271 // Send tx5 CanCommit to B - its position in the queue should remain the same.
273 shardB.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
274 kit5.expectNoMessage(Duration.ofMillis(100));
276 // Finish commit of tx1.
278 shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
279 kit1.expectMsgClass(CommitTransactionReply.class);
281 // Finish commit of tx2.
283 shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
284 kit2.expectMsgClass(CanCommitTransactionReply.class);
286 shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
287 kit2.expectMsgClass(CommitTransactionReply.class);
289 // Finish commit of tx3.
// The CanCommit for tx3 sent to shard B above was not answered within 100 ms; its reply is expected now.
292 kit3.expectMsgClass(CanCommitTransactionReply.class);
294 shardA.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
295 kit3.expectMsgClass(CommitTransactionReply.class);
297 shardB.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
298 kit3.expectMsgClass(CommitTransactionReply.class);
300 // Finish commit of tx4.
// First the deferred reply to tx4's earlier CanCommit on shard B; tx4's CanCommit to shard A is only sent here.
303 kit4.expectMsgClass(CanCommitTransactionReply.class);
305 shardA.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
306 kit4.expectMsgClass(CanCommitTransactionReply.class);
307 shardA.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
308 kit4.expectMsgClass(CommitTransactionReply.class);
310 shardB.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
311 kit4.expectMsgClass(CommitTransactionReply.class);
313 // Finish commit of tx5.
// First the deferred reply to tx5's earlier CanCommit on shard B; tx5's CanCommit to shard A is only sent here.
316 kit5.expectMsgClass(CanCommitTransactionReply.class);
318 shardA.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
319 kit5.expectMsgClass(CanCommitTransactionReply.class);
320 shardA.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
321 kit5.expectMsgClass(CommitTransactionReply.class);
323 shardB.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
324 kit5.expectMsgClass(CommitTransactionReply.class);
// Verify data in the data store: outer list entry 1 on shard A, inner list entry (1, "one") on shard B.
326 verifyOuterListEntry(shardA, 1);
327 verifyInnerListEntry(shardB, 1, "one");
329 LOG.info("{} ending", testName);
333 * Test 2 tx's accessing 2 shards, the second in common.
335 * tx1 -> shard A, shard C
336 * tx2 -> shard B, shard C
338 * The tx's are readied such that the pendingTransactions queues are as follows:
340 * Queue for shard A -> tx1
341 * Queue for shard B -> tx2
342 * Queue for shard C -> tx2, tx1
344 * When the tx's are committed, verify the ready order is preserved.
347 public void testTwoTransactionsWithOneCommonParticipatingShard1() {
// Scenario: tx1 is readied on shards A and C, tx2 on shards B and C (tx2 first on C); the test then drives
// CanCommit/Commit and asserts which probe receives a reply at each step.
348 final String testName = "testTwoTransactionsWithOneCommonParticipatingShard1";
349 LOG.info("{} starting", testName);
// One probe per transaction: kit1 is the sender for all tx1 messages, kit2 for all tx2 messages.
351 final TestKit kit1 = new TestKit(getSystem());
352 final TestKit kit2 = new TestKit(getSystem());
354 final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
355 final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
356 final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
358 final TestActorRef<Shard> shardA = actorFactory.createTestActor(
359 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
360 ShardTestKit.waitUntilLeader(shardA);
362 final TestActorRef<Shard> shardB = actorFactory.createTestActor(
363 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
364 ShardTestKit.waitUntilLeader(shardB);
366 final TestActorRef<Shard> shardC = actorFactory.createTestActor(
367 newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
368 ShardTestKit.waitUntilLeader(shardC);
370 final TransactionIdentifier txId1 = nextTransactionId();
371 final TransactionIdentifier txId2 = nextTransactionId();
// participatingShardNames1 (names of shards A and C) is used for tx1; participatingShardNames2 (names of
// shards B and C) is used for tx2.
373 SortedSet<String> participatingShardNames1 =
374 ImmutableSortedSet.of(shardAId.getShardName(), shardCId.getShardName());
375 SortedSet<String> participatingShardNames2 =
376 ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
378 // Ready [tx1] on shard A.
380 shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
381 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef());
382 kit1.expectMsgClass(ReadyTransactionReply.class);
384 // Ready [tx2] on shard B.
386 shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH,
387 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
388 kit2.expectMsgClass(ReadyTransactionReply.class);
390 // Ready [tx2, tx1] on shard C.
392 shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH,
393 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
394 kit2.expectMsgClass(ReadyTransactionReply.class);
396 shardC.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
397 participatingShardNames1), kit1.getRef());
398 kit1.expectMsgClass(ReadyTransactionReply.class);
400 // Send tx1 CanCommit to A - should succeed.
402 shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
403 kit1.expectMsgClass(CanCommitTransactionReply.class);
405 // Send tx2 CanCommit to B - should succeed.
407 shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
408 kit2.expectMsgClass(CanCommitTransactionReply.class);
410 // Send tx1 CanCommit to C - tx2 is at the head of the queue but the preceding shards in tx1's participating
411 // shard list [A] do not match that of tx2 [B] so tx1 should not be allowed to proceed.
413 shardC.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
414 kit1.expectNoMessage(Duration.ofMillis(100));
416 // Send tx2 CanCommit to C - it's at the head of the queue so should proceed.
418 shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
419 kit2.expectMsgClass(CanCommitTransactionReply.class);
421 // Finish commit of tx2.
423 shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
424 kit2.expectMsgClass(CommitTransactionReply.class);
426 shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
427 kit2.expectMsgClass(CommitTransactionReply.class);
429 // Finish commit of tx1.
// The CanCommit for tx1 sent to shard C above was not answered within 100 ms; its reply is expected now.
431 kit1.expectMsgClass(CanCommitTransactionReply.class);
432 shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
433 kit1.expectMsgClass(CommitTransactionReply.class);
435 shardC.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
436 kit1.expectMsgClass(CommitTransactionReply.class);
438 // Verify data in the data store.
440 verifyOuterListEntry(shardC, 1);
442 LOG.info("{} ending", testName);
446 * Test 2 tx's accessing 2 shards, the first for one and the second for the other in common.
448 * tx1 -> shard A, shard B
449 * tx2 -> shard B, shard C
451 * The tx's are readied such that the pendingTransactions queues are as follows:
453 * Queue for shard A -> tx1
454 * Queue for shard B -> tx2, tx1
455 * Queue for shard C -> tx2
457 * When the tx's are committed, verify the ready order is preserved.
460 public void testTwoTransactionsWithOneCommonParticipatingShard2() {
// Scenario: tx1 is readied on shards A and B, tx2 on shards B and C (tx2 first on B); the test then drives
// CanCommit/Commit and asserts which probe receives a reply at each step.
461 final String testName = "testTwoTransactionsWithOneCommonParticipatingShard2";
462 LOG.info("{} starting", testName);
// One probe per transaction: kit1 is the sender for all tx1 messages, kit2 for all tx2 messages.
464 final TestKit kit1 = new TestKit(getSystem());
465 final TestKit kit2 = new TestKit(getSystem());
467 final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
468 final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
469 final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
471 final TestActorRef<Shard> shardA = actorFactory.createTestActor(
472 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
473 ShardTestKit.waitUntilLeader(shardA);
475 final TestActorRef<Shard> shardB = actorFactory.createTestActor(
476 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
477 ShardTestKit.waitUntilLeader(shardB);
479 final TestActorRef<Shard> shardC = actorFactory.createTestActor(
480 newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
481 ShardTestKit.waitUntilLeader(shardC);
483 final TransactionIdentifier txId1 = nextTransactionId();
484 final TransactionIdentifier txId2 = nextTransactionId();
// participatingShardNames1 (names of shards A and B) is used for tx1; participatingShardNames2 (names of
// shards B and C) is used for tx2.
486 SortedSet<String> participatingShardNames1 =
487 ImmutableSortedSet.of(shardAId.getShardName(), shardBId.getShardName());
488 SortedSet<String> participatingShardNames2 =
489 ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
491 // Ready [tx1] on shard A.
493 shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
494 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef());
495 kit1.expectMsgClass(ReadyTransactionReply.class);
497 // Ready [tx2, tx1] on shard B.
499 shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH,
500 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
501 kit2.expectMsgClass(ReadyTransactionReply.class);
503 shardB.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
504 participatingShardNames1), kit1.getRef());
505 kit1.expectMsgClass(ReadyTransactionReply.class);
507 // Ready [tx2] on shard C.
509 shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH,
510 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
511 kit2.expectMsgClass(ReadyTransactionReply.class);
513 // Send tx1 CanCommit to A - should succeed.
515 shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
516 kit1.expectMsgClass(CanCommitTransactionReply.class);
518 // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
519 // shard list [A] do not match that of tx2 [] so tx1 should not be allowed to proceed.
521 shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
522 kit1.expectNoMessage(Duration.ofMillis(100));
524 // Send tx2 CanCommit to B - it's at the head of the queue so should proceed.
526 shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
527 kit2.expectMsgClass(CanCommitTransactionReply.class);
529 // Finish commit of tx2.
531 shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
532 kit2.expectMsgClass(CanCommitTransactionReply.class);
534 shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
535 kit2.expectMsgClass(CommitTransactionReply.class);
537 shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
538 kit2.expectMsgClass(CommitTransactionReply.class);
540 // Finish commit of tx1.
// The CanCommit for tx1 sent to shard B above was not answered within 100 ms; its reply is expected now.
542 kit1.expectMsgClass(CanCommitTransactionReply.class);
543 shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
544 kit1.expectMsgClass(CommitTransactionReply.class);
546 shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
547 kit1.expectMsgClass(CommitTransactionReply.class);
549 // Verify data in the data store.
551 verifyOuterListEntry(shardB, 1);
553 LOG.info("{} ending", testName);
556 static void verifyInnerListEntry(TestActorRef<Shard> shard, int outerID, String innerID) {
// Builds the inner-list entry path from outerID/innerID, reads it from the given shard via readStore
// (defined outside this view) and fails the test with "<path> not found" if the result is null.
557 final YangInstanceIdentifier path = innerEntryPath(outerID, innerID);
558 final NormalizedNode<?, ?> innerListEntry = readStore(shard, path);
559 assertNotNull(path + " not found", innerListEntry);