Merge "Fix checkstyle warnings"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.dispatch.Dispatchers;
6 import akka.dispatch.OnComplete;
7 import akka.japi.Creator;
8 import akka.pattern.Patterns;
9 import akka.testkit.JavaTestKit;
10 import akka.testkit.TestActorRef;
11 import akka.util.Timeout;
12 import com.google.common.base.Function;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import org.junit.After;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.mockito.InOrder;
22 import org.mockito.invocation.InvocationOnMock;
23 import org.mockito.stubbing.Answer;
24 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
33 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
37 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
38 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
39 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
40 import org.opendaylight.controller.cluster.datastore.modification.Modification;
41 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
44 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
45 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
46 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
47 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
48 import org.opendaylight.controller.cluster.raft.Snapshot;
49 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
50 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
52 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
53 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
54 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
55 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
56 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
57 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
58 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
59 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
60 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
61 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
62 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
63 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
64 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
65 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
66 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
67 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
68 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
69 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
70 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
71 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
72 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
73 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import scala.concurrent.Await;
76 import scala.concurrent.Future;
77 import scala.concurrent.duration.FiniteDuration;
78 import java.io.IOException;
79 import java.util.Collections;
80 import java.util.HashSet;
81 import java.util.Set;
82 import java.util.concurrent.CountDownLatch;
83 import java.util.concurrent.ExecutionException;
84 import java.util.concurrent.TimeUnit;
85 import java.util.concurrent.atomic.AtomicInteger;
86 import java.util.concurrent.atomic.AtomicReference;
87 import static org.junit.Assert.assertEquals;
88 import static org.junit.Assert.assertNotNull;
89 import static org.junit.Assert.assertTrue;
90 import static org.junit.Assert.fail;
91 import static org.mockito.Mockito.mock;
92 import static org.mockito.Mockito.doReturn;
93 import static org.mockito.Mockito.doAnswer;
94 import static org.mockito.Mockito.inOrder;
95
96 public class ShardTest extends AbstractActorTest {
97
98     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
99
100     private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
101             .shardName("inventory").type("config").build();
102
103     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
104
105     private static String shardName() {
106         return "shard" + NEXT_SHARD_NUM.getAndIncrement();
107     }
108
109     private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
110             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).build();
111
112     @Before
113     public void setUp() {
114         System.setProperty("shard.persistent", "false");
115
116         InMemorySnapshotStore.clear();
117         InMemoryJournal.clear();
118     }
119
120     @After
121     public void tearDown() {
122         InMemorySnapshotStore.clear();
123         InMemoryJournal.clear();
124     }
125
126     private Props newShardProps() {
127         return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
128                 dataStoreContext, SCHEMA_CONTEXT);
129     }
130
131     @Test
132     public void testOnReceiveRegisterListener() throws Exception {
133         new JavaTestKit(getSystem()) {{
134             ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener");
135
136             subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef());
137
138             subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
139                     getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
140
141             EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class);
142             assertEquals("isEnabled", false, enable.isEnabled());
143
144             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
145                     RegisterChangeListenerReply.class);
146             assertTrue(reply.getListenerRegistrationPath().toString().matches(
147                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
148         }};
149     }
150
151     @Test
152     public void testCreateTransaction(){
153         new ShardTestKit(getSystem()) {{
154             ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction");
155
156             waitUntilLeader(subject);
157
158             subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
159
160             subject.tell(new CreateTransaction("txn-1",
161                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
162
163             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
164                     CreateTransactionReply.class);
165
166             String path = reply.getTransactionActorPath().toString();
167             assertTrue("Unexpected transaction path " + path,
168                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
169             expectNoMsg();
170         }};
171     }
172
173     @Test
174     public void testCreateTransactionOnChain(){
175         new ShardTestKit(getSystem()) {{
176             final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
177
178             waitUntilLeader(subject);
179
180             subject.tell(new CreateTransaction("txn-1",
181                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
182                     getRef());
183
184             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
185                     CreateTransactionReply.class);
186
187             String path = reply.getTransactionActorPath().toString();
188             assertTrue("Unexpected transaction path " + path,
189                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
190             expectNoMsg();
191         }};
192     }
193
194     @Test
195     public void testPeerAddressResolved(){
196         new JavaTestKit(getSystem()) {{
197             final ShardIdentifier identifier =
198                 ShardIdentifier.builder().memberName("member-1")
199                     .shardName("inventory").type("config").build();
200
201             Props props = Shard.props(identifier,
202                     Collections.<ShardIdentifier, String>singletonMap(identifier, null),
203                     dataStoreContext, SCHEMA_CONTEXT);
204             final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
205
206             new Within(duration("3 seconds")) {
207                 @Override
208                 protected void run() {
209
210                     subject.tell(
211                         new PeerAddressResolved(identifier, "akka://foobar"),
212                         getRef());
213
214                     expectNoMsg();
215                 }
216             };
217         }};
218     }
219
220     @Test
221     public void testApplySnapshot() throws ExecutionException, InterruptedException {
222         TestActorRef<Shard> ref = TestActorRef.create(getSystem(), newShardProps());
223
224         NormalizedNodeToNodeCodec codec =
225             new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
226
227         writeToStore(ref, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
228
229         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
230         NormalizedNode<?,?> expected = readStore(ref, root);
231
232         NormalizedNodeMessages.Container encode = codec.encode(expected);
233
234         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
235                 encode.getNormalizedNode().toByteString().toByteArray(),
236                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
237
238         ref.underlyingActor().onReceiveCommand(applySnapshot);
239
240         NormalizedNode<?,?> actual = readStore(ref, root);
241
242         assertEquals(expected, actual);
243     }
244
245     @Test
246     public void testApplyState() throws Exception {
247
248         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
249
250         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
251
252         MutableCompositeModification compMod = new MutableCompositeModification();
253         compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
254         Payload payload = new CompositeModificationPayload(compMod.toSerializable());
255         ApplyState applyState = new ApplyState(null, "test",
256                 new ReplicatedLogImplEntry(1, 2, payload));
257
258         shard.underlyingActor().onReceiveCommand(applyState);
259
260         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
261         assertEquals("Applied state", node, actual);
262     }
263
264     @SuppressWarnings("serial")
265     @Test
266     public void testRecovery() throws Exception {
267
268         // Set up the InMemorySnapshotStore.
269
270         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
271         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
272
273         DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
274         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
275         DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
276         commitCohort.preCommit().get();
277         commitCohort.commit().get();
278
279         DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
280         NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
281
282         InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create(
283                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
284                         root).
285                                 getNormalizedNode().toByteString().toByteArray(),
286                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
287
288         // Set up the InMemoryJournal.
289
290         InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
291                   new WriteModification(TestModel.OUTER_LIST_PATH,
292                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
293                           SCHEMA_CONTEXT))));
294
295         int nListEntries = 11;
296         Set<Integer> listEntryKeys = new HashSet<>();
297         for(int i = 1; i <= nListEntries; i++) {
298             listEntryKeys.add(Integer.valueOf(i));
299             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
300                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
301             Modification mod = new MergeModification(path,
302                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
303                     SCHEMA_CONTEXT);
304             InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1,
305                     newPayload(mod)));
306         }
307
308         InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1,
309                 new ApplyLogEntries(nListEntries));
310
311         // Create the actor and wait for recovery complete.
312
313         final CountDownLatch recoveryComplete = new CountDownLatch(1);
314
315         Creator<Shard> creator = new Creator<Shard>() {
316             @Override
317             public Shard create() throws Exception {
318                 return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
319                         dataStoreContext, SCHEMA_CONTEXT) {
320                     @Override
321                     protected void onRecoveryComplete() {
322                         try {
323                             super.onRecoveryComplete();
324                         } finally {
325                             recoveryComplete.countDown();
326                         }
327                     }
328                 };
329             }
330         };
331
332         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
333                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
334
335         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
336
337         // Verify data in the data store.
338
339         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
340         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
341         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
342                 outerList.getValue() instanceof Iterable);
343         for(Object entry: (Iterable<?>) outerList.getValue()) {
344             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
345                     entry instanceof MapEntryNode);
346             MapEntryNode mapEntry = (MapEntryNode)entry;
347             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
348                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
349             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
350             Object value = idLeaf.get().getValue();
351             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
352                     listEntryKeys.remove(value));
353         }
354
355         if(!listEntryKeys.isEmpty()) {
356             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
357                     listEntryKeys);
358         }
359
360         assertEquals("Last log index", nListEntries,
361                 shard.underlyingActor().getShardMBean().getLastLogIndex());
362         assertEquals("Commit index", nListEntries,
363                 shard.underlyingActor().getShardMBean().getCommitIndex());
364         assertEquals("Last applied", nListEntries,
365                 shard.underlyingActor().getShardMBean().getLastApplied());
366     }
367
368     private CompositeModificationPayload newPayload(Modification... mods) {
369         MutableCompositeModification compMod = new MutableCompositeModification();
370         for(Modification mod: mods) {
371             compMod.addModification(mod);
372         }
373
374         return new CompositeModificationPayload(compMod.toSerializable());
375     }
376
377     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
378             InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
379             MutableCompositeModification modification) {
380         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
381     }
382
383     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
384             InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
385             MutableCompositeModification modification,
386             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
387
388         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
389         tx.write(path, data);
390         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
391         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
392
393         doAnswer(new Answer<ListenableFuture<Boolean>>() {
394             @Override
395             public ListenableFuture<Boolean> answer(InvocationOnMock invocation) {
396                 return realCohort.canCommit();
397             }
398         }).when(cohort).canCommit();
399
400         doAnswer(new Answer<ListenableFuture<Void>>() {
401             @Override
402             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
403                 if(preCommit != null) {
404                     return preCommit.apply(realCohort);
405                 } else {
406                     return realCohort.preCommit();
407                 }
408             }
409         }).when(cohort).preCommit();
410
411         doAnswer(new Answer<ListenableFuture<Void>>() {
412             @Override
413             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
414                 return realCohort.commit();
415             }
416         }).when(cohort).commit();
417
418         doAnswer(new Answer<ListenableFuture<Void>>() {
419             @Override
420             public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
421                 return realCohort.abort();
422             }
423         }).when(cohort).abort();
424
425         modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
426
427         return cohort;
428     }
429
430     @SuppressWarnings({ "unchecked" })
431     @Test
432     public void testConcurrentThreePhaseCommits() throws Throwable {
433         System.setProperty("shard.persistent", "true");
434         new ShardTestKit(getSystem()) {{
435             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
436                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
437
438             waitUntilLeader(shard);
439
440             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
441
442             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
443
444             String transactionID1 = "tx1";
445             MutableCompositeModification modification1 = new MutableCompositeModification();
446             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
447                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
448
449             String transactionID2 = "tx2";
450             MutableCompositeModification modification2 = new MutableCompositeModification();
451             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
452                     TestModel.OUTER_LIST_PATH,
453                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
454                     modification2);
455
456             String transactionID3 = "tx3";
457             MutableCompositeModification modification3 = new MutableCompositeModification();
458             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
459                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
460                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
461                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
462                     modification3);
463
464             long timeoutSec = 5;
465             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
466             final Timeout timeout = new Timeout(duration);
467
468             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
469             // by the ShardTransaction.
470
471             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
472             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
473                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
474             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
475
476             // Send the CanCommitTransaction message for the first Tx.
477
478             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
479             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
480                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
481             assertEquals("Can commit", true, canCommitReply.getCanCommit());
482
483             // Send the ForwardedReadyTransaction for the next 2 Tx's.
484
485             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
486             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
487
488             shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
489             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
490
491             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
492             // processed after the first Tx completes.
493
494             Future<Object> canCommitFuture1 = Patterns.ask(shard,
495                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
496
497             Future<Object> canCommitFuture2 = Patterns.ask(shard,
498                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
499
500             // Send the CommitTransaction message for the first Tx. After it completes, it should
501             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
502
503             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
504             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
505
506             // Wait for the next 2 Tx's to complete.
507
508             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
509             final CountDownLatch commitLatch = new CountDownLatch(2);
510
511             class OnFutureComplete extends OnComplete<Object> {
512                 private final Class<?> expRespType;
513
514                 OnFutureComplete(Class<?> expRespType) {
515                     this.expRespType = expRespType;
516                 }
517
518                 @Override
519                 public void onComplete(Throwable error, Object resp) {
520                     if(error != null) {
521                         System.out.println(new java.util.Date()+": "+getClass().getSimpleName() + " failure: "+error);
522                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
523                     } else {
524                         try {
525                             assertEquals("Commit response type", expRespType, resp.getClass());
526                             onSuccess(resp);
527                         } catch (Exception e) {
528                             caughtEx.set(e);
529                         }
530                     }
531                 }
532
533                 void onSuccess(Object resp) throws Exception {
534                 }
535             }
536
537             class OnCommitFutureComplete extends OnFutureComplete {
538                 OnCommitFutureComplete() {
539                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
540                 }
541
542                 @Override
543                 public void onComplete(Throwable error, Object resp) {
544                     super.onComplete(error, resp);
545                     commitLatch.countDown();
546                 }
547             }
548
549             class OnCanCommitFutureComplete extends OnFutureComplete {
550                 private final String transactionID;
551
552                 OnCanCommitFutureComplete(String transactionID) {
553                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
554                     this.transactionID = transactionID;
555                 }
556
557                 @Override
558                 void onSuccess(Object resp) throws Exception {
559                     CanCommitTransactionReply canCommitReply =
560                             CanCommitTransactionReply.fromSerializable(resp);
561                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
562
563                     Future<Object> commitFuture = Patterns.ask(shard,
564                             new CommitTransaction(transactionID).toSerializable(), timeout);
565                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
566                 }
567             }
568
569             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
570                     getSystem().dispatcher());
571
572             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
573                     getSystem().dispatcher());
574
575             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
576
577             if(caughtEx.get() != null) {
578                 throw caughtEx.get();
579             }
580
581             assertEquals("Commits complete", true, done);
582
583             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
584             inOrder.verify(cohort1).canCommit();
585             inOrder.verify(cohort1).preCommit();
586             inOrder.verify(cohort1).commit();
587             inOrder.verify(cohort2).canCommit();
588             inOrder.verify(cohort2).preCommit();
589             inOrder.verify(cohort2).commit();
590             inOrder.verify(cohort3).canCommit();
591             inOrder.verify(cohort3).preCommit();
592             inOrder.verify(cohort3).commit();
593
594             // Verify data in the data store.
595
596             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
597             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
598             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
599                     outerList.getValue() instanceof Iterable);
600             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
601             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
602                        entry instanceof MapEntryNode);
603             MapEntryNode mapEntry = (MapEntryNode)entry;
604             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
605                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
606             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
607             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
608
609             assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
610         }};
611     }
612
613     @Test
614     public void testCommitPhaseFailure() throws Throwable {
615         new ShardTestKit(getSystem()) {{
616             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
617                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
618
619             waitUntilLeader(shard);
620
621             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
622             // commit phase.
623
624             String transactionID1 = "tx1";
625             MutableCompositeModification modification1 = new MutableCompositeModification();
626             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
627             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
628             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
629             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
630
631             String transactionID2 = "tx2";
632             MutableCompositeModification modification2 = new MutableCompositeModification();
633             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
634             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
635
636             FiniteDuration duration = duration("5 seconds");
637             final Timeout timeout = new Timeout(duration);
638
639             // Simulate the ForwardedReadyTransaction messages that would be sent
640             // by the ShardTransaction.
641
642             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
643             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
644
645             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
646             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
647
648             // Send the CanCommitTransaction message for the first Tx.
649
650             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
651             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
652                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
653             assertEquals("Can commit", true, canCommitReply.getCanCommit());
654
655             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
656             // processed after the first Tx completes.
657
658             Future<Object> canCommitFuture = Patterns.ask(shard,
659                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
660
661             // Send the CommitTransaction message for the first Tx. This should send back an error
662             // and trigger the 2nd Tx to proceed.
663
664             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
665             expectMsgClass(duration, akka.actor.Status.Failure.class);
666
667             // Wait for the 2nd Tx to complete the canCommit phase.
668
669             final CountDownLatch latch = new CountDownLatch(1);
670             canCommitFuture.onComplete(new OnComplete<Object>() {
671                 @Override
672                 public void onComplete(Throwable t, Object resp) {
673                     latch.countDown();
674                 }
675             }, getSystem().dispatcher());
676
677             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
678
679             InOrder inOrder = inOrder(cohort1, cohort2);
680             inOrder.verify(cohort1).canCommit();
681             inOrder.verify(cohort1).preCommit();
682             inOrder.verify(cohort1).commit();
683             inOrder.verify(cohort2).canCommit();
684         }};
685     }
686
687     @Test
688     public void testPreCommitPhaseFailure() throws Throwable {
689         new ShardTestKit(getSystem()) {{
690             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
691                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
692
693             waitUntilLeader(shard);
694
695             String transactionID = "tx1";
696             MutableCompositeModification modification = new MutableCompositeModification();
697             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
698             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
699             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
700
701             FiniteDuration duration = duration("5 seconds");
702
703             // Simulate the ForwardedReadyTransaction messages that would be sent
704             // by the ShardTransaction.
705
706             shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
707             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
708
709             // Send the CanCommitTransaction message.
710
711             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
712             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
713                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
714             assertEquals("Can commit", true, canCommitReply.getCanCommit());
715
716             // Send the CommitTransaction message. This should send back an error
717             // for preCommit failure.
718
719             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
720             expectMsgClass(duration, akka.actor.Status.Failure.class);
721
722             InOrder inOrder = inOrder(cohort);
723             inOrder.verify(cohort).canCommit();
724             inOrder.verify(cohort).preCommit();
725         }};
726     }
727
728     @Test
729     public void testCanCommitPhaseFailure() throws Throwable {
730         new ShardTestKit(getSystem()) {{
731             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
732                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
733
734             waitUntilLeader(shard);
735
736             final FiniteDuration duration = duration("5 seconds");
737
738             String transactionID = "tx1";
739             MutableCompositeModification modification = new MutableCompositeModification();
740             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
741             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
742
743             // Simulate the ForwardedReadyTransaction messages that would be sent
744             // by the ShardTransaction.
745
746             shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
747             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
748
749             // Send the CanCommitTransaction message.
750
751             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
752             expectMsgClass(duration, akka.actor.Status.Failure.class);
753         }};
754     }
755
756     @Test
757     public void testAbortBeforeFinishCommit() throws Throwable {
758         System.setProperty("shard.persistent", "true");
759         new ShardTestKit(getSystem()) {{
760             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
761                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
762
763             waitUntilLeader(shard);
764
765             final FiniteDuration duration = duration("5 seconds");
766             final Timeout timeout = new Timeout(duration);
767
768             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
769
770             final String transactionID = "tx1";
771             final CountDownLatch abortComplete = new CountDownLatch(1);
772             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
773                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
774                 @Override
775                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
776                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
777
778                     Future<Object> abortFuture = Patterns.ask(shard,
779                             new AbortTransaction(transactionID).toSerializable(), timeout);
780                     abortFuture.onComplete(new OnComplete<Object>() {
781                         @Override
782                         public void onComplete(Throwable e, Object resp) {
783                             abortComplete.countDown();
784                         }
785                     }, getSystem().dispatcher());
786
787                     return preCommitFuture;
788                 }
789             };
790
791             MutableCompositeModification modification = new MutableCompositeModification();
792             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
793                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
794                     modification, preCommit);
795
796             shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
797             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
798
799             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
800             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
801                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
802             assertEquals("Can commit", true, canCommitReply.getCanCommit());
803
804             Future<Object> commitFuture = Patterns.ask(shard,
805                     new CommitTransaction(transactionID).toSerializable(), timeout);
806
807             assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
808
809             Await.result(commitFuture, duration);
810
811             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
812             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
813         }};
814     }
815
816     @Test
817     public void testTransactionCommitTimeout() throws Throwable {
818         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
819
820         new ShardTestKit(getSystem()) {{
821             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
822                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
823
824             waitUntilLeader(shard);
825
826             final FiniteDuration duration = duration("5 seconds");
827
828             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
829
830             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
831             writeToStore(shard, TestModel.OUTER_LIST_PATH,
832                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
833
834             // Create 1st Tx - will timeout
835
836             String transactionID1 = "tx1";
837             MutableCompositeModification modification1 = new MutableCompositeModification();
838             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
839                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
840                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
841                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
842                     modification1);
843
844             // Create 2nd Tx
845
846             String transactionID2 = "tx3";
847             MutableCompositeModification modification2 = new MutableCompositeModification();
848             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
849                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
850             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
851                     listNodePath,
852                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
853                     modification2);
854
855             // Ready the Tx's
856
857             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
858             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
859
860             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
861             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
862
863             // canCommit 1st Tx. We don't send the commit so it should timeout.
864
865             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
866             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
867
868             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
869
870             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
871             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
872
873             // Commit the 2nd Tx.
874
875             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
876             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
877
878             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
879             assertNotNull(listNodePath + " not found", node);
880         }};
881     }
882
883     @Test
884     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
885         dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
886
887         new ShardTestKit(getSystem()) {{
888             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
889                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
890
891             waitUntilLeader(shard);
892
893             final FiniteDuration duration = duration("5 seconds");
894
895             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
896
897             String transactionID1 = "tx1";
898             MutableCompositeModification modification1 = new MutableCompositeModification();
899             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
900                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
901
902             String transactionID2 = "tx2";
903             MutableCompositeModification modification2 = new MutableCompositeModification();
904             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
905                     TestModel.OUTER_LIST_PATH,
906                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
907                     modification2);
908
909             String transactionID3 = "tx3";
910             MutableCompositeModification modification3 = new MutableCompositeModification();
911             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
912                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
913
914             // Ready the Tx's
915
916             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
917             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
918
919             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
920             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
921
922             shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
923             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
924
925             // canCommit 1st Tx.
926
927             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
928             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
929
930             // canCommit the 2nd Tx - it should get queued.
931
932             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
933
934             // canCommit the 3rd Tx - should exceed queue capacity and fail.
935
936             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
937             expectMsgClass(duration, akka.actor.Status.Failure.class);
938         }};
939     }
940
941     @Test
942     public void testCanCommitBeforeReadyFailure() throws Throwable {
943         new ShardTestKit(getSystem()) {{
944             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
945                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
946
947             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
948             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
949         }};
950     }
951
952     @Test
953     public void testAbortTransaction() throws Throwable {
954         new ShardTestKit(getSystem()) {{
955             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
956                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
957
958             waitUntilLeader(shard);
959
960             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
961
962             String transactionID1 = "tx1";
963             MutableCompositeModification modification1 = new MutableCompositeModification();
964             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
965             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
966             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
967
968             String transactionID2 = "tx2";
969             MutableCompositeModification modification2 = new MutableCompositeModification();
970             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
971             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
972
973             FiniteDuration duration = duration("5 seconds");
974             final Timeout timeout = new Timeout(duration);
975
976             // Simulate the ForwardedReadyTransaction messages that would be sent
977             // by the ShardTransaction.
978
979             shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
980             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
981
982             shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
983             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
984
985             // Send the CanCommitTransaction message for the first Tx.
986
987             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
988             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
989                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
990             assertEquals("Can commit", true, canCommitReply.getCanCommit());
991
992             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
993             // processed after the first Tx completes.
994
995             Future<Object> canCommitFuture = Patterns.ask(shard,
996                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
997
998             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
999             // Tx to proceed.
1000
1001             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1002             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1003
1004             // Wait for the 2nd Tx to complete the canCommit phase.
1005
1006             final CountDownLatch latch = new CountDownLatch(1);
1007             canCommitFuture.onComplete(new OnComplete<Object>() {
1008                 @Override
1009                 public void onComplete(Throwable t, Object resp) {
1010                     latch.countDown();
1011                 }
1012             }, getSystem().dispatcher());
1013
1014             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1015
1016             InOrder inOrder = inOrder(cohort1, cohort2);
1017             inOrder.verify(cohort1).canCommit();
1018             inOrder.verify(cohort2).canCommit();
1019         }};
1020     }
1021
1022     @Test
1023     public void testCreateSnapshot() throws IOException, InterruptedException {
1024         new ShardTestKit(getSystem()) {{
1025             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1026             Creator<Shard> creator = new Creator<Shard>() {
1027                 @Override
1028                 public Shard create() throws Exception {
1029                     return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
1030                             dataStoreContext, SCHEMA_CONTEXT) {
1031                         @Override
1032                         public void saveSnapshot(Object snapshot) {
1033                             super.saveSnapshot(snapshot);
1034                             latch.get().countDown();
1035                         }
1036                     };
1037                 }
1038             };
1039
1040             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1041                     Props.create(new DelegatingShardCreator(creator)), "testCreateSnapshot");
1042
1043             waitUntilLeader(shard);
1044
1045             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1046
1047             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1048
1049             latch.set(new CountDownLatch(1));
1050             shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1051
1052             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1053         }};
1054     }
1055
1056     /**
1057      * This test simply verifies that the applySnapShot logic will work
1058      * @throws ReadFailedException
1059      */
1060     @Test
1061     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1062         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
1063             MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
1064
1065         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1066
1067         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1068         putTransaction.write(TestModel.TEST_PATH,
1069             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1070         commitTransaction(putTransaction);
1071
1072
1073         NormalizedNode expected = readStore(store);
1074
1075         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1076
1077         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1078         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1079
1080         commitTransaction(writeTransaction);
1081
1082         NormalizedNode actual = readStore(store);
1083
1084         assertEquals(expected, actual);
1085
1086     }
1087
1088     private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
1089         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1090         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1091             transaction.read(YangInstanceIdentifier.builder().build());
1092
1093         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1094
1095         NormalizedNode<?, ?> normalizedNode = optional.get();
1096
1097         transaction.close();
1098
1099         return normalizedNode;
1100     }
1101
1102     private void commitTransaction(DOMStoreWriteTransaction transaction) {
1103         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1104         ListenableFuture<Void> future =
1105             commitCohort.preCommit();
1106         try {
1107             future.get();
1108             future = commitCohort.commit();
1109             future.get();
1110         } catch (InterruptedException | ExecutionException e) {
1111         }
1112     }
1113
1114     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1115         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1116             @Override
1117             public void onDataChanged(
1118                 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1119
1120             }
1121         };
1122     }
1123
1124     private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
1125             throws ExecutionException, InterruptedException {
1126         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1127
1128         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1129             transaction.read(id);
1130
1131         Optional<NormalizedNode<?, ?>> optional = future.get();
1132         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1133
1134         transaction.close();
1135
1136         return node;
1137     }
1138
1139     private void writeToStore(TestActorRef<Shard> shard, YangInstanceIdentifier id, NormalizedNode<?,?> node)
1140         throws ExecutionException, InterruptedException {
1141         DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1142
1143         transaction.write(id, node);
1144
1145         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1146         commitCohort.preCommit().get();
1147         commitCohort.commit().get();
1148     }
1149
1150     private static final class DelegatingShardCreator implements Creator<Shard> {
1151         private final Creator<Shard> delegate;
1152
1153         DelegatingShardCreator(Creator<Shard> delegate) {
1154             this.delegate = delegate;
1155         }
1156
1157         @Override
1158         public Shard create() throws Exception {
1159             return delegate.create();
1160         }
1161     }
1162 }