Merge "BUG 2221 : Add metering to ShardTransaction actor"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.PoisonPill;
5 import akka.actor.Props;
6 import akka.dispatch.Dispatchers;
7 import akka.dispatch.OnComplete;
8 import akka.japi.Creator;
9 import akka.pattern.Patterns;
10 import akka.testkit.TestActorRef;
11 import akka.util.Timeout;
12 import com.google.common.base.Function;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.Uninterruptibles;
19 import org.junit.After;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.mockito.InOrder;
23 import org.mockito.invocation.InvocationOnMock;
24 import org.mockito.stubbing.Answer;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
37 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
38 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
39 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
40 import org.opendaylight.controller.cluster.datastore.modification.Modification;
41 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
44 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
45 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
46 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
47 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
48 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
49 import org.opendaylight.controller.cluster.raft.Snapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
55 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
56 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
57 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
58 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
59 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
60 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
61 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
62 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
63 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
64 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
65 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
66 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
67 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
68 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
69 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
72 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
73 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
74 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
75 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
76 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
77 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
78 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
79 import scala.concurrent.Await;
80 import scala.concurrent.Future;
81 import scala.concurrent.duration.FiniteDuration;
82 import java.io.IOException;
83 import java.util.Collections;
84 import java.util.HashSet;
85 import java.util.Map;
86 import java.util.Set;
87 import java.util.concurrent.CountDownLatch;
88 import java.util.concurrent.ExecutionException;
89 import java.util.concurrent.TimeUnit;
90 import java.util.concurrent.atomic.AtomicInteger;
91 import java.util.concurrent.atomic.AtomicReference;
92 import static org.junit.Assert.assertEquals;
93 import static org.junit.Assert.assertNotNull;
94 import static org.junit.Assert.assertNull;
95 import static org.junit.Assert.assertTrue;
96 import static org.junit.Assert.fail;
97 import static org.mockito.Mockito.mock;
98 import static org.mockito.Mockito.doReturn;
99 import static org.mockito.Mockito.doAnswer;
100 import static org.mockito.Mockito.inOrder;
101
102 public class ShardTest extends AbstractActorTest {
103
104     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
105
        // Incremented once per ShardTest instance (see shardID) so that each instance builds a
        // ShardIdentifier with a different "type" suffix.
106     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
107
        // Identifier given to the shards created in the tests below; its string form is also the
        // key used when seeding InMemorySnapshotStore and InMemoryJournal in testRecovery.
108     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
109             .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
110
        // Datastore settings handed to each shard: journal recovery log batch size 3, snapshot
        // batch count 5000 and a heartbeat interval of 100 ms.
111     private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
112             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
113             shardHeartbeatIntervalInMillis(100).build();
114
115     @Before
116     public void setUp() {
            // Per-test fixture: sets the "shard.persistent" system property to "false" and clears
            // the static in-memory snapshot store and journal so a test starts without entries
            // left over from a previous one.
            // NOTE(review): testConcurrentThreePhaseCommits sets the property to "true"; nothing
            // visible here restores it other than this method running before the next test.
117         System.setProperty("shard.persistent", "false");
118
119         InMemorySnapshotStore.clear();
120         InMemoryJournal.clear();
121     }
122
123     @After
124     public void tearDown() {
            // Per-test cleanup: clears the static in-memory snapshot store and journal again so
            // entries added by the finished test are not left behind.
125         InMemorySnapshotStore.clear();
126         InMemoryJournal.clear();
127     }
128
129     private Props newShardProps() {
130         return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
131                 dataStoreContext, SCHEMA_CONTEXT);
132     }
133
134     @Test
135     public void testRegisterChangeListener() throws Exception {
            // Registers a data change listener on a shard that has already become the leader,
            // checks the path returned in the registration reply, then writes to the store and
            // waits for the listener to be notified about the written path.
136         new ShardTestKit(getSystem()) {{
137             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
138                     newShardProps(),  "testRegisterChangeListener");
139
140             waitUntilLeader(shard);
141
142             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
143
                // Wrap the mock listener in a DataChangeListener actor; the actor's path is what
                // gets registered with the shard.
                // NOTE(review): the constructor argument 1 is presumably the number of change
                // events the mock expects - confirm against MockDataChangeListener.
144             MockDataChangeListener listener = new MockDataChangeListener(1);
145             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
146                     "testRegisterChangeListener-DataChangeListener");
147
148             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
149                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
150
                // The registration path in the reply must be a child of the shard actor whose
                // name starts with '$'.
151             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
152                     RegisterChangeListenerReply.class);
153             String replyPath = reply.getListenerRegistrationPath().toString();
154             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
155                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
156
                // Write to the registered path and wait for the listener to see the change.
157             YangInstanceIdentifier path = TestModel.TEST_PATH;
158             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
159
160             listener.waitForChangeEvents(path);
161
                // Stop both actors created by this test.
162             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
163             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
164         }};
165     }
166
167     @SuppressWarnings("serial")
168     @Test
169     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
170         // This test tests the timing window in which a change listener is registered before the
171         // shard becomes the leader. We verify that the listener is registered and notified of the
172         // existing data when the shard becomes the leader.
173         new ShardTestKit(getSystem()) {{
174             // For this test, we want to send the RegisterChangeListener message after the shard
175             // has recovered from persistence and before it becomes the leader. So we subclass
176             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
177             // we know that the shard has been initialized to a follower and has started the
178             // election process. The following 2 CountDownLatches are used to coordinate the
179             // ElectionTimeout with the sending of the RegisterChangeListener message.
180             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
181             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
182             Creator<Shard> creator = new Creator<Shard>() {
                    // Read and written only inside onReceiveCommand of the Shard created below.
183                 boolean firstElectionTimeout = true;
184
185                 @Override
186                 public Shard create() throws Exception {
187                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
188                             dataStoreContext, SCHEMA_CONTEXT) {
189                         @Override
190                         public void onReceiveCommand(final Object message) {
191                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
192                                 // Got the first ElectionTimeout. We don't forward it to the
193                                 // base Shard yet until we've sent the RegisterChangeListener
194                                 // message. So we signal the onFirstElectionTimeout latch to tell
195                                 // the main thread to send the RegisterChangeListener message and
196                                 // start a thread to wait on the onChangeListenerRegistered latch,
197                                 // which the main thread signals after it has sent the message.
198                                 // After the onChangeListenerRegistered is triggered, we send the
199                                 // original ElectionTimeout message to proceed with the election.
200                                 firstElectionTimeout = false;
201                                 final ActorRef self = getSelf();
202                                 new Thread() {
203                                     @Override
204                                     public void run() {
                                            // Bounded wait: the result is ignored, so after 5
                                            // seconds the ElectionTimeout is re-sent even if the
                                            // latch was never released.
205                                         Uninterruptibles.awaitUninterruptibly(
206                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
207                                         self.tell(message, self);
208                                     }
209                                 }.start();
210
211                                 onFirstElectionTimeout.countDown();
212                             } else {
213                                 super.onReceiveCommand(message);
214                             }
215                         }
216                     };
217                 }
218             };
219
                // Listener actor whose path is sent in the RegisterChangeListener message below.
                // NOTE(review): the actor names here and for the shard use
                // "testRegisterChangeListenerWhenNotLeaderInitially" rather than the test method
                // name - possibly left over from a rename.
220             MockDataChangeListener listener = new MockDataChangeListener(1);
221             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
222                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
223
                // Create the shard through the intercepting creator defined above.
224             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
225                     Props.create(new DelegatingShardCreator(creator)),
226                     "testRegisterChangeListenerWhenNotLeaderInitially");
227
228             // Write initial data into the in-memory store.
229             YangInstanceIdentifier path = TestModel.TEST_PATH;
230             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
231
232             // Wait until the shard receives the first ElectionTimeout message.
233             assertEquals("Got first ElectionTimeout", true,
234                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
235
236             // Now send the RegisterChangeListener and wait for the reply.
237             shard.tell(new RegisterChangeListener(path, dclActor.path(),
238                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
239
240             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
241                     RegisterChangeListenerReply.class);
242             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
243
244             // Sanity check - verify the shard is not the leader yet.
245             shard.tell(new FindLeader(), getRef());
246             FindLeaderReply findLeadeReply =
247                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
248             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
249
250             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
251             // with the election process.
252             onChangeListenerRegistered.countDown();
253
254             // Wait for the shard to become the leader and notify our listener with the existing
255             // data in the store.
256             listener.waitForChangeEvents(path);
257
                // Stop both actors created by this test.
258             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
259             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
260         }};
261     }
262
263     @Test
264     public void testCreateTransaction(){
            // Sends a serialized CreateTransaction for a READ_ONLY transaction named "txn-1" to a
            // leader shard and checks that the transaction actor path in the reply contains the
            // shard's path followed by "shard-txn-1".
265         new ShardTestKit(getSystem()) {{
266             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
267
268             waitUntilLeader(shard);
269
270             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
271
                // The transaction type is passed as the enum's ordinal value.
272             shard.tell(new CreateTransaction("txn-1",
273                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
274
275             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
276                     CreateTransactionReply.class);
277
278             String path = reply.getTransactionActorPath().toString();
279             assertTrue("Unexpected transaction path " + path,
280                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
281
282             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
283         }};
284     }
285
286     @Test
287     public void testCreateTransactionOnChain(){
            // Same check as testCreateTransaction, but the CreateTransaction message carries a
            // third argument "foobar".
            // NOTE(review): judging by the test name that argument is presumably a transaction
            // chain id - confirm against CreateTransaction.
288         new ShardTestKit(getSystem()) {{
289             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
290
291             waitUntilLeader(shard);
292
293             shard.tell(new CreateTransaction("txn-1",
294                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
295                     getRef());
296
297             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
298                     CreateTransactionReply.class);
299
                // The transaction actor path must contain the shard's path followed by "shard-txn-1".
300             String path = reply.getTransactionActorPath().toString();
301             assertTrue("Unexpected transaction path " + path,
302                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
303
304             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
305         }};
306     }
307
308     @Test
309     public void testPeerAddressResolved(){
            // Verifies that handling a PeerAddressResolved message stores the resolved address in
            // the peer address map exposed by the shard's RaftActorContext.
310         new ShardTestKit(getSystem()) {{
311             final CountDownLatch recoveryComplete = new CountDownLatch(1);
                // Shard subclass that starts with a single map entry (shardID -> null), exposes
                // the context's peer addresses, and releases the latch once recovery completes.
312             class TestShard extends Shard {
313                 TestShard() {
314                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
315                             dataStoreContext, SCHEMA_CONTEXT);
316                 }
317
318                 Map<String, String> getPeerAddresses() {
319                     return getRaftActorContext().getPeerAddresses();
320                 }
321
322                 @Override
323                 protected void onRecoveryComplete() {
324                     try {
325                         super.onRecoveryComplete();
326                     } finally {
                            // Count down even if the base implementation throws so the test
                            // thread is not left waiting for the full timeout.
327                         recoveryComplete.countDown();
328                     }
329                 }
330             }
331
332             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
333                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
334                         @Override
335                         public TestShard create() throws Exception {
336                             return new TestShard();
337                         }
338                     })), "testPeerAddressResolved");
339
340             //waitUntilLeader(shard);
341             assertEquals("Recovery complete", true,
342                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
343
                // Deliver the message by calling onReceiveCommand directly on the underlying actor
                // rather than through the mailbox.
344             String address = "akka://foobar";
345             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
346
                // The peer address map is keyed by the string form of the shard identifier.
347             assertEquals("getPeerAddresses", address,
348                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
349
350             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
351         }};
352     }
353
354     @Test
355     public void testApplySnapshot() throws ExecutionException, InterruptedException {
            // Writes a container to the shard's store, encodes the resulting root node into a
            // Snapshot, applies it through an ApplySnapshot message and checks that the root read
            // back afterwards equals the root captured before.
356         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
357                 "testApplySnapshot");
358
359         NormalizedNodeToNodeCodec codec =
360             new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
361
362         writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
363
            // An identifier built with no path arguments is used to read the whole tree.
364         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
365         NormalizedNode<?,?> expected = readStore(shard, root);
366
367         NormalizedNodeMessages.Container encode = codec.encode(expected);
368
            // NOTE(review): the trailing 1, 2, 3, 4 are presumably log index/term values; this
            // test never asserts on them - confirm against Snapshot.create.
369         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
370                 encode.getNormalizedNode().toByteString().toByteArray(),
371                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
372
            // Invoke the handler directly on the underlying actor rather than through the mailbox.
373         shard.underlyingActor().onReceiveCommand(applySnapshot);
374
375         NormalizedNode<?,?> actual = readStore(shard, root);
376
377         assertEquals(expected, actual);
378
379         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
380     }
381
382     @Test
383     public void testApplyState() throws Exception {
            // Hands the shard an ApplyState message whose log entry carries a single write of a
            // container at TEST_PATH and checks that the container can be read from the store.
384
385         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
386
387         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
388
            // Wrap the write in a composite modification and serialize it as the entry payload.
389         MutableCompositeModification compMod = new MutableCompositeModification();
390         compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
391         Payload payload = new CompositeModificationPayload(compMod.toSerializable());
            // NOTE(review): null and "test" are presumably the client actor and an identifier,
            // and (1, 2) the entry's index and term - confirm against ApplyState and
            // ReplicatedLogImplEntry.
392         ApplyState applyState = new ApplyState(null, "test",
393                 new ReplicatedLogImplEntry(1, 2, payload));
394
            // Invoke the handler directly on the underlying actor rather than through the mailbox.
395         shard.underlyingActor().onReceiveCommand(applyState);
396
397         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
398         assertEquals("Applied state", node, actual);
399
400         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
401     }
402
403     @SuppressWarnings("serial")
404     @Test
405     public void testRecovery() throws Exception {
            // Seeds the in-memory snapshot store and journal for shardID, starts a shard, waits
            // for onRecoveryComplete, and then checks both the recovered data and the log indexes
            // reported by the shard's MBean.
406
407         // Set up the InMemorySnapshotStore.
408
            // Build the snapshot content with a standalone data store: write the TEST_PATH
            // container, commit it, and read back the root node.
409         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
410         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
411
412         DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
413         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
414         DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
415         commitCohort.preCommit().get();
416         commitCohort.commit().get();
417
418         DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
419         NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
420
            // NOTE(review): the trailing 0, 1, -1, -1 are presumably index/term values for the
            // snapshot - confirm against Snapshot.create.
421         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
422                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
423                         root).
424                                 getNormalizedNode().toByteString().toByteArray(),
425                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
426
427         // Set up the InMemoryJournal.
428
            // Journal sequence 0: a write that creates the (empty) outer list.
429         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
430                   new WriteModification(TestModel.OUTER_LIST_PATH,
431                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
432                           SCHEMA_CONTEXT))));
433
            // Journal sequences 1..nListEntries: one merge per outer-list entry, keyed by i. The
            // keys are remembered so the recovered list can be checked against them below.
            // NOTE(review): 11 is not a multiple of the configured recovery batch size (3), which
            // presumably exercises a partial final batch - confirm the intent.
434         int nListEntries = 11;
435         Set<Integer> listEntryKeys = new HashSet<>();
436         for(int i = 1; i <= nListEntries; i++) {
437             listEntryKeys.add(Integer.valueOf(i));
438             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
439                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
440             Modification mod = new MergeModification(path,
441                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
442                     SCHEMA_CONTEXT);
443             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
444                     newPayload(mod)));
445         }
446
            // Final journal item: an ApplyLogEntries message carrying nListEntries.
447         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
448                 new ApplyLogEntries(nListEntries));
449
450         // Create the actor and wait for recovery complete.
451
452         final CountDownLatch recoveryComplete = new CountDownLatch(1);
453
454         Creator<Shard> creator = new Creator<Shard>() {
455             @Override
456             public Shard create() throws Exception {
457                 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
458                         dataStoreContext, SCHEMA_CONTEXT) {
459                     @Override
460                     protected void onRecoveryComplete() {
461                         try {
462                             super.onRecoveryComplete();
463                         } finally {
                                // Count down even if the base implementation throws.
464                             recoveryComplete.countDown();
465                         }
466                     }
467                 };
468             }
469         };
470
471         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
472                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
473
474         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
475
476         // Verify data in the data store.
477
478         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
479         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
480         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
481                 outerList.getValue() instanceof Iterable);
482         for(Object entry: (Iterable<?>) outerList.getValue()) {
483             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
484                     entry instanceof MapEntryNode);
485             MapEntryNode mapEntry = (MapEntryNode)entry;
486             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
487                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
488             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
489             Object value = idLeaf.get().getValue();
                // remove() returns false for a key that was never expected or was already seen.
490             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
491                     listEntryKeys.remove(value));
492         }
493
            // Any key still in the set was journaled but not found in the recovered list.
494         if(!listEntryKeys.isEmpty()) {
495             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
496                     listEntryKeys);
497         }
498
            // The MBean must report the index of the last journaled log entry for all three values.
499         assertEquals("Last log index", nListEntries,
500                 shard.underlyingActor().getShardMBean().getLastLogIndex());
501         assertEquals("Commit index", nListEntries,
502                 shard.underlyingActor().getShardMBean().getCommitIndex());
503         assertEquals("Last applied", nListEntries,
504                 shard.underlyingActor().getShardMBean().getLastApplied());
505
506         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
507     }
508
509     private CompositeModificationPayload newPayload(Modification... mods) {
            // Collects the given modifications, in argument order, into one composite
            // modification and wraps its serializable form in a payload.
510         MutableCompositeModification composite = new MutableCompositeModification();
511         for(int idx = 0; idx < mods.length; idx++) {
512             composite.addModification(mods[idx]);
513         }
514
515         return new CompositeModificationPayload(composite.toSerializable());
516     }
517
518     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
519             InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode<?, ?> data,
520             MutableCompositeModification modification) {
            // Convenience overload: same as the six-argument variant with no preCommit override
            // (null), so the mock's preCommit() delegates to the real cohort.
            // The data parameter is declared as NormalizedNode<?, ?> instead of the raw type;
            // existing callers compile unchanged.
521         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
522     }
523
524     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
525             InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
526             MutableCompositeModification modification,
527             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
528
529         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
530         tx.write(path, data);
531         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
532         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
533
534         doAnswer(new Answer<ListenableFuture<Boolean>>() {
535             @Override
536             public ListenableFuture<Boolean> answer(InvocationOnMock invocation) {
537                 return realCohort.canCommit();
538             }
539         }).when(cohort).canCommit();
540
541         doAnswer(new Answer<ListenableFuture<Void>>() {
542             @Override
543             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
544                 if(preCommit != null) {
545                     return preCommit.apply(realCohort);
546                 } else {
547                     return realCohort.preCommit();
548                 }
549             }
550         }).when(cohort).preCommit();
551
552         doAnswer(new Answer<ListenableFuture<Void>>() {
553             @Override
554             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
555                 return realCohort.commit();
556             }
557         }).when(cohort).commit();
558
559         doAnswer(new Answer<ListenableFuture<Void>>() {
560             @Override
561             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
562                 return realCohort.abort();
563             }
564         }).when(cohort).abort();
565
566         modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
567
568         return cohort;
569     }
570
571     @SuppressWarnings({ "unchecked" })
572     @Test
573     public void testConcurrentThreePhaseCommits() throws Throwable {
574         System.setProperty("shard.persistent", "true");
575         new ShardTestKit(getSystem()) {{
576             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
577                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
578                     "testConcurrentThreePhaseCommits");
579
580             waitUntilLeader(shard);
581
582             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
583
584             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
585
586             String transactionID1 = "tx1";
587             MutableCompositeModification modification1 = new MutableCompositeModification();
588             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
589                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
590
591             String transactionID2 = "tx2";
592             MutableCompositeModification modification2 = new MutableCompositeModification();
593             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
594                     TestModel.OUTER_LIST_PATH,
595                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
596                     modification2);
597
598             String transactionID3 = "tx3";
599             MutableCompositeModification modification3 = new MutableCompositeModification();
600             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
601                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
602                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
603                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
604                     modification3);
605
606             long timeoutSec = 5;
607             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
608             final Timeout timeout = new Timeout(duration);
609
610             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
611             // by the ShardTransaction.
612
613             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
614             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
615                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
616             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
617
618             // Send the CanCommitTransaction message for the first Tx.
619
620             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
621             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
622                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
623             assertEquals("Can commit", true, canCommitReply.getCanCommit());
624
625             // Send the ForwardedReadyTransaction for the next 2 Tx's.
626
627             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
628             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
629
630             shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
631             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
632
633             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
634             // processed after the first Tx completes.
635
636             Future<Object> canCommitFuture1 = Patterns.ask(shard,
637                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
638
639             Future<Object> canCommitFuture2 = Patterns.ask(shard,
640                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
641
642             // Send the CommitTransaction message for the first Tx. After it completes, it should
643             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
644
645             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
646             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
647
648             // Wait for the next 2 Tx's to complete.
649
650             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
651             final CountDownLatch commitLatch = new CountDownLatch(2);
652
653             class OnFutureComplete extends OnComplete<Object> {
654                 private final Class<?> expRespType;
655
656                 OnFutureComplete(Class<?> expRespType) {
657                     this.expRespType = expRespType;
658                 }
659
660                 @Override
661                 public void onComplete(Throwable error, Object resp) {
662                     if(error != null) {
663                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
664                     } else {
665                         try {
666                             assertEquals("Commit response type", expRespType, resp.getClass());
667                             onSuccess(resp);
668                         } catch (Exception e) {
669                             caughtEx.set(e);
670                         }
671                     }
672                 }
673
674                 void onSuccess(Object resp) throws Exception {
675                 }
676             }
677
678             class OnCommitFutureComplete extends OnFutureComplete {
679                 OnCommitFutureComplete() {
680                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
681                 }
682
683                 @Override
684                 public void onComplete(Throwable error, Object resp) {
685                     super.onComplete(error, resp);
686                     commitLatch.countDown();
687                 }
688             }
689
690             class OnCanCommitFutureComplete extends OnFutureComplete {
691                 private final String transactionID;
692
693                 OnCanCommitFutureComplete(String transactionID) {
694                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
695                     this.transactionID = transactionID;
696                 }
697
698                 @Override
699                 void onSuccess(Object resp) throws Exception {
700                     CanCommitTransactionReply canCommitReply =
701                             CanCommitTransactionReply.fromSerializable(resp);
702                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
703
704                     Future<Object> commitFuture = Patterns.ask(shard,
705                             new CommitTransaction(transactionID).toSerializable(), timeout);
706                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
707                 }
708             }
709
710             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
711                     getSystem().dispatcher());
712
713             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
714                     getSystem().dispatcher());
715
716             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
717
718             if(caughtEx.get() != null) {
719                 throw caughtEx.get();
720             }
721
722             assertEquals("Commits complete", true, done);
723
724             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
725             inOrder.verify(cohort1).canCommit();
726             inOrder.verify(cohort1).preCommit();
727             inOrder.verify(cohort1).commit();
728             inOrder.verify(cohort2).canCommit();
729             inOrder.verify(cohort2).preCommit();
730             inOrder.verify(cohort2).commit();
731             inOrder.verify(cohort3).canCommit();
732             inOrder.verify(cohort3).preCommit();
733             inOrder.verify(cohort3).commit();
734
735             // Verify data in the data store.
736
737             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
738             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
739             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
740                     outerList.getValue() instanceof Iterable);
741             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
742             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
743                        entry instanceof MapEntryNode);
744             MapEntryNode mapEntry = (MapEntryNode)entry;
745             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
746                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
747             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
748             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
749
750             for(int i = 0; i < 20 * 5; i++) {
751                 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
752                 if(lastLogIndex == 2) {
753                     break;
754                 }
755                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
756             }
757
758             assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
759
760             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
761         }};
762     }
763
764     @Test
765     public void testCommitPhaseFailure() throws Throwable {
766         new ShardTestKit(getSystem()) {{
767             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
768                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
769                     "testCommitPhaseFailure");
770
771             waitUntilLeader(shard);
772
773             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
774             // commit phase.
775
776             String transactionID1 = "tx1";
777             MutableCompositeModification modification1 = new MutableCompositeModification();
778             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
779             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
780             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
781             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
782
783             String transactionID2 = "tx2";
784             MutableCompositeModification modification2 = new MutableCompositeModification();
785             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
786             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
787
788             FiniteDuration duration = duration("5 seconds");
789             final Timeout timeout = new Timeout(duration);
790
791             // Simulate the ForwardedReadyTransaction messages that would be sent
792             // by the ShardTransaction.
793
794             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
795             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
796
797             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
798             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
799
800             // Send the CanCommitTransaction message for the first Tx.
801
802             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
803             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
804                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
805             assertEquals("Can commit", true, canCommitReply.getCanCommit());
806
807             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
808             // processed after the first Tx completes.
809
810             Future<Object> canCommitFuture = Patterns.ask(shard,
811                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
812
813             // Send the CommitTransaction message for the first Tx. This should send back an error
814             // and trigger the 2nd Tx to proceed.
815
816             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
817             expectMsgClass(duration, akka.actor.Status.Failure.class);
818
819             // Wait for the 2nd Tx to complete the canCommit phase.
820
821             final CountDownLatch latch = new CountDownLatch(1);
822             canCommitFuture.onComplete(new OnComplete<Object>() {
823                 @Override
824                 public void onComplete(Throwable t, Object resp) {
825                     latch.countDown();
826                 }
827             }, getSystem().dispatcher());
828
829             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
830
831             InOrder inOrder = inOrder(cohort1, cohort2);
832             inOrder.verify(cohort1).canCommit();
833             inOrder.verify(cohort1).preCommit();
834             inOrder.verify(cohort1).commit();
835             inOrder.verify(cohort2).canCommit();
836
837             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
838         }};
839     }
840
841     @Test
842     public void testPreCommitPhaseFailure() throws Throwable {
843         new ShardTestKit(getSystem()) {{
844             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
845                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
846                     "testPreCommitPhaseFailure");
847
848             waitUntilLeader(shard);
849
850             String transactionID = "tx1";
851             MutableCompositeModification modification = new MutableCompositeModification();
852             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
853             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
854             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
855
856             FiniteDuration duration = duration("5 seconds");
857
858             // Simulate the ForwardedReadyTransaction messages that would be sent
859             // by the ShardTransaction.
860
861             shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
862             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
863
864             // Send the CanCommitTransaction message.
865
866             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
867             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
868                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
869             assertEquals("Can commit", true, canCommitReply.getCanCommit());
870
871             // Send the CommitTransaction message. This should send back an error
872             // for preCommit failure.
873
874             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
875             expectMsgClass(duration, akka.actor.Status.Failure.class);
876
877             InOrder inOrder = inOrder(cohort);
878             inOrder.verify(cohort).canCommit();
879             inOrder.verify(cohort).preCommit();
880
881             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
882         }};
883     }
884
885     @Test
886     public void testCanCommitPhaseFailure() throws Throwable {
887         new ShardTestKit(getSystem()) {{
888             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
889                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
890                     "testCanCommitPhaseFailure");
891
892             waitUntilLeader(shard);
893
894             final FiniteDuration duration = duration("5 seconds");
895
896             String transactionID = "tx1";
897             MutableCompositeModification modification = new MutableCompositeModification();
898             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
899             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
900
901             // Simulate the ForwardedReadyTransaction messages that would be sent
902             // by the ShardTransaction.
903
904             shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
905             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
906
907             // Send the CanCommitTransaction message.
908
909             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
910             expectMsgClass(duration, akka.actor.Status.Failure.class);
911
912             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
913         }};
914     }
915
916     @Test
917     public void testAbortBeforeFinishCommit() throws Throwable {
918         System.setProperty("shard.persistent", "true");
919         new ShardTestKit(getSystem()) {{
920             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
921                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
922                     "testAbortBeforeFinishCommit");
923
924             waitUntilLeader(shard);
925
926             final FiniteDuration duration = duration("5 seconds");
927             final Timeout timeout = new Timeout(duration);
928
929             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
930
931             final String transactionID = "tx1";
932             final CountDownLatch abortComplete = new CountDownLatch(1);
933             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
934                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
935                 @Override
936                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
937                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
938
939                     Future<Object> abortFuture = Patterns.ask(shard,
940                             new AbortTransaction(transactionID).toSerializable(), timeout);
941                     abortFuture.onComplete(new OnComplete<Object>() {
942                         @Override
943                         public void onComplete(Throwable e, Object resp) {
944                             abortComplete.countDown();
945                         }
946                     }, getSystem().dispatcher());
947
948                     return preCommitFuture;
949                 }
950             };
951
952             MutableCompositeModification modification = new MutableCompositeModification();
953             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
954                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
955                     modification, preCommit);
956
957             shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
958             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
959
960             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
961             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
962                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
963             assertEquals("Can commit", true, canCommitReply.getCanCommit());
964
965             Future<Object> commitFuture = Patterns.ask(shard,
966                     new CommitTransaction(transactionID).toSerializable(), timeout);
967
968             assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
969
970             Await.result(commitFuture, duration);
971
972             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
973             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
974
975             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
976         }};
977     }
978
979     @Test
980     public void testTransactionCommitTimeout() throws Throwable {
981         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
982
983         new ShardTestKit(getSystem()) {{
984             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
985                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
986                     "testTransactionCommitTimeout");
987
988             waitUntilLeader(shard);
989
990             final FiniteDuration duration = duration("5 seconds");
991
992             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
993
994             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
995             writeToStore(shard, TestModel.OUTER_LIST_PATH,
996                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
997
998             // Create 1st Tx - will timeout
999
1000             String transactionID1 = "tx1";
1001             MutableCompositeModification modification1 = new MutableCompositeModification();
1002             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1003                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1004                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1005                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1006                     modification1);
1007
1008             // Create 2nd Tx
1009
1010             String transactionID2 = "tx3";
1011             MutableCompositeModification modification2 = new MutableCompositeModification();
1012             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1013                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1014             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1015                     listNodePath,
1016                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1017                     modification2);
1018
1019             // Ready the Tx's
1020
1021             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
1022             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1023
1024             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
1025             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1026
1027             // canCommit 1st Tx. We don't send the commit so it should timeout.
1028
1029             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1030             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1031
1032             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1033
1034             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1035             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1036
1037             // Commit the 2nd Tx.
1038
1039             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1040             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1041
1042             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1043             assertNotNull(listNodePath + " not found", node);
1044
1045             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1046         }};
1047     }
1048
1049     @Test
1050     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1051         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
1052
1053         new ShardTestKit(getSystem()) {{
1054             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1055                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1056                     "testTransactionCommitQueueCapacityExceeded");
1057
1058             waitUntilLeader(shard);
1059
1060             final FiniteDuration duration = duration("5 seconds");
1061
1062             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1063
1064             String transactionID1 = "tx1";
1065             MutableCompositeModification modification1 = new MutableCompositeModification();
1066             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1067                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1068
1069             String transactionID2 = "tx2";
1070             MutableCompositeModification modification2 = new MutableCompositeModification();
1071             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1072                     TestModel.OUTER_LIST_PATH,
1073                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1074                     modification2);
1075
1076             String transactionID3 = "tx3";
1077             MutableCompositeModification modification3 = new MutableCompositeModification();
1078             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1079                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1080
1081             // Ready the Tx's
1082
1083             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
1084             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1085
1086             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
1087             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1088
1089             shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
1090             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1091
1092             // canCommit 1st Tx.
1093
1094             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1095             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1096
1097             // canCommit the 2nd Tx - it should get queued.
1098
1099             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1100
1101             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1102
1103             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1104             expectMsgClass(duration, akka.actor.Status.Failure.class);
1105
1106             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1107         }};
1108     }
1109
1110     @Test
1111     public void testCanCommitBeforeReadyFailure() throws Throwable {
1112         new ShardTestKit(getSystem()) {{
1113             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1114                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1115                     "testCanCommitBeforeReadyFailure");
1116
1117             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1118             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1119
1120             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1121         }};
1122     }
1123
1124     @Test
1125     public void testAbortTransaction() throws Throwable {
1126         new ShardTestKit(getSystem()) {{
1127             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1128                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1129                     "testAbortTransaction");
1130
1131             waitUntilLeader(shard);
1132
1133             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1134
1135             String transactionID1 = "tx1";
1136             MutableCompositeModification modification1 = new MutableCompositeModification();
1137             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1138             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1139             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1140
1141             String transactionID2 = "tx2";
1142             MutableCompositeModification modification2 = new MutableCompositeModification();
1143             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1144             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1145
1146             FiniteDuration duration = duration("5 seconds");
1147             final Timeout timeout = new Timeout(duration);
1148
1149             // Simulate the ForwardedReadyTransaction messages that would be sent
1150             // by the ShardTransaction.
1151
1152             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
1153             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1154
1155             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
1156             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1157
1158             // Send the CanCommitTransaction message for the first Tx.
1159
1160             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1161             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1162                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1163             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1164
1165             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1166             // processed after the first Tx completes.
1167
1168             Future<Object> canCommitFuture = Patterns.ask(shard,
1169                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1170
1171             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1172             // Tx to proceed.
1173
1174             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1175             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1176
1177             // Wait for the 2nd Tx to complete the canCommit phase.
1178
1179             final CountDownLatch latch = new CountDownLatch(1);
1180             canCommitFuture.onComplete(new OnComplete<Object>() {
1181                 @Override
1182                 public void onComplete(Throwable t, Object resp) {
1183                     latch.countDown();
1184                 }
1185             }, getSystem().dispatcher());
1186
1187             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1188
1189             InOrder inOrder = inOrder(cohort1, cohort2);
1190             inOrder.verify(cohort1).canCommit();
1191             inOrder.verify(cohort2).canCommit();
1192
1193             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1194         }};
1195     }
1196
1197     @Test
1198     public void testCreateSnapshot() throws IOException, InterruptedException {
1199         new ShardTestKit(getSystem()) {{
1200             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1201             Creator<Shard> creator = new Creator<Shard>() {
1202                 @Override
1203                 public Shard create() throws Exception {
1204                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1205                             dataStoreContext, SCHEMA_CONTEXT) {
1206                         @Override
1207                         public void saveSnapshot(Object snapshot) {
1208                             super.saveSnapshot(snapshot);
1209                             latch.get().countDown();
1210                         }
1211                     };
1212                 }
1213             };
1214
1215             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1216                     Props.create(new DelegatingShardCreator(creator)), "testCreateSnapshot");
1217
1218             waitUntilLeader(shard);
1219
1220             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1221
1222             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1223
1224             latch.set(new CountDownLatch(1));
1225             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1226
1227             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1228
1229             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1230         }};
1231     }
1232
1233     /**
1234      * This test simply verifies that the applySnapShot logic will work
1235      * @throws ReadFailedException
1236      */
1237     @Test
1238     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1239         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
1240             MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
1241
1242         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1243
1244         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1245         putTransaction.write(TestModel.TEST_PATH,
1246             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1247         commitTransaction(putTransaction);
1248
1249
1250         NormalizedNode expected = readStore(store);
1251
1252         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1253
1254         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1255         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1256
1257         commitTransaction(writeTransaction);
1258
1259         NormalizedNode actual = readStore(store);
1260
1261         assertEquals(expected, actual);
1262
1263     }
1264
1265     private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
1266         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1267         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1268             transaction.read(YangInstanceIdentifier.builder().build());
1269
1270         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1271
1272         NormalizedNode<?, ?> normalizedNode = optional.get();
1273
1274         transaction.close();
1275
1276         return normalizedNode;
1277     }
1278
1279     private void commitTransaction(DOMStoreWriteTransaction transaction) {
1280         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1281         ListenableFuture<Void> future =
1282             commitCohort.preCommit();
1283         try {
1284             future.get();
1285             future = commitCohort.commit();
1286             future.get();
1287         } catch (InterruptedException | ExecutionException e) {
1288         }
1289     }
1290
1291     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1292         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1293             @Override
1294             public void onDataChanged(
1295                 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1296
1297             }
1298         };
1299     }
1300
1301     private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
1302             throws ExecutionException, InterruptedException {
1303         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1304
1305         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1306             transaction.read(id);
1307
1308         Optional<NormalizedNode<?, ?>> optional = future.get();
1309         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1310
1311         transaction.close();
1312
1313         return node;
1314     }
1315
1316     private void writeToStore(TestActorRef<Shard> shard, YangInstanceIdentifier id, NormalizedNode<?,?> node)
1317         throws ExecutionException, InterruptedException {
1318         DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1319
1320         transaction.write(id, node);
1321
1322         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1323         commitCohort.preCommit().get();
1324         commitCohort.commit().get();
1325     }
1326
1327     private static final class DelegatingShardCreator implements Creator<Shard> {
1328         private final Creator<Shard> delegate;
1329
1330         DelegatingShardCreator(Creator<Shard> delegate) {
1331             this.delegate = delegate;
1332         }
1333
1334         @Override
1335         public Shard create() throws Exception {
1336             return delegate.create();
1337         }
1338     }
1339 }