Add UnsignedLongBitmap
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinationTest.java
1 /*
2  * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertNotNull;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.ID_QNAME;
13 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.INNER_LIST_QNAME;
14 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.NAME_QNAME;
15 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_PATH;
16 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_QNAME;
17 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
18 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_QNAME;
19 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerEntryPath;
20 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerMapPath;
21 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerNode;
22 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryPath;
23 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerMapNode;
24 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
25
26 import akka.dispatch.Dispatchers;
27 import akka.testkit.TestActorRef;
28 import akka.testkit.javadsl.TestKit;
29 import com.google.common.collect.ImmutableSortedSet;
30 import java.time.Duration;
31 import java.util.SortedSet;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.access.concepts.MemberName;
34 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
35 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
36 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * Unit tests for various 3PC coordination scenarios.
49  *
50  * @author Thomas Pantelis
51  */
52 public class ShardCommitCoordinationTest extends AbstractShardTest {
53     private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinationTest.class);
54
55     /**
56      * Test 2 tx's accessing the same shards.
57      * <pre>
58      *   tx1 -> shard A, shard B
59      *   tx2 -> shard A, shard B
60      * </pre>
61      * The tx's are readied such the pendingTransactions queue are as follows:
62      * <pre>
63      *   Queue for shard A -> tx1, tx2
64      *   Queue for shard B -> tx2, tx1
65      * </pre>
66      * This is a potential deadlock scenario (ABBA) which should be avoided by allowing tx1 to proceed on shard B
67      * even though it isn't at the head of the queues.
68      */
69     @Test
70     public void testTwoTransactionsWithSameTwoParticipatingShards() {
71         final String testName = "testTwoTransactionsWithSameTwoParticipatingShards";
72         LOG.info("{} starting", testName);
73
74         final TestKit kit1 = new TestKit(getSystem());
75         final TestKit kit2 = new TestKit(getSystem());
76
77         final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
78         final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
79
80         final TestActorRef<Shard> shardA = actorFactory.createTestActor(
81                 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
82         ShardTestKit.waitUntilLeader(shardA);
83
84         final TestActorRef<Shard> shardB = actorFactory.createTestActor(
85                 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
86         ShardTestKit.waitUntilLeader(shardB);
87
88         final TransactionIdentifier txId1 = nextTransactionId();
89         final TransactionIdentifier txId2 = nextTransactionId();
90
91         SortedSet<String> participatingShardNames = ImmutableSortedSet.of(shardAId.getShardName(),
92                 shardBId.getShardName());
93
94         // Ready [tx1, tx2] on shard A.
95
96         shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
97                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef());
98         kit1.expectMsgClass(ReadyTransactionReply.class);
99
100         shardA.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
101                 participatingShardNames), kit2.getRef());
102         kit2.expectMsgClass(ReadyTransactionReply.class);
103
104         // Ready [tx2, tx1] on shard B.
105
106         shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
107                 participatingShardNames), kit2.getRef());
108         kit2.expectMsgClass(ReadyTransactionReply.class);
109
110         shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH,
111                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames), kit1.getRef());
112         kit1.expectMsgClass(ReadyTransactionReply.class);
113
114         // Send tx2 CanCommit to A - tx1 is at the head of the queue so tx2 should not proceed as A is the first shard
115         // in the participating shard list.
116
117         shardA.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
118         kit2.expectNoMessage(Duration.ofMillis(100));
119
120         // Send tx1 CanCommit to A - it's at the head of the queue so should proceed.
121
122         shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
123         kit1.expectMsgClass(CanCommitTransactionReply.class);
124
125         // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
126         // shard list [A] matches that of tx2 [A] so tx1 should be de-queued and allowed to proceed.
127
128         shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
129         kit1.expectMsgClass(CanCommitTransactionReply.class);
130
131         // Send tx2 CanCommit to B - tx1 should now be at the head of he queue.
132
133         shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
134         kit2.expectNoMessage(Duration.ofMillis(100));
135
136         // Finish commit of tx1.
137
138         shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
139         kit1.expectMsgClass(CommitTransactionReply.class);
140
141         shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
142         kit1.expectMsgClass(CommitTransactionReply.class);
143
144         // Finish commit of tx2.
145
146         kit2.expectMsgClass(CanCommitTransactionReply.class);
147         kit2.expectMsgClass(CanCommitTransactionReply.class);
148
149         shardA.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
150         kit2.expectMsgClass(CommitTransactionReply.class);
151
152         shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
153         kit2.expectMsgClass(CommitTransactionReply.class);
154
155         // Verify data in the data store.
156
157         verifyOuterListEntry(shardA, 1);
158         verifyOuterListEntry(shardB, 1);
159
160         LOG.info("{} ending", testName);
161     }
162
163     /**
164      * Test multiple tx's accessing a mix of same and differing shards.
165      * <pre>
166      *   tx1 -> shard X, shard B
167      *   tx2 -> shard X, shard B
168      *   tx3 -> shard A, shard B
169      *   tx4 -> shard A, shard B
170      *   tx5 -> shard A, shard B
171      * </pre>
172      * The tx's are readied such the pendingTransactions queue are as follows:
173      * <pre>
174      *   Queue for shard A -> tx3, tx4, tx5
175      *   Queue for shard B -> tx1, tx2, tx5, tx4, tx3
176      * </pre>
177      * Note: shard X means any other shard which isn't relevant for the test.
178      * This is a potential deadlock scenario (ABBA) which should be avoided by moving tx3 ahead of tx5 on shard B when
179      * CanCommit is requested.
180      */
181     @Test
182     public void testMultipleTransactionsWithMixedParticipatingShards() {
183         final String testName = "testMultipleTransactionsWithMixedParticipatingShards";
184         LOG.info("{} starting", testName);
185
186         final TestKit kit1 = new TestKit(getSystem());
187         final TestKit kit2 = new TestKit(getSystem());
188         final TestKit kit3 = new TestKit(getSystem());
189         final TestKit kit4 = new TestKit(getSystem());
190         final TestKit kit5 = new TestKit(getSystem());
191
192         final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
193         final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
194
195         final TestActorRef<Shard> shardA = actorFactory.createTestActor(
196                 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
197         ShardTestKit.waitUntilLeader(shardA);
198
199         final TestActorRef<Shard> shardB = actorFactory.createTestActor(
200                 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
201         ShardTestKit.waitUntilLeader(shardB);
202
203         final TransactionIdentifier txId1 = nextTransactionId();
204         final TransactionIdentifier txId2 = nextTransactionId();
205         final TransactionIdentifier txId3 = nextTransactionId();
206         final TransactionIdentifier txId4 = nextTransactionId();
207         final TransactionIdentifier txId5 = nextTransactionId();
208
209         final SortedSet<String> participatingShardNames1 = ImmutableSortedSet.of(shardAId.getShardName(),
210                 shardBId.getShardName());
211         final SortedSet<String> participatingShardNames2 = ImmutableSortedSet.of("shardX", shardBId.getShardName());
212
213         // Ready [tx3, tx4, tx5] on shard A.
214
215         shardA.tell(newReadyBatchedModifications(txId3, TEST_PATH,
216                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit3.getRef());
217         kit3.expectMsgClass(ReadyTransactionReply.class);
218
219         shardA.tell(newReadyBatchedModifications(txId4, OUTER_LIST_PATH, outerMapNode(),
220                 participatingShardNames1), kit4.getRef());
221         kit4.expectMsgClass(ReadyTransactionReply.class);
222
223         shardA.tell(newReadyBatchedModifications(txId5, outerEntryPath(1),
224                 ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit5.getRef());
225         kit5.expectMsgClass(ReadyTransactionReply.class);
226
227         // Ready [tx1, tx2, tx5, tx4, tx3] on shard B.
228
229         shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH,
230                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit1.getRef());
231         kit1.expectMsgClass(ReadyTransactionReply.class);
232
233         shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerMapNode(),
234                 participatingShardNames2), kit2.getRef());
235         kit2.expectMsgClass(ReadyTransactionReply.class);
236
237         shardB.tell(newReadyBatchedModifications(txId5, innerEntryPath(1, "one"),
238                 ImmutableNodes.mapEntry(INNER_LIST_QNAME, NAME_QNAME, "one"), participatingShardNames1), kit5.getRef());
239         kit5.expectMsgClass(ReadyTransactionReply.class);
240
241         shardB.tell(newReadyBatchedModifications(txId4, innerMapPath(1), innerNode(),
242                 participatingShardNames1), kit4.getRef());
243         kit4.expectMsgClass(ReadyTransactionReply.class);
244
245         shardB.tell(newReadyBatchedModifications(txId3, outerEntryPath(1),
246                 ImmutableNodes.mapEntry(OUTER_LIST_QNAME, ID_QNAME, 1), participatingShardNames1), kit3.getRef());
247         kit3.expectMsgClass(ReadyTransactionReply.class);
248
249         // Send tx3 CanCommit to A - it's at the head of the queue so should proceed.
250
251         shardA.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
252         kit3.expectMsgClass(CanCommitTransactionReply.class);
253
254         // Send tx1 CanCommit to B - it's at the head of the queue so should proceed.
255
256         shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
257         kit1.expectMsgClass(CanCommitTransactionReply.class);
258
259         // Send tx3 CanCommit to B - tx1 is at the head of the queue but the preceding shards in tx3's participating
260         // shard list [A] matches that of tx5 so tx3 should be moved ahead of tx5 in the queue.
261
262         shardB.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
263         kit3.expectNoMessage(Duration.ofMillis(100));
264
265         // Send tx4 CanCommit to B - tx4's participating shard list [A] matches that of tx3 and tx5 - so tx4 should
266         // be moved ahead of tx5 in the queue but not tx3 since should be in the CAN_COMMIT_PENDING state.
267
268         shardB.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
269         kit4.expectNoMessage(Duration.ofMillis(100));
270
271         // Send tx5 CanCommit to B - it's position in the queue should remain the same.
272
273         shardB.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
274         kit5.expectNoMessage(Duration.ofMillis(100));
275
276         // Finish commit of tx1.
277
278         shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
279         kit1.expectMsgClass(CommitTransactionReply.class);
280
281         // Finish commit of tx2.
282
283         shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
284         kit2.expectMsgClass(CanCommitTransactionReply.class);
285
286         shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
287         kit2.expectMsgClass(CommitTransactionReply.class);
288
289         // Finish commit of tx3.
290
291         // From shard B
292         kit3.expectMsgClass(CanCommitTransactionReply.class);
293
294         shardA.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
295         kit3.expectMsgClass(CommitTransactionReply.class);
296
297         shardB.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
298         kit3.expectMsgClass(CommitTransactionReply.class);
299
300         // Finish commit of tx4.
301
302         // From shard B
303         kit4.expectMsgClass(CanCommitTransactionReply.class);
304
305         shardA.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
306         kit4.expectMsgClass(CanCommitTransactionReply.class);
307         shardA.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
308         kit4.expectMsgClass(CommitTransactionReply.class);
309
310         shardB.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
311         kit4.expectMsgClass(CommitTransactionReply.class);
312
313         // Finish commit of tx5.
314
315         // From shard B
316         kit5.expectMsgClass(CanCommitTransactionReply.class);
317
318         shardA.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
319         kit5.expectMsgClass(CanCommitTransactionReply.class);
320         shardA.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
321         kit5.expectMsgClass(CommitTransactionReply.class);
322
323         shardB.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
324         kit5.expectMsgClass(CommitTransactionReply.class);
325
326         verifyOuterListEntry(shardA, 1);
327         verifyInnerListEntry(shardB, 1, "one");
328
329         LOG.info("{} ending", testName);
330     }
331
332     /**
333      * Test 2 tx's accessing 2 shards, the second in common.
334      * <pre>
335      *   tx1 -> shard A, shard C
336      *   tx2 -> shard B, shard C
337      * </pre>
338      * The tx's are readied such the pendingTransactions queue are as follows:
339      * <pre>
340      *   Queue for shard A -> tx1
341      *   Queue for shard B -> tx2
342      *   Queue for shard C -> tx2, tx1
343      * </pre>
344      * When the tx's re committed verify the ready order is preserved.
345      */
346     @Test
347     public void testTwoTransactionsWithOneCommonParticipatingShard1() {
348         final String testName = "testTwoTransactionsWithOneCommonParticipatingShard1";
349         LOG.info("{} starting", testName);
350
351         final TestKit kit1 = new TestKit(getSystem());
352         final TestKit kit2 = new TestKit(getSystem());
353
354         final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
355         final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
356         final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
357
358         final TestActorRef<Shard> shardA = actorFactory.createTestActor(
359                 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
360         ShardTestKit.waitUntilLeader(shardA);
361
362         final TestActorRef<Shard> shardB = actorFactory.createTestActor(
363                 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
364         ShardTestKit.waitUntilLeader(shardB);
365
366         final TestActorRef<Shard> shardC = actorFactory.createTestActor(
367                 newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
368         ShardTestKit.waitUntilLeader(shardC);
369
370         final TransactionIdentifier txId1 = nextTransactionId();
371         final TransactionIdentifier txId2 = nextTransactionId();
372
373         SortedSet<String> participatingShardNames1 =
374                 ImmutableSortedSet.of(shardAId.getShardName(), shardCId.getShardName());
375         SortedSet<String> participatingShardNames2 =
376                 ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
377
378         // Ready [tx1] on shard A.
379
380         shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
381                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef());
382         kit1.expectMsgClass(ReadyTransactionReply.class);
383
384         // Ready [tx2] on shard B.
385
386         shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH,
387                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
388         kit2.expectMsgClass(ReadyTransactionReply.class);
389
390         // Ready [tx2, tx1] on shard C.
391
392         shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH,
393                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
394         kit2.expectMsgClass(ReadyTransactionReply.class);
395
396         shardC.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
397                 participatingShardNames1), kit1.getRef());
398         kit1.expectMsgClass(ReadyTransactionReply.class);
399
400         // Send tx1 CanCommit to A - should succeed.
401
402         shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
403         kit1.expectMsgClass(CanCommitTransactionReply.class);
404
405         // Send tx2 CanCommit to B - should succeed.
406
407         shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
408         kit2.expectMsgClass(CanCommitTransactionReply.class);
409
410         // Send tx1 CanCommit to C - tx2 is at the head of the queue but the preceding shards in tx1's participating
411         // shard list [A] do not match that of tx2 [B] so tx1 should not be allowed to proceed.
412
413         shardC.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
414         kit1.expectNoMessage(Duration.ofMillis(100));
415
416         // Send tx2 CanCommit to C - it's at the head of the queue so should proceed.
417
418         shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
419         kit2.expectMsgClass(CanCommitTransactionReply.class);
420
421         // Finish commit of tx2.
422
423         shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
424         kit2.expectMsgClass(CommitTransactionReply.class);
425
426         shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
427         kit2.expectMsgClass(CommitTransactionReply.class);
428
429         // Finish commit of tx1.
430
431         kit1.expectMsgClass(CanCommitTransactionReply.class);
432         shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
433         kit1.expectMsgClass(CommitTransactionReply.class);
434
435         shardC.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
436         kit1.expectMsgClass(CommitTransactionReply.class);
437
438         // Verify data in the data store.
439
440         verifyOuterListEntry(shardC, 1);
441
442         LOG.info("{} ending", testName);
443     }
444
445     /**
446      * Test 2 tx's accessing 2 shards, the first for one and the second for the other in common.
447      * <pre>
448      *   tx1 -> shard A, shard B
449      *   tx2 -> shard B, shard C
450      * </pre>
451      * The tx's are readied such the pendingTransactions queue are as follows:
452      * <pre>
453      *   Queue for shard A -> tx1
454      *   Queue for shard B -> tx2, tx1
455      *   Queue for shard C -> tx2
456      * </pre>
457      * When the tx's re committed verify the ready order is preserved.
458      */
459     @Test
460     public void testTwoTransactionsWithOneCommonParticipatingShard2() {
461         final String testName = "testTwoTransactionsWithOneCommonParticipatingShard2";
462         LOG.info("{} starting", testName);
463
464         final TestKit kit1 = new TestKit(getSystem());
465         final TestKit kit2 = new TestKit(getSystem());
466
467         final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
468         final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
469         final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
470
471         final TestActorRef<Shard> shardA = actorFactory.createTestActor(
472                 newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
473         ShardTestKit.waitUntilLeader(shardA);
474
475         final TestActorRef<Shard> shardB = actorFactory.createTestActor(
476                 newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
477         ShardTestKit.waitUntilLeader(shardB);
478
479         final TestActorRef<Shard> shardC = actorFactory.createTestActor(
480                 newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
481         ShardTestKit.waitUntilLeader(shardC);
482
483         final TransactionIdentifier txId1 = nextTransactionId();
484         final TransactionIdentifier txId2 = nextTransactionId();
485
486         SortedSet<String> participatingShardNames1 =
487                 ImmutableSortedSet.of(shardAId.getShardName(), shardBId.getShardName());
488         SortedSet<String> participatingShardNames2 =
489                 ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
490
491         // Ready [tx1] on shard A.
492
493         shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH,
494                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames1), kit1.getRef());
495         kit1.expectMsgClass(ReadyTransactionReply.class);
496
497         // Ready [tx2, tx1] on shard B.
498
499         shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH,
500                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
501         kit2.expectMsgClass(ReadyTransactionReply.class);
502
503         shardB.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
504                 participatingShardNames1), kit1.getRef());
505         kit1.expectMsgClass(ReadyTransactionReply.class);
506
507         // Ready [tx2] on shard C.
508
509         shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH,
510                 ImmutableNodes.containerNode(TEST_QNAME), participatingShardNames2), kit2.getRef());
511         kit2.expectMsgClass(ReadyTransactionReply.class);
512
513         // Send tx1 CanCommit to A - should succeed.
514
515         shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
516         kit1.expectMsgClass(CanCommitTransactionReply.class);
517
518         // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
519         // shard list [A] do not match that of tx2 [] so tx1 should not be allowed to proceed.
520
521         shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
522         kit1.expectNoMessage(Duration.ofMillis(100));
523
524         // Send tx2 CanCommit to B - it's at the head of the queue so should proceed.
525
526         shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
527         kit2.expectMsgClass(CanCommitTransactionReply.class);
528
529         // Finish commit of tx2.
530
531         shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
532         kit2.expectMsgClass(CanCommitTransactionReply.class);
533
534         shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
535         kit2.expectMsgClass(CommitTransactionReply.class);
536
537         shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
538         kit2.expectMsgClass(CommitTransactionReply.class);
539
540         // Finish commit of tx1.
541
542         kit1.expectMsgClass(CanCommitTransactionReply.class);
543         shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
544         kit1.expectMsgClass(CommitTransactionReply.class);
545
546         shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
547         kit1.expectMsgClass(CommitTransactionReply.class);
548
549         // Verify data in the data store.
550
551         verifyOuterListEntry(shardB, 1);
552
553         LOG.info("{} ending", testName);
554     }
555
556     static void verifyInnerListEntry(TestActorRef<Shard> shard, int outerID, String innerID) {
557         final YangInstanceIdentifier path = innerEntryPath(outerID, innerID);
558         final NormalizedNode innerListEntry = readStore(shard, path);
559         assertNotNull(path + " not found", innerListEntry);
560     }
561 }