2 * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertNotNull;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.ID_QNAME;
13 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.INNER_LIST_QNAME;
14 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.NAME_QNAME;
15 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_PATH;
16 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_QNAME;
17 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
18 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_QNAME;
19 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerEntryPath;
20 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerMapPath;
21 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerNode;
22 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryPath;
23 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerMapNode;
24 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
26 import akka.dispatch.Dispatchers;
27 import akka.testkit.TestActorRef;
28 import akka.testkit.javadsl.TestKit;
29 import com.google.common.collect.ImmutableSortedSet;
30 import java.util.SortedSet;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.access.concepts.MemberName;
34 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
35 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
36 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import scala.concurrent.duration.FiniteDuration;
49 * Unit tests for various 3PC coordination scenarios.
51 * @author Thomas Pantelis
53 public class ShardCommitCoordinationTest extends AbstractShardTest {
54 private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinationTest.class);
57 * Test 2 tx's accessing the same shards.
59 * tx1 -> shard A, shard B
60 * tx2 -> shard A, shard B
62 * The tx's are readied such that the pendingTransactions queues are as follows:
64 * Queue for shard A -> tx1, tx2
65 * Queue for shard B -> tx2, tx1
67 * This is a potential deadlock scenario (ABBA) which should be avoided by allowing tx1 to proceed on shard B
68 * even though it isn't at the head of the queues.
// Drives the ABBA scenario: tx1 and tx2 are readied in opposite order on shard A and shard B, then the
// CanCommit/Commit exchange is asserted per transaction. Every tx1 message is sent with kit1 as the sender and
// every tx2 message with kit2, so each kit only sees the replies for its own transaction.
71 public void testTwoTransactionsWithSameTwoParticipatingShards() throws Exception {
72 final String testName = "testTwoTransactionsWithSameTwoParticipatingShards";
73 LOG.info("{} starting", testName);
// One TestKit per transaction so that replies can be asserted independently.
75 final TestKit kit1 = new TestKit(getSystem());
76 final TestKit kit2 = new TestKit(getSystem());
// The test name doubles as the member name of both shard identifiers.
78 final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
79 final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
// No message is sent to a shard before ShardTestKit.waitUntilLeader has returned for it.
81 final TestActorRef<Shard> shardA = actorFactory.createTestActor(
82 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
83 ShardTestKit.waitUntilLeader(shardA);
85 final TestActorRef<Shard> shardB = actorFactory.createTestActor(
86 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
87 ShardTestKit.waitUntilLeader(shardB);
89 final TransactionIdentifier txId1 = nextTransactionId();
90 final TransactionIdentifier txId2 = nextTransactionId();
// Both transactions declare the same participants; the sorted set keeps the names in natural String order.
92 SortedSet<String> participatingShardNames = ImmutableSortedSet.of(shardAId.getShardName(),
93 shardBId.getShardName());
95 // Ready [tx1, tx2] on shard A.
97 shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
98 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef());
99 kit1.expectMsgClass(ReadyTransactionReply.class);
101 shardA.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
102 participatingShardNames), kit2.getRef());
103 kit2.expectMsgClass(ReadyTransactionReply.class);
105 // Ready [tx2, tx1] on shard B.
107 shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
108 participatingShardNames), kit2.getRef());
109 kit2.expectMsgClass(ReadyTransactionReply.class);
111 shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH,
112 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef());
113 kit1.expectMsgClass(ReadyTransactionReply.class);
115 // Send tx2 CanCommit to A - tx1 is at the head of the queue so tx2 should not proceed as A is the first shard
116 // in the participating shard list.
// The 100 ms of silence is the assertion that the CanCommit stays pending.
118 shardA.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
119 kit2.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
121 // Send tx1 CanCommit to A - it's at the head of the queue so should proceed.
123 shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
124 kit1.expectMsgClass(CanCommitTransactionReply.class);
126 // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
127 // shard list [A] matches that of tx2 [A] so tx1 should be de-queued and allowed to proceed.
129 shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
130 kit1.expectMsgClass(CanCommitTransactionReply.class);
132 // Send tx2 CanCommit to B - tx1 should now be at the head of the queue.
134 shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
135 kit2.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
137 // Finish commit of tx1.
139 shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
140 kit1.expectMsgClass(CommitTransactionReply.class);
142 shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
143 kit1.expectMsgClass(CommitTransactionReply.class);
145 // Finish commit of tx2.
// kit2 now receives the two CanCommit replies (one per shard) for the tx2 requests that went unanswered above.
147 kit2.expectMsgClass(CanCommitTransactionReply.class);
148 kit2.expectMsgClass(CanCommitTransactionReply.class);
150 shardA.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
151 kit2.expectMsgClass(CommitTransactionReply.class);
153 shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
154 kit2.expectMsgClass(CommitTransactionReply.class);
156 // Verify data in the data store.
// tx2 wrote outerNode(1) to both shards, so outer list entry 1 is checked on each of them.
158 verifyOuterListEntry(shardA, 1);
159 verifyOuterListEntry(shardB, 1);
161 LOG.info("{} ending", testName);
165 * Test multiple tx's accessing a mix of same and differing shards.
167 * tx1 -> shard X, shard B
168 * tx2 -> shard X, shard B
169 * tx3 -> shard A, shard B
170 * tx4 -> shard A, shard B
171 * tx5 -> shard A, shard B
173 * The tx's are readied such that the pendingTransactions queues are as follows:
175 * Queue for shard A -> tx3, tx4, tx5
176 * Queue for shard B -> tx1, tx2, tx5, tx4, tx3
178 * Note: shard X means any other shard which isn't relevant for the test.
179 * This is a potential deadlock scenario (ABBA) which should be avoided by moving tx3 ahead of tx5 on shard B when
180 * CanCommit is requested.
// Drives five transactions against shard A and shard B. tx3, tx4 and tx5 participate in [shardA, shardB];
// tx1 and tx2 participate in ["shardX", shardB], and no shardX actor is created, so they only ever talk to
// shard B. Transaction N always uses kitN as sender, so each kit only sees replies for its own transaction.
183 public void testMultipleTransactionsWithMixedParticipatingShards() throws Exception {
184 final String testName = "testMultipleTransactionsWithMixedParticipatingShards";
185 LOG.info("{} starting", testName);
// One TestKit per transaction.
187 final TestKit kit1 = new TestKit(getSystem());
188 final TestKit kit2 = new TestKit(getSystem());
189 final TestKit kit3 = new TestKit(getSystem());
190 final TestKit kit4 = new TestKit(getSystem());
191 final TestKit kit5 = new TestKit(getSystem());
193 final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
194 final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
// No message is sent to a shard before ShardTestKit.waitUntilLeader has returned for it.
196 final TestActorRef<Shard> shardA = actorFactory.createTestActor(
197 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
198 ShardTestKit.waitUntilLeader(shardA);
200 final TestActorRef<Shard> shardB = actorFactory.createTestActor(
201 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
202 ShardTestKit.waitUntilLeader(shardB);
204 final TransactionIdentifier txId1 = nextTransactionId();
205 final TransactionIdentifier txId2 = nextTransactionId();
206 final TransactionIdentifier txId3 = nextTransactionId();
207 final TransactionIdentifier txId4 = nextTransactionId();
208 final TransactionIdentifier txId5 = nextTransactionId();
// participatingShardNames1 is used by tx3/tx4/tx5, participatingShardNames2 by tx1/tx2.
210 final SortedSet<String> participatingShardNames1 = ImmutableSortedSet.of(shardAId.getShardName(),
211 shardBId.getShardName());
212 final SortedSet<String> participatingShardNames2 = ImmutableSortedSet.of("shardX", shardBId.getShardName());
214 // Ready [tx3, tx4, tx5] on shard A.
216 shardA.tell(newReadyBatchedModifications(txId3, TEST_PATH,
217 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit3.getRef());
218 kit3.expectMsgClass(ReadyTransactionReply.class);
220 shardA.tell(newReadyBatchedModifications(txId4, OUTER_LIST_PATH, outerMapNode(),
221 participatingShardNames1), kit4.getRef());
222 kit4.expectMsgClass(ReadyTransactionReply.class);
224 shardA.tell(newReadyBatchedModifications(txId5, outerEntryPath(1),
225 ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit5.getRef());
226 kit5.expectMsgClass(ReadyTransactionReply.class);
228 // Ready [tx1, tx2, tx5, tx4, tx3] on shard B.
230 shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH,
231 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit1.getRef());
232 kit1.expectMsgClass(ReadyTransactionReply.class);
234 shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerMapNode(),
235 participatingShardNames2), kit2.getRef());
236 kit2.expectMsgClass(ReadyTransactionReply.class);
238 shardB.tell(newReadyBatchedModifications(txId5, innerEntryPath(1, "one"),
239 ImmutableNodes.mapEntry(INNER_LIST_QNAME, NAME_QNAME, "one"), participatingShardNames1), kit5.getRef());
240 kit5.expectMsgClass(ReadyTransactionReply.class);
242 shardB.tell(newReadyBatchedModifications(txId4, innerMapPath(1), innerNode(),
243 participatingShardNames1), kit4.getRef());
244 kit4.expectMsgClass(ReadyTransactionReply.class);
246 shardB.tell(newReadyBatchedModifications(txId3, outerEntryPath(1),
247 ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit3.getRef());
248 kit3.expectMsgClass(ReadyTransactionReply.class);
250 // Send tx3 CanCommit to A - it's at the head of the queue so should proceed.
252 shardA.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
253 kit3.expectMsgClass(CanCommitTransactionReply.class);
255 // Send tx1 CanCommit to B - it's at the head of the queue so should proceed.
257 shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
258 kit1.expectMsgClass(CanCommitTransactionReply.class);
260 // Send tx3 CanCommit to B - tx1 is at the head of the queue but the preceding shards in tx3's participating
261 // shard list [A] matches that of tx5 so tx3 should be moved ahead of tx5 in the queue.
// No reply is expected yet: the 100 ms of silence asserts that the request stays pending.
263 shardB.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
264 kit3.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
266 // Send tx4 CanCommit to B - tx4's participating shard list [A] matches that of tx3 and tx5 - so tx4 should
267 // be moved ahead of tx5 in the queue but not tx3 since tx3 should be in the CAN_COMMIT_PENDING state.
269 shardB.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
270 kit4.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
272 // Send tx5 CanCommit to B - its position in the queue should remain the same.
274 shardB.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
275 kit5.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
277 // Finish commit of tx1.
// tx1 is only committed on shard B; its other participant ("shardX") has no actor in this test.
279 shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
280 kit1.expectMsgClass(CommitTransactionReply.class);
282 // Finish commit of tx2.
// This is the first CanCommit sent for tx2; it is answered immediately.
284 shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
285 kit2.expectMsgClass(CanCommitTransactionReply.class);
287 shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
288 kit2.expectMsgClass(CommitTransactionReply.class);
290 // Finish commit of tx3.
// Reply to the tx3 CanCommit sent to shard B above, which was unanswered at the time.
293 kit3.expectMsgClass(CanCommitTransactionReply.class);
295 shardA.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
296 kit3.expectMsgClass(CommitTransactionReply.class);
298 shardB.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
299 kit3.expectMsgClass(CommitTransactionReply.class);
301 // Finish commit of tx4.
// Reply to the tx4 CanCommit sent to shard B above; tx4's CanCommit to shard A is only issued now.
304 kit4.expectMsgClass(CanCommitTransactionReply.class);
306 shardA.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
307 kit4.expectMsgClass(CanCommitTransactionReply.class);
308 shardA.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
309 kit4.expectMsgClass(CommitTransactionReply.class);
311 shardB.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
312 kit4.expectMsgClass(CommitTransactionReply.class);
314 // Finish commit of tx5.
// Reply to the tx5 CanCommit sent to shard B above; tx5's CanCommit to shard A is only issued now.
317 kit5.expectMsgClass(CanCommitTransactionReply.class);
319 shardA.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
320 kit5.expectMsgClass(CanCommitTransactionReply.class);
321 shardA.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
322 kit5.expectMsgClass(CommitTransactionReply.class);
324 shardB.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
325 kit5.expectMsgClass(CommitTransactionReply.class);
// Final state check: outer list entry 1 on shard A, inner list entry (1, "one") on shard B.
327 verifyOuterListEntry(shardA, 1);
328 verifyInnerListEntry(shardB, 1, "one");
330 LOG.info("{} ending", testName);
334 * Test 2 tx's accessing 2 shards, the second in common.
336 * tx1 -> shard A, shard C
337 * tx2 -> shard B, shard C
339 * The tx's are readied such that the pendingTransactions queues are as follows:
341 * Queue for shard A -> tx1
342 * Queue for shard B -> tx2
343 * Queue for shard C -> tx2, tx1
345 * When the tx's are committed, verify the ready order is preserved.
// Two transactions whose only common participant is shard C: tx1 uses [shardA, shardC], tx2 uses
// [shardB, shardC]. tx2 is readied on shard C before tx1. The test asserts that tx1's CanCommit on shard C
// gets no reply until tx2 has been committed. kit1 is the sender for tx1, kit2 for tx2.
348 public void testTwoTransactionsWithOneCommonParticipatingShard1() throws Exception {
349 final String testName = "testTwoTransactionsWithOneCommonParticipatingShard1";
350 LOG.info("{} starting", testName);
// One TestKit per transaction.
352 final TestKit kit1 = new TestKit(getSystem());
353 final TestKit kit2 = new TestKit(getSystem());
355 final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
356 final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
357 final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
// No message is sent to a shard before ShardTestKit.waitUntilLeader has returned for it.
359 final TestActorRef<Shard> shardA = actorFactory.createTestActor(
360 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
361 ShardTestKit.waitUntilLeader(shardA);
363 final TestActorRef<Shard> shardB = actorFactory.createTestActor(
364 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
365 ShardTestKit.waitUntilLeader(shardB);
367 final TestActorRef<Shard> shardC = actorFactory.createTestActor(
368 newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
369 ShardTestKit.waitUntilLeader(shardC);
371 final TransactionIdentifier txId1 = nextTransactionId();
372 final TransactionIdentifier txId2 = nextTransactionId();
// participatingShardNames1 belongs to tx1, participatingShardNames2 to tx2.
374 SortedSet<String> participatingShardNames1 =
375 ImmutableSortedSet.of(shardAId.getShardName(), shardCId.getShardName());
376 SortedSet<String> participatingShardNames2 =
377 ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
379 // Ready [tx1] on shard A.
381 shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
382 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef());
383 kit1.expectMsgClass(ReadyTransactionReply.class);
385 // Ready [tx2] on shard B.
387 shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH,
388 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
389 kit2.expectMsgClass(ReadyTransactionReply.class);
391 // Ready [tx2, tx1] on shard C.
393 shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH,
394 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
395 kit2.expectMsgClass(ReadyTransactionReply.class);
397 shardC.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
398 participatingShardNames1), kit1.getRef());
399 kit1.expectMsgClass(ReadyTransactionReply.class);
401 // Send tx1 CanCommit to A - should succeed.
403 shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
404 kit1.expectMsgClass(CanCommitTransactionReply.class);
406 // Send tx2 CanCommit to B - should succeed.
408 shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
409 kit2.expectMsgClass(CanCommitTransactionReply.class);
411 // Send tx1 CanCommit to C - tx2 is at the head of the queue but the preceding shards in tx1's participating
412 // shard list [A] do not match that of tx2 [B] so tx1 should not be allowed to proceed.
// The 100 ms of silence is the assertion that tx1 was not allowed to jump ahead of tx2.
414 shardC.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
415 kit1.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
417 // Send tx2 CanCommit to C - it's at the head of the queue so should proceed.
419 shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
420 kit2.expectMsgClass(CanCommitTransactionReply.class);
422 // Finish commit of tx2.
424 shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
425 kit2.expectMsgClass(CommitTransactionReply.class);
427 shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
428 kit2.expectMsgClass(CommitTransactionReply.class);
430 // Finish commit of tx1.
// Reply to the tx1 CanCommit sent to shard C above, which was unanswered at the time.
432 kit1.expectMsgClass(CanCommitTransactionReply.class);
433 shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
434 kit1.expectMsgClass(CommitTransactionReply.class);
436 shardC.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
437 kit1.expectMsgClass(CommitTransactionReply.class);
439 // Verify data in the data store.
// tx1 wrote outerNode(1) to shard C, so outer list entry 1 is checked there.
441 verifyOuterListEntry(shardC, 1);
443 LOG.info("{} ending", testName);
447 * Test 2 tx's accessing 2 shards, the first for one and the second for the other in common.
449 * tx1 -> shard A, shard B
450 * tx2 -> shard B, shard C
452 * The tx's are readied such that the pendingTransactions queues are as follows:
454 * Queue for shard A -> tx1
455 * Queue for shard B -> tx2, tx1
456 * Queue for shard C -> tx2
458 * When the tx's are committed, verify the ready order is preserved.
// Two transactions whose only common participant is shard B: tx1 uses [shardA, shardB], tx2 uses
// [shardB, shardC]. tx2 is readied on shard B before tx1. The test asserts that tx1's CanCommit on shard B
// gets no reply until tx2 has been committed. kit1 is the sender for tx1, kit2 for tx2.
461 public void testTwoTransactionsWithOneCommonParticipatingShard2() throws Exception {
462 final String testName = "testTwoTransactionsWithOneCommonParticipatingShard2";
463 LOG.info("{} starting", testName);
// One TestKit per transaction.
465 final TestKit kit1 = new TestKit(getSystem());
466 final TestKit kit2 = new TestKit(getSystem());
468 final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
469 final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
470 final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
// No message is sent to a shard before ShardTestKit.waitUntilLeader has returned for it.
472 final TestActorRef<Shard> shardA = actorFactory.createTestActor(
473 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
474 ShardTestKit.waitUntilLeader(shardA);
476 final TestActorRef<Shard> shardB = actorFactory.createTestActor(
477 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
478 ShardTestKit.waitUntilLeader(shardB);
480 final TestActorRef<Shard> shardC = actorFactory.createTestActor(
481 newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
482 ShardTestKit.waitUntilLeader(shardC);
484 final TransactionIdentifier txId1 = nextTransactionId();
485 final TransactionIdentifier txId2 = nextTransactionId();
// participatingShardNames1 belongs to tx1, participatingShardNames2 to tx2.
487 SortedSet<String> participatingShardNames1 =
488 ImmutableSortedSet.of(shardAId.getShardName(), shardBId.getShardName());
489 SortedSet<String> participatingShardNames2 =
490 ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
492 // Ready [tx1] on shard A.
494 shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
495 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef());
496 kit1.expectMsgClass(ReadyTransactionReply.class);
498 // Ready [tx2, tx1] on shard B.
500 shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH,
501 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
502 kit2.expectMsgClass(ReadyTransactionReply.class);
504 shardB.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
505 participatingShardNames1), kit1.getRef());
506 kit1.expectMsgClass(ReadyTransactionReply.class);
508 // Ready [tx2] on shard C.
510 shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH,
511 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
512 kit2.expectMsgClass(ReadyTransactionReply.class);
514 // Send tx1 CanCommit to A - should succeed.
516 shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
517 kit1.expectMsgClass(CanCommitTransactionReply.class);
519 // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
520 // shard list [A] do not match that of tx2 [] so tx1 should not be allowed to proceed.
// The 100 ms of silence is the assertion that tx1 was not allowed to jump ahead of tx2.
522 shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
523 kit1.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
525 // Send tx2 CanCommit to B - it's at the head of the queue so should proceed.
527 shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
528 kit2.expectMsgClass(CanCommitTransactionReply.class);
530 // Finish commit of tx2.
// tx2's CanCommit to shard C is only issued here, after shard B has already replied.
532 shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
533 kit2.expectMsgClass(CanCommitTransactionReply.class);
535 shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
536 kit2.expectMsgClass(CommitTransactionReply.class);
538 shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
539 kit2.expectMsgClass(CommitTransactionReply.class);
541 // Finish commit of tx1.
// Reply to the tx1 CanCommit sent to shard B above, which was unanswered at the time.
543 kit1.expectMsgClass(CanCommitTransactionReply.class);
544 shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
545 kit1.expectMsgClass(CommitTransactionReply.class);
547 shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
548 kit1.expectMsgClass(CommitTransactionReply.class);
550 // Verify data in the data store.
// tx1 wrote outerNode(1) to shard B, so outer list entry 1 is checked there.
552 verifyOuterListEntry(shardB, 1);
554 LOG.info("{} ending", testName);
// Asserts that the inner-list entry addressed by (outerID, innerID) can be read from the given shard:
// the path is built with innerEntryPath, read through readStore, and the result must be non-null.
557 static void verifyInnerListEntry(TestActorRef<Shard> shard, int outerID, String innerID)
559 final YangInstanceIdentifier path = innerEntryPath(outerID, innerID);
560 final NormalizedNode<?, ?> innerListEntry = readStore(shard, path);
// The failure message names the path that could not be found.
561 assertNotNull(path + " not found", innerListEntry);