Fix shard deadlock in 3 nodes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinationTest.java
1 /*
2  * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertNotNull;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.ID_QNAME;
13 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.INNER_LIST_QNAME;
14 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.NAME_QNAME;
15 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_PATH;
16 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_QNAME;
17 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
18 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_QNAME;
19 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerEntryPath;
20 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerMapPath;
21 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerNode;
22 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryPath;
23 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerMapNode;
24 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
25
26 import akka.dispatch.Dispatchers;
27 import akka.testkit.TestActorRef;
28 import akka.testkit.javadsl.TestKit;
29 import com.google.common.collect.ImmutableSortedSet;
30 import java.util.SortedSet;
31 import java.util.concurrent.TimeUnit;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.access.concepts.MemberName;
34 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
35 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
36 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import scala.concurrent.duration.FiniteDuration;
47
48 /**
49  * Unit tests for various 3PC coordination scenarios.
50  *
51  * @author Thomas Pantelis
52  */
53 public class ShardCommitCoordinationTest extends AbstractShardTest {
54     private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinationTest.class);
55
56     /**
57      * Test 2 tx's accessing the same shards.
58      * <pre>
59      *   tx1 -> shard A, shard B
60      *   tx2 -> shard A, shard B
61      * </pre>
62      * The tx's are readied such that the pendingTransactions queues are as follows:
63      * <pre>
64      *   Queue for shard A -> tx1, tx2
65      *   Queue for shard B -> tx2, tx1
66      * </pre>
67      * This is a potential deadlock scenario (ABBA) which should be avoided by allowing tx1 to proceed on shard B
68      * even though it isn't at the head of shard B's queue.
69      */
70     @Test
71     public void testTwoTransactionsWithSameTwoParticipatingShards() throws Exception {
72         final String testName = "testTwoTransactionsWithSameTwoParticipatingShards";
73         LOG.info("{} starting", testName);
74
75         final TestKit kit1 = new TestKit(getSystem());
76         final TestKit kit2 = new TestKit(getSystem());
77
78         final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
79         final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
80
81         final TestActorRef<Shard> shardA = actorFactory.createTestActor(
82                 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
83         ShardTestKit.waitUntilLeader(shardA);
84
85         final TestActorRef<Shard> shardB = actorFactory.createTestActor(
86                 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
87         ShardTestKit.waitUntilLeader(shardB);
88
89         final TransactionIdentifier txId1 = nextTransactionId();
90         final TransactionIdentifier txId2 = nextTransactionId();
91
92         SortedSet<String> participatingShardNames = ImmutableSortedSet.of(shardAId.getShardName(),
93                 shardBId.getShardName());
94
95         // Ready [tx1, tx2] on shard A.
96
97         shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
98                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef());
99         kit1.expectMsgClass(ReadyTransactionReply.class);
100
101         shardA.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
102                 participatingShardNames), kit2.getRef());
103         kit2.expectMsgClass(ReadyTransactionReply.class);
104
105         // Ready [tx2, tx1] on shard B.
106
107         shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
108                 participatingShardNames), kit2.getRef());
109         kit2.expectMsgClass(ReadyTransactionReply.class);
110
111         shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH,
112                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef());
113         kit1.expectMsgClass(ReadyTransactionReply.class);
114
115         // Send tx2 CanCommit to A - tx1 is at the head of the queue so tx2 should not proceed as A is the first shard
116         // in the participating shard list.
117
118         shardA.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
119         kit2.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
120
121         // Send tx1 CanCommit to A - it's at the head of the queue so should proceed.
122
123         shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
124         kit1.expectMsgClass(CanCommitTransactionReply.class);
125
126         // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
127         // shard list [A] match those of tx2 [A] so tx1 should be de-queued and allowed to proceed.
128
129         shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
130         kit1.expectMsgClass(CanCommitTransactionReply.class);
131
132         // Send tx2 CanCommit to B - tx1 should now be at the head of the queue so tx2 should not proceed yet.
133
134         shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
135         kit2.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
136
137         // Finish commit of tx1.
138
139         shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
140         kit1.expectMsgClass(CommitTransactionReply.class);
141
142         shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
143         kit1.expectMsgClass(CommitTransactionReply.class);
144
145         // Finish commit of tx2 - its deferred CanCommit replies from shards A and B should now arrive.
146
147         kit2.expectMsgClass(CanCommitTransactionReply.class);
148         kit2.expectMsgClass(CanCommitTransactionReply.class);
149
150         shardA.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
151         kit2.expectMsgClass(CommitTransactionReply.class);
152
153         shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
154         kit2.expectMsgClass(CommitTransactionReply.class);
155
156         // Verify data in the data store.
157
158         verifyOuterListEntry(shardA, 1);
159         verifyOuterListEntry(shardB, 1);
160
161         LOG.info("{} ending", testName);
162     }
163
164     /**
165      * Test multiple tx's accessing a mix of same and differing shards.
166      * <pre>
167      *   tx1 -> shard X, shard B
168      *   tx2 -> shard X, shard B
169      *   tx3 -> shard A, shard B
170      *   tx4 -> shard A, shard B
171      *   tx5 -> shard A, shard B
172      * </pre>
173      * The tx's are readied such that the pendingTransactions queues are as follows:
174      * <pre>
175      *   Queue for shard A -> tx3, tx4, tx5
176      *   Queue for shard B -> tx1, tx2, tx5, tx4, tx3
177      * </pre>
178      * Note: shard X means any other shard which isn't relevant for the test.
179      * This is a potential deadlock scenario (ABBA) which should be avoided by moving tx3 ahead of tx5 on shard B when
180      * CanCommit is requested.
181      */
182     @Test
183     public void testMultipleTransactionsWithMixedParticipatingShards() throws Exception {
184         final String testName = "testMultipleTransactionsWithMixedParticipatingShards";
185         LOG.info("{} starting", testName);
186
187         final TestKit kit1 = new TestKit(getSystem());
188         final TestKit kit2 = new TestKit(getSystem());
189         final TestKit kit3 = new TestKit(getSystem());
190         final TestKit kit4 = new TestKit(getSystem());
191         final TestKit kit5 = new TestKit(getSystem());
192
193         final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
194         final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
195
196         final TestActorRef<Shard> shardA = actorFactory.createTestActor(
197                 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
198         ShardTestKit.waitUntilLeader(shardA);
199
200         final TestActorRef<Shard> shardB = actorFactory.createTestActor(
201                 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
202         ShardTestKit.waitUntilLeader(shardB);
203
204         final TransactionIdentifier txId1 = nextTransactionId();
205         final TransactionIdentifier txId2 = nextTransactionId();
206         final TransactionIdentifier txId3 = nextTransactionId();
207         final TransactionIdentifier txId4 = nextTransactionId();
208         final TransactionIdentifier txId5 = nextTransactionId();
209
210         final SortedSet<String> participatingShardNames1 = ImmutableSortedSet.of(shardAId.getShardName(),
211                 shardBId.getShardName());
212         final SortedSet<String> participatingShardNames2 = ImmutableSortedSet.of("shardX", shardBId.getShardName());
213
214         // Ready [tx3, tx4, tx5] on shard A.
215
216         shardA.tell(newReadyBatchedModifications(txId3, TEST_PATH,
217                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit3.getRef());
218         kit3.expectMsgClass(ReadyTransactionReply.class);
219
220         shardA.tell(newReadyBatchedModifications(txId4, OUTER_LIST_PATH, outerMapNode(),
221                 participatingShardNames1), kit4.getRef());
222         kit4.expectMsgClass(ReadyTransactionReply.class);
223
224         shardA.tell(newReadyBatchedModifications(txId5, outerEntryPath(1),
225                 ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit5.getRef());
226         kit5.expectMsgClass(ReadyTransactionReply.class);
227
228         // Ready [tx1, tx2, tx5, tx4, tx3] on shard B.
229
230         shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH,
231                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit1.getRef());
232         kit1.expectMsgClass(ReadyTransactionReply.class);
233
234         shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerMapNode(),
235                 participatingShardNames2), kit2.getRef());
236         kit2.expectMsgClass(ReadyTransactionReply.class);
237
238         shardB.tell(newReadyBatchedModifications(txId5, innerEntryPath(1, "one"),
239                 ImmutableNodes.mapEntry(INNER_LIST_QNAME, NAME_QNAME, "one"), participatingShardNames1), kit5.getRef());
240         kit5.expectMsgClass(ReadyTransactionReply.class);
241
242         shardB.tell(newReadyBatchedModifications(txId4, innerMapPath(1), innerNode(),
243                 participatingShardNames1), kit4.getRef());
244         kit4.expectMsgClass(ReadyTransactionReply.class);
245
246         shardB.tell(newReadyBatchedModifications(txId3, outerEntryPath(1),
247                 ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit3.getRef());
248         kit3.expectMsgClass(ReadyTransactionReply.class);
249
250         // Send tx3 CanCommit to A - it's at the head of the queue so should proceed.
251
252         shardA.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
253         kit3.expectMsgClass(CanCommitTransactionReply.class);
254
255         // Send tx1 CanCommit to B - it's at the head of the queue so should proceed.
256
257         shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
258         kit1.expectMsgClass(CanCommitTransactionReply.class);
259
260         // Send tx3 CanCommit to B - tx1 is at the head of the queue but the preceding shards in tx3's participating
261         // shard list [A] match those of tx5 so tx3 should be moved ahead of tx5 in the queue.
262
263         shardB.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
264         kit3.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
265
266         // Send tx4 CanCommit to B - tx4's preceding participating shards [A] match those of tx3 and tx5 - so tx4
267         // should be moved ahead of tx5 in the queue but not tx3 since tx3 should be in the CAN_COMMIT_PENDING state.
268
269         shardB.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
270         kit4.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
271
272         // Send tx5 CanCommit to B - its position in the queue should remain the same.
273
274         shardB.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
275         kit5.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
276
277         // Finish commit of tx1.
278
279         shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
280         kit1.expectMsgClass(CommitTransactionReply.class);
281
282         // Finish commit of tx2.
283
284         shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
285         kit2.expectMsgClass(CanCommitTransactionReply.class);
286
287         shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
288         kit2.expectMsgClass(CommitTransactionReply.class);
289
290         // Finish commit of tx3.
291
292         // Deferred CanCommit reply from shard B (shard A already replied to tx3 above).
293         kit3.expectMsgClass(CanCommitTransactionReply.class);
294
295         shardA.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
296         kit3.expectMsgClass(CommitTransactionReply.class);
297
298         shardB.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
299         kit3.expectMsgClass(CommitTransactionReply.class);
300
301         // Finish commit of tx4.
302
303         // Deferred CanCommit reply from shard B (CanCommit has not been sent to shard A for tx4 yet).
304         kit4.expectMsgClass(CanCommitTransactionReply.class);
305
306         shardA.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
307         kit4.expectMsgClass(CanCommitTransactionReply.class);
308         shardA.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
309         kit4.expectMsgClass(CommitTransactionReply.class);
310
311         shardB.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
312         kit4.expectMsgClass(CommitTransactionReply.class);
313
314         // Finish commit of tx5.
315
316         // Deferred CanCommit reply from shard B (CanCommit has not been sent to shard A for tx5 yet).
317         kit5.expectMsgClass(CanCommitTransactionReply.class);
318
319         shardA.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
320         kit5.expectMsgClass(CanCommitTransactionReply.class);
321         shardA.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
322         kit5.expectMsgClass(CommitTransactionReply.class);
323
324         shardB.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
325         kit5.expectMsgClass(CommitTransactionReply.class);
326
327         verifyOuterListEntry(shardA, 1);
328         verifyInnerListEntry(shardB, 1, "one");
329
330         LOG.info("{} ending", testName);
331     }
332
333     /**
334      * Test 2 tx's accessing 2 shards, the second in common.
335      * <pre>
336      *   tx1 -> shard A, shard C
337      *   tx2 -> shard B, shard C
338      * </pre>
339      * The tx's are readied such that the pendingTransactions queues are as follows:
340      * <pre>
341      *   Queue for shard A -> tx1
342      *   Queue for shard B -> tx2
343      *   Queue for shard C -> tx2, tx1
344      * </pre>
345      * When the tx's are committed, verify the ready order is preserved.
346      */
347     @Test
348     public void testTwoTransactionsWithOneCommonParticipatingShard1() throws Exception {
349         final String testName = "testTwoTransactionsWithOneCommonParticipatingShard1";
350         LOG.info("{} starting", testName);
351
352         final TestKit kit1 = new TestKit(getSystem());
353         final TestKit kit2 = new TestKit(getSystem());
354
355         final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
356         final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
357         final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
358
359         final TestActorRef<Shard> shardA = actorFactory.createTestActor(
360                 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
361         ShardTestKit.waitUntilLeader(shardA);
362
363         final TestActorRef<Shard> shardB = actorFactory.createTestActor(
364                 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
365         ShardTestKit.waitUntilLeader(shardB);
366
367         final TestActorRef<Shard> shardC = actorFactory.createTestActor(
368                 newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
369         ShardTestKit.waitUntilLeader(shardC);
370
371         final TransactionIdentifier txId1 = nextTransactionId();
372         final TransactionIdentifier txId2 = nextTransactionId();
373
374         SortedSet<String> participatingShardNames1 =
375                 ImmutableSortedSet.of(shardAId.getShardName(), shardCId.getShardName());
376         SortedSet<String> participatingShardNames2 =
377                 ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
378
379         // Ready [tx1] on shard A.
380
381         shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
382                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef());
383         kit1.expectMsgClass(ReadyTransactionReply.class);
384
385         // Ready [tx2] on shard B.
386
387         shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH,
388                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
389         kit2.expectMsgClass(ReadyTransactionReply.class);
390
391         // Ready [tx2, tx1] on shard C.
392
393         shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH,
394                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
395         kit2.expectMsgClass(ReadyTransactionReply.class);
396
397         shardC.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
398                 participatingShardNames1), kit1.getRef());
399         kit1.expectMsgClass(ReadyTransactionReply.class);
400
401         // Send tx1 CanCommit to A - should succeed.
402
403         shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
404         kit1.expectMsgClass(CanCommitTransactionReply.class);
405
406         // Send tx2 CanCommit to B - should succeed.
407
408         shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
409         kit2.expectMsgClass(CanCommitTransactionReply.class);
410
411         // Send tx1 CanCommit to C - tx2 is at the head of the queue but the preceding shards in tx1's participating
412         // shard list [A] do not match those of tx2 [B] so tx1 should not be allowed to proceed.
413
414         shardC.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
415         kit1.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
416
417         // Send tx2 CanCommit to C - it's at the head of the queue so should proceed.
418
419         shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
420         kit2.expectMsgClass(CanCommitTransactionReply.class);
421
422         // Finish commit of tx2.
423
424         shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
425         kit2.expectMsgClass(CommitTransactionReply.class);
426
427         shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
428         kit2.expectMsgClass(CommitTransactionReply.class);
429
430         // Finish commit of tx1 - its deferred CanCommit reply from shard C should now arrive.
431
432         kit1.expectMsgClass(CanCommitTransactionReply.class);
433         shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
434         kit1.expectMsgClass(CommitTransactionReply.class);
435
436         shardC.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
437         kit1.expectMsgClass(CommitTransactionReply.class);
438
439         // Verify data in the data store.
440
441         verifyOuterListEntry(shardC, 1);
442
443         LOG.info("{} ending", testName);
444     }
445
446     /**
447      * Test 2 tx's each accessing 2 shards with one in common: the second shard for tx1 and the first for tx2.
448      * <pre>
449      *   tx1 -> shard A, shard B
450      *   tx2 -> shard B, shard C
451      * </pre>
452      * The tx's are readied such that the pendingTransactions queues are as follows:
453      * <pre>
454      *   Queue for shard A -> tx1
455      *   Queue for shard B -> tx2, tx1
456      *   Queue for shard C -> tx2
457      * </pre>
458      * When the tx's are committed, verify the ready order is preserved.
459      */
460     @Test
461     public void testTwoTransactionsWithOneCommonParticipatingShard2() throws Exception {
462         final String testName = "testTwoTransactionsWithOneCommonParticipatingShard2";
463         LOG.info("{} starting", testName);
464
465         final TestKit kit1 = new TestKit(getSystem());
466         final TestKit kit2 = new TestKit(getSystem());
467
468         final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
469         final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
470         final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
471
472         final TestActorRef<Shard> shardA = actorFactory.createTestActor(
473                 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
474         ShardTestKit.waitUntilLeader(shardA);
475
476         final TestActorRef<Shard> shardB = actorFactory.createTestActor(
477                 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
478         ShardTestKit.waitUntilLeader(shardB);
479
480         final TestActorRef<Shard> shardC = actorFactory.createTestActor(
481                 newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
482         ShardTestKit.waitUntilLeader(shardC);
483
484         final TransactionIdentifier txId1 = nextTransactionId();
485         final TransactionIdentifier txId2 = nextTransactionId();
486
487         SortedSet<String> participatingShardNames1 =
488                 ImmutableSortedSet.of(shardAId.getShardName(), shardBId.getShardName());
489         SortedSet<String> participatingShardNames2 =
490                 ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
491
492         // Ready [tx1] on shard A.
493
494         shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
495                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef());
496         kit1.expectMsgClass(ReadyTransactionReply.class);
497
498         // Ready [tx2, tx1] on shard B.
499
500         shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH,
501                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
502         kit2.expectMsgClass(ReadyTransactionReply.class);
503
504         shardB.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
505                 participatingShardNames1), kit1.getRef());
506         kit1.expectMsgClass(ReadyTransactionReply.class);
507
508         // Ready [tx2] on shard C.
509
510         shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH,
511                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
512         kit2.expectMsgClass(ReadyTransactionReply.class);
513
514         // Send tx1 CanCommit to A - should succeed.
515
516         shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
517         kit1.expectMsgClass(CanCommitTransactionReply.class);
518
519         // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
520         // shard list [A] do not match those of tx2 [] so tx1 should not be allowed to proceed.
521
522         shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
523         kit1.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
524
525         // Send tx2 CanCommit to B - it's at the head of the queue so should proceed.
526
527         shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
528         kit2.expectMsgClass(CanCommitTransactionReply.class);
529
530         // Finish commit of tx2.
531
532         shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
533         kit2.expectMsgClass(CanCommitTransactionReply.class);
534
535         shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
536         kit2.expectMsgClass(CommitTransactionReply.class);
537
538         shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
539         kit2.expectMsgClass(CommitTransactionReply.class);
540
541         // Finish commit of tx1 - its deferred CanCommit reply from shard B should now arrive.
542
543         kit1.expectMsgClass(CanCommitTransactionReply.class);
544         shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
545         kit1.expectMsgClass(CommitTransactionReply.class);
546
547         shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
548         kit1.expectMsgClass(CommitTransactionReply.class);
549
550         // Verify data in the data store.
551
552         verifyOuterListEntry(shardB, 1);
553
554         LOG.info("{} ending", testName);
555     }
556
557     static void verifyInnerListEntry(TestActorRef<Shard> shard, int outerID, String innerID)
558             throws Exception {
559         final YangInstanceIdentifier path = innerEntryPath(outerID, innerID);
560         final NormalizedNode<?, ?> innerListEntry = readStore(shard, path);
561         assertNotNull(path + " not found", innerListEntry);
562     }
563 }