Merge "Initial message bus implementation"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.japi.Procedure;
21 import akka.pattern.Patterns;
22 import akka.persistence.SnapshotSelectionCriteria;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.HashSet;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.mockito.InOrder;
46 import org.mockito.invocation.InvocationOnMock;
47 import org.mockito.stubbing.Answer;
48 import org.opendaylight.controller.cluster.DataPersistenceProvider;
49 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
50 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
51 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
58 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
63 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
64 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
65 import org.opendaylight.controller.cluster.datastore.modification.Modification;
66 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
67 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
68 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
69 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
70 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
71 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
72 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
73 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
74 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
75 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
76 import org.opendaylight.controller.cluster.raft.Snapshot;
77 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
78 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
79 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
80 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
81 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
82 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
83 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
84 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
85 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
86 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
87 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
88 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
89 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
90 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
91 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
92 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
93 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
94 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
95 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
96 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
97 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
98 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
99 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
101 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
102 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
103 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
104 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
105 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
106 import scala.concurrent.Await;
107 import scala.concurrent.Future;
108 import scala.concurrent.duration.FiniteDuration;
109
110 public class ShardTest extends AbstractActorTest {
111
112     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
113
114     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
115
116     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
117             .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
118
119     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
120             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
121             shardHeartbeatIntervalInMillis(100);
122
123     @Before
124     public void setUp() {
125         Builder newBuilder = DatastoreContext.newBuilder();
126         InMemorySnapshotStore.clear();
127         InMemoryJournal.clear();
128     }
129
130     @After
131     public void tearDown() {
132         InMemorySnapshotStore.clear();
133         InMemoryJournal.clear();
134     }
135
136     private DatastoreContext newDatastoreContext() {
137         return dataStoreContextBuilder.build();
138     }
139
140     private Props newShardProps() {
141         return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
142                 newDatastoreContext(), SCHEMA_CONTEXT);
143     }
144
145     @Test
146     public void testRegisterChangeListener() throws Exception {
147         new ShardTestKit(getSystem()) {{
148             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
149                     newShardProps(),  "testRegisterChangeListener");
150
151             waitUntilLeader(shard);
152
153             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
154
155             MockDataChangeListener listener = new MockDataChangeListener(1);
156             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
157                     "testRegisterChangeListener-DataChangeListener");
158
159             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
160                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
161
162             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
163                     RegisterChangeListenerReply.class);
164             String replyPath = reply.getListenerRegistrationPath().toString();
165             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
166                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
167
168             YangInstanceIdentifier path = TestModel.TEST_PATH;
169             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
170
171             listener.waitForChangeEvents(path);
172
173             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
174             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
175         }};
176     }
177
178     @SuppressWarnings("serial")
179     @Test
180     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
181         // This test tests the timing window in which a change listener is registered before the
182         // shard becomes the leader. We verify that the listener is registered and notified of the
183         // existing data when the shard becomes the leader.
184         new ShardTestKit(getSystem()) {{
185             // For this test, we want to send the RegisterChangeListener message after the shard
186             // has recovered from persistence and before it becomes the leader. So we subclass
187             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
188             // we know that the shard has been initialized to a follower and has started the
189             // election process. The following 2 CountDownLatches are used to coordinate the
190             // ElectionTimeout with the sending of the RegisterChangeListener message.
191             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
192             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
193             Creator<Shard> creator = new Creator<Shard>() {
194                 boolean firstElectionTimeout = true;
195
196                 @Override
197                 public Shard create() throws Exception {
198                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
199                             newDatastoreContext(), SCHEMA_CONTEXT) {
200                         @Override
201                         public void onReceiveCommand(final Object message) throws Exception {
202                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
203                                 // Got the first ElectionTimeout. We don't forward it to the
204                                 // base Shard yet until we've sent the RegisterChangeListener
205                                 // message. So we signal the onFirstElectionTimeout latch to tell
206                                 // the main thread to send the RegisterChangeListener message and
207                                 // start a thread to wait on the onChangeListenerRegistered latch,
208                                 // which the main thread signals after it has sent the message.
209                                 // After the onChangeListenerRegistered is triggered, we send the
210                                 // original ElectionTimeout message to proceed with the election.
211                                 firstElectionTimeout = false;
212                                 final ActorRef self = getSelf();
213                                 new Thread() {
214                                     @Override
215                                     public void run() {
216                                         Uninterruptibles.awaitUninterruptibly(
217                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
218                                         self.tell(message, self);
219                                     }
220                                 }.start();
221
222                                 onFirstElectionTimeout.countDown();
223                             } else {
224                                 super.onReceiveCommand(message);
225                             }
226                         }
227                     };
228                 }
229             };
230
231             MockDataChangeListener listener = new MockDataChangeListener(1);
232             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
233                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
234
235             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
236                     Props.create(new DelegatingShardCreator(creator)),
237                     "testRegisterChangeListenerWhenNotLeaderInitially");
238
239             // Write initial data into the in-memory store.
240             YangInstanceIdentifier path = TestModel.TEST_PATH;
241             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
242
243             // Wait until the shard receives the first ElectionTimeout message.
244             assertEquals("Got first ElectionTimeout", true,
245                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
246
247             // Now send the RegisterChangeListener and wait for the reply.
248             shard.tell(new RegisterChangeListener(path, dclActor.path(),
249                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
250
251             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
252                     RegisterChangeListenerReply.class);
253             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
254
255             // Sanity check - verify the shard is not the leader yet.
256             shard.tell(new FindLeader(), getRef());
257             FindLeaderReply findLeadeReply =
258                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
259             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
260
261             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
262             // with the election process.
263             onChangeListenerRegistered.countDown();
264
265             // Wait for the shard to become the leader and notify our listener with the existing
266             // data in the store.
267             listener.waitForChangeEvents(path);
268
269             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
270             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
271         }};
272     }
273
274     @Test
275     public void testCreateTransaction(){
276         new ShardTestKit(getSystem()) {{
277             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
278
279             waitUntilLeader(shard);
280
281             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
282
283             shard.tell(new CreateTransaction("txn-1",
284                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
285
286             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
287                     CreateTransactionReply.class);
288
289             String path = reply.getTransactionActorPath().toString();
290             assertTrue("Unexpected transaction path " + path,
291                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
292
293             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
294         }};
295     }
296
297     @Test
298     public void testCreateTransactionOnChain(){
299         new ShardTestKit(getSystem()) {{
300             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
301
302             waitUntilLeader(shard);
303
304             shard.tell(new CreateTransaction("txn-1",
305                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
306                     getRef());
307
308             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
309                     CreateTransactionReply.class);
310
311             String path = reply.getTransactionActorPath().toString();
312             assertTrue("Unexpected transaction path " + path,
313                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
314
315             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
316         }};
317     }
318
319     @SuppressWarnings("serial")
320     @Test
321     public void testPeerAddressResolved() throws Exception {
322         new ShardTestKit(getSystem()) {{
323             final CountDownLatch recoveryComplete = new CountDownLatch(1);
324             class TestShard extends Shard {
325                 TestShard() {
326                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
327                             newDatastoreContext(), SCHEMA_CONTEXT);
328                 }
329
330                 Map<String, String> getPeerAddresses() {
331                     return getRaftActorContext().getPeerAddresses();
332                 }
333
334                 @Override
335                 protected void onRecoveryComplete() {
336                     try {
337                         super.onRecoveryComplete();
338                     } finally {
339                         recoveryComplete.countDown();
340                     }
341                 }
342             }
343
344             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
345                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
346                         @Override
347                         public TestShard create() throws Exception {
348                             return new TestShard();
349                         }
350                     })), "testPeerAddressResolved");
351
352             //waitUntilLeader(shard);
353             assertEquals("Recovery complete", true,
354                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
355
356             String address = "akka://foobar";
357             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
358
359             assertEquals("getPeerAddresses", address,
360                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
361
362             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
363         }};
364     }
365
366     @Test
367     public void testApplySnapshot() throws Exception {
368         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
369                 "testApplySnapshot");
370
371         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
372         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
373
374         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
375
376         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
377         NormalizedNode<?,?> expected = readStore(store, root);
378
379         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
380                 SerializationUtils.serializeNormalizedNode(expected),
381                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
382
383         shard.underlyingActor().onReceiveCommand(applySnapshot);
384
385         NormalizedNode<?,?> actual = readStore(shard, root);
386
387         assertEquals("Root node", expected, actual);
388
389         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
390     }
391
392     @Test
393     public void testApplyHelium2VersionSnapshot() throws Exception {
394         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
395                 "testApplySnapshot");
396
397         NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
398
399         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
400         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
401
402         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
403
404         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
405         NormalizedNode<?,?> expected = readStore(store, root);
406
407         NormalizedNodeMessages.Container encode = codec.encode(expected);
408
409         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
410                 encode.getNormalizedNode().toByteString().toByteArray(),
411                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
412
413         shard.underlyingActor().onReceiveCommand(applySnapshot);
414
415         NormalizedNode<?,?> actual = readStore(shard, root);
416
417         assertEquals("Root node", expected, actual);
418
419         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
420     }
421
422     @Test
423     public void testApplyState() throws Exception {
424
425         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
426
427         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
428
429         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
430                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
431
432         shard.underlyingActor().onReceiveCommand(applyState);
433
434         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
435         assertEquals("Applied state", node, actual);
436
437         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
438     }
439
440     @Test
441     public void testApplyStateLegacy() throws Exception {
442
443         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
444
445         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
446
447         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
448                 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
449
450         shard.underlyingActor().onReceiveCommand(applyState);
451
452         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
453         assertEquals("Applied state", node, actual);
454
455         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
456     }
457
458     @Test
459     public void testRecovery() throws Exception {
460
461         // Set up the InMemorySnapshotStore.
462
463         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
464         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
465
466         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
467
468         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
469
470         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
471                 SerializationUtils.serializeNormalizedNode(root),
472                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
473
474         // Set up the InMemoryJournal.
475
476         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
477                   new WriteModification(TestModel.OUTER_LIST_PATH,
478                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
479
480         int nListEntries = 16;
481         Set<Integer> listEntryKeys = new HashSet<>();
482
483         // Add some ModificationPayload entries
484         for(int i = 1; i <= nListEntries; i++) {
485             listEntryKeys.add(Integer.valueOf(i));
486             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
487                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
488             Modification mod = new MergeModification(path,
489                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
490             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
491                     newModificationPayload(mod)));
492         }
493
494         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
495                 new ApplyLogEntries(nListEntries));
496
497         testRecovery(listEntryKeys);
498     }
499
500     @Test
501     public void testHelium2VersionRecovery() throws Exception {
502
503         // Set up the InMemorySnapshotStore.
504
505         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
506         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
507
508         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
509
510         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
511
512         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
513                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
514                                 getNormalizedNode().toByteString().toByteArray(),
515                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
516
517         // Set up the InMemoryJournal.
518
519         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
520                   new WriteModification(TestModel.OUTER_LIST_PATH,
521                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
522
523         int nListEntries = 16;
524         Set<Integer> listEntryKeys = new HashSet<>();
525         int i = 1;
526
527         // Add some CompositeModificationPayload entries
528         for(; i <= 8; i++) {
529             listEntryKeys.add(Integer.valueOf(i));
530             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
531                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
532             Modification mod = new MergeModification(path,
533                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
534             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
535                     newLegacyPayload(mod)));
536         }
537
538         // Add some CompositeModificationByteStringPayload entries
539         for(; i <= nListEntries; i++) {
540             listEntryKeys.add(Integer.valueOf(i));
541             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
542                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
543             Modification mod = new MergeModification(path,
544                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
545             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
546                     newLegacyByteStringPayload(mod)));
547         }
548
549         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
550
551         testRecovery(listEntryKeys);
552     }
553
554     private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
555         // Create the actor and wait for recovery complete.
556
557         int nListEntries = listEntryKeys.size();
558
559         final CountDownLatch recoveryComplete = new CountDownLatch(1);
560
561         @SuppressWarnings("serial")
562         Creator<Shard> creator = new Creator<Shard>() {
563             @Override
564             public Shard create() throws Exception {
565                 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
566                         newDatastoreContext(), SCHEMA_CONTEXT) {
567                     @Override
568                     protected void onRecoveryComplete() {
569                         try {
570                             super.onRecoveryComplete();
571                         } finally {
572                             recoveryComplete.countDown();
573                         }
574                     }
575                 };
576             }
577         };
578
579         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
580                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
581
582         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
583
584         // Verify data in the data store.
585
586         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
587         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
588         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
589                 outerList.getValue() instanceof Iterable);
590         for(Object entry: (Iterable<?>) outerList.getValue()) {
591             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
592                     entry instanceof MapEntryNode);
593             MapEntryNode mapEntry = (MapEntryNode)entry;
594             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
595                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
596             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
597             Object value = idLeaf.get().getValue();
598             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
599                     listEntryKeys.remove(value));
600         }
601
602         if(!listEntryKeys.isEmpty()) {
603             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
604                     listEntryKeys);
605         }
606
607         assertEquals("Last log index", nListEntries,
608                 shard.underlyingActor().getShardMBean().getLastLogIndex());
609         assertEquals("Commit index", nListEntries,
610                 shard.underlyingActor().getShardMBean().getCommitIndex());
611         assertEquals("Last applied", nListEntries,
612                 shard.underlyingActor().getShardMBean().getLastApplied());
613
614         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
615     }
616
617     private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
618         MutableCompositeModification compMod = new MutableCompositeModification();
619         for(Modification mod: mods) {
620             compMod.addModification(mod);
621         }
622
623         return new CompositeModificationPayload(compMod.toSerializable());
624     }
625
626     private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
627         MutableCompositeModification compMod = new MutableCompositeModification();
628         for(Modification mod: mods) {
629             compMod.addModification(mod);
630         }
631
632         return new CompositeModificationByteStringPayload(compMod.toSerializable());
633     }
634
635     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
636         MutableCompositeModification compMod = new MutableCompositeModification();
637         for(Modification mod: mods) {
638             compMod.addModification(mod);
639         }
640
641         return new ModificationPayload(compMod);
642     }
643
644     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
645             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
646             final MutableCompositeModification modification) {
647         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
648     }
649
650     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
651             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
652             final MutableCompositeModification modification,
653             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
654
655         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
656         tx.write(path, data);
657         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
658         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
659
660         doAnswer(new Answer<ListenableFuture<Boolean>>() {
661             @Override
662             public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
663                 return realCohort.canCommit();
664             }
665         }).when(cohort).canCommit();
666
667         doAnswer(new Answer<ListenableFuture<Void>>() {
668             @Override
669             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
670                 if(preCommit != null) {
671                     return preCommit.apply(realCohort);
672                 } else {
673                     return realCohort.preCommit();
674                 }
675             }
676         }).when(cohort).preCommit();
677
678         doAnswer(new Answer<ListenableFuture<Void>>() {
679             @Override
680             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
681                 return realCohort.commit();
682             }
683         }).when(cohort).commit();
684
685         doAnswer(new Answer<ListenableFuture<Void>>() {
686             @Override
687             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
688                 return realCohort.abort();
689             }
690         }).when(cohort).abort();
691
692         modification.addModification(new WriteModification(path, data));
693
694         return cohort;
695     }
696
697     @SuppressWarnings({ "unchecked" })
698     @Test
699     public void testConcurrentThreePhaseCommits() throws Throwable {
700         new ShardTestKit(getSystem()) {{
701             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
702                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
703                     "testConcurrentThreePhaseCommits");
704
705             waitUntilLeader(shard);
706
707             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
708
709             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
710
711             String transactionID1 = "tx1";
712             MutableCompositeModification modification1 = new MutableCompositeModification();
713             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
714                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
715
716             String transactionID2 = "tx2";
717             MutableCompositeModification modification2 = new MutableCompositeModification();
718             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
719                     TestModel.OUTER_LIST_PATH,
720                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
721                     modification2);
722
723             String transactionID3 = "tx3";
724             MutableCompositeModification modification3 = new MutableCompositeModification();
725             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
726                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
727                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
728                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
729                     modification3);
730
731             long timeoutSec = 5;
732             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
733             final Timeout timeout = new Timeout(duration);
734
735             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
736             // by the ShardTransaction.
737
738             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
739                     cohort1, modification1, true), getRef());
740             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
741                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
742             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
743
744             // Send the CanCommitTransaction message for the first Tx.
745
746             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
747             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
748                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
749             assertEquals("Can commit", true, canCommitReply.getCanCommit());
750
751             // Send the ForwardedReadyTransaction for the next 2 Tx's.
752
753             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
754                     cohort2, modification2, true), getRef());
755             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
756
757             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
758                     cohort3, modification3, true), getRef());
759             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
760
761             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
762             // processed after the first Tx completes.
763
764             Future<Object> canCommitFuture1 = Patterns.ask(shard,
765                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
766
767             Future<Object> canCommitFuture2 = Patterns.ask(shard,
768                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
769
770             // Send the CommitTransaction message for the first Tx. After it completes, it should
771             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
772
773             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
774             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
775
776             // Wait for the next 2 Tx's to complete.
777
778             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
779             final CountDownLatch commitLatch = new CountDownLatch(2);
780
781             class OnFutureComplete extends OnComplete<Object> {
782                 private final Class<?> expRespType;
783
784                 OnFutureComplete(final Class<?> expRespType) {
785                     this.expRespType = expRespType;
786                 }
787
788                 @Override
789                 public void onComplete(final Throwable error, final Object resp) {
790                     if(error != null) {
791                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
792                     } else {
793                         try {
794                             assertEquals("Commit response type", expRespType, resp.getClass());
795                             onSuccess(resp);
796                         } catch (Exception e) {
797                             caughtEx.set(e);
798                         }
799                     }
800                 }
801
802                 void onSuccess(final Object resp) throws Exception {
803                 }
804             }
805
806             class OnCommitFutureComplete extends OnFutureComplete {
807                 OnCommitFutureComplete() {
808                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
809                 }
810
811                 @Override
812                 public void onComplete(final Throwable error, final Object resp) {
813                     super.onComplete(error, resp);
814                     commitLatch.countDown();
815                 }
816             }
817
818             class OnCanCommitFutureComplete extends OnFutureComplete {
819                 private final String transactionID;
820
821                 OnCanCommitFutureComplete(final String transactionID) {
822                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
823                     this.transactionID = transactionID;
824                 }
825
826                 @Override
827                 void onSuccess(final Object resp) throws Exception {
828                     CanCommitTransactionReply canCommitReply =
829                             CanCommitTransactionReply.fromSerializable(resp);
830                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
831
832                     Future<Object> commitFuture = Patterns.ask(shard,
833                             new CommitTransaction(transactionID).toSerializable(), timeout);
834                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
835                 }
836             }
837
838             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
839                     getSystem().dispatcher());
840
841             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
842                     getSystem().dispatcher());
843
844             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
845
846             if(caughtEx.get() != null) {
847                 throw caughtEx.get();
848             }
849
850             assertEquals("Commits complete", true, done);
851
852             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
853             inOrder.verify(cohort1).canCommit();
854             inOrder.verify(cohort1).preCommit();
855             inOrder.verify(cohort1).commit();
856             inOrder.verify(cohort2).canCommit();
857             inOrder.verify(cohort2).preCommit();
858             inOrder.verify(cohort2).commit();
859             inOrder.verify(cohort3).canCommit();
860             inOrder.verify(cohort3).preCommit();
861             inOrder.verify(cohort3).commit();
862
863             // Verify data in the data store.
864
865             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
866             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
867             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
868                     outerList.getValue() instanceof Iterable);
869             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
870             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
871                        entry instanceof MapEntryNode);
872             MapEntryNode mapEntry = (MapEntryNode)entry;
873             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
874                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
875             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
876             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
877
878             verifyLastLogIndex(shard, 2);
879
880             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
881         }};
882     }
883
884     private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
885         for(int i = 0; i < 20 * 5; i++) {
886             long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
887             if(lastLogIndex == expectedValue) {
888                 break;
889             }
890             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
891         }
892
893         assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
894     }
895
896     @Test
897     public void testCommitWithPersistenceDisabled() throws Throwable {
898         dataStoreContextBuilder.persistent(false);
899         new ShardTestKit(getSystem()) {{
900             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
901                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
902                     "testCommitPhaseFailure");
903
904             waitUntilLeader(shard);
905
906             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
907
908             // Setup a simulated transactions with a mock cohort.
909
910             String transactionID = "tx";
911             MutableCompositeModification modification = new MutableCompositeModification();
912             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
913             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
914                     TestModel.TEST_PATH, containerNode, modification);
915
916             FiniteDuration duration = duration("5 seconds");
917
918             // Simulate the ForwardedReadyTransaction messages that would be sent
919             // by the ShardTransaction.
920
921             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
922                     cohort, modification, true), getRef());
923             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
924
925             // Send the CanCommitTransaction message.
926
927             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
928             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
929                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
930             assertEquals("Can commit", true, canCommitReply.getCanCommit());
931
932             // Send the CanCommitTransaction message.
933
934             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
935             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
936
937             InOrder inOrder = inOrder(cohort);
938             inOrder.verify(cohort).canCommit();
939             inOrder.verify(cohort).preCommit();
940             inOrder.verify(cohort).commit();
941
942             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
943             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
944
945             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
946         }};
947     }
948
949     @Test
950     public void testCommitPhaseFailure() throws Throwable {
951         new ShardTestKit(getSystem()) {{
952             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
953                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
954                     "testCommitPhaseFailure");
955
956             waitUntilLeader(shard);
957
958             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
959             // commit phase.
960
961             String transactionID1 = "tx1";
962             MutableCompositeModification modification1 = new MutableCompositeModification();
963             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
964             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
965             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
966             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
967
968             String transactionID2 = "tx2";
969             MutableCompositeModification modification2 = new MutableCompositeModification();
970             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
971             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
972
973             FiniteDuration duration = duration("5 seconds");
974             final Timeout timeout = new Timeout(duration);
975
976             // Simulate the ForwardedReadyTransaction messages that would be sent
977             // by the ShardTransaction.
978
979             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
980                     cohort1, modification1, true), getRef());
981             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
982
983             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
984                     cohort2, modification2, true), getRef());
985             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
986
987             // Send the CanCommitTransaction message for the first Tx.
988
989             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
990             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
991                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
992             assertEquals("Can commit", true, canCommitReply.getCanCommit());
993
994             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
995             // processed after the first Tx completes.
996
997             Future<Object> canCommitFuture = Patterns.ask(shard,
998                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
999
1000             // Send the CommitTransaction message for the first Tx. This should send back an error
1001             // and trigger the 2nd Tx to proceed.
1002
1003             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1004             expectMsgClass(duration, akka.actor.Status.Failure.class);
1005
1006             // Wait for the 2nd Tx to complete the canCommit phase.
1007
1008             final CountDownLatch latch = new CountDownLatch(1);
1009             canCommitFuture.onComplete(new OnComplete<Object>() {
1010                 @Override
1011                 public void onComplete(final Throwable t, final Object resp) {
1012                     latch.countDown();
1013                 }
1014             }, getSystem().dispatcher());
1015
1016             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1017
1018             InOrder inOrder = inOrder(cohort1, cohort2);
1019             inOrder.verify(cohort1).canCommit();
1020             inOrder.verify(cohort1).preCommit();
1021             inOrder.verify(cohort1).commit();
1022             inOrder.verify(cohort2).canCommit();
1023
1024             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1025         }};
1026     }
1027
1028     @Test
1029     public void testPreCommitPhaseFailure() throws Throwable {
1030         new ShardTestKit(getSystem()) {{
1031             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1032                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1033                     "testPreCommitPhaseFailure");
1034
1035             waitUntilLeader(shard);
1036
1037             String transactionID = "tx1";
1038             MutableCompositeModification modification = new MutableCompositeModification();
1039             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1040             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1041             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1042
1043             FiniteDuration duration = duration("5 seconds");
1044
1045             // Simulate the ForwardedReadyTransaction messages that would be sent
1046             // by the ShardTransaction.
1047
1048             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1049                     cohort, modification, true), getRef());
1050             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1051
1052             // Send the CanCommitTransaction message.
1053
1054             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1055             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1056                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1057             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1058
1059             // Send the CommitTransaction message. This should send back an error
1060             // for preCommit failure.
1061
1062             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1063             expectMsgClass(duration, akka.actor.Status.Failure.class);
1064
1065             InOrder inOrder = inOrder(cohort);
1066             inOrder.verify(cohort).canCommit();
1067             inOrder.verify(cohort).preCommit();
1068
1069             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1070         }};
1071     }
1072
1073     @Test
1074     public void testCanCommitPhaseFailure() throws Throwable {
1075         new ShardTestKit(getSystem()) {{
1076             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1077                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1078                     "testCanCommitPhaseFailure");
1079
1080             waitUntilLeader(shard);
1081
1082             final FiniteDuration duration = duration("5 seconds");
1083
1084             String transactionID = "tx1";
1085             MutableCompositeModification modification = new MutableCompositeModification();
1086             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1087             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1088
1089             // Simulate the ForwardedReadyTransaction messages that would be sent
1090             // by the ShardTransaction.
1091
1092             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1093                     cohort, modification, true), getRef());
1094             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1095
1096             // Send the CanCommitTransaction message.
1097
1098             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1099             expectMsgClass(duration, akka.actor.Status.Failure.class);
1100
1101             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1102         }};
1103     }
1104
1105     @Test
1106     public void testAbortBeforeFinishCommit() throws Throwable {
1107         new ShardTestKit(getSystem()) {{
1108             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1109                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1110                     "testAbortBeforeFinishCommit");
1111
1112             waitUntilLeader(shard);
1113
1114             final FiniteDuration duration = duration("5 seconds");
1115             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1116
1117             final String transactionID = "tx1";
1118             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1119                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1120                 @Override
1121                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1122                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1123
1124                     // Simulate an AbortTransaction message occurring during replication, after
1125                     // persisting and before finishing the commit to the in-memory store.
1126                     // We have no followers so due to optimizations in the RaftActor, it does not
1127                     // attempt replication and thus we can't send an AbortTransaction message b/c
1128                     // it would be processed too late after CommitTransaction completes. So we'll
1129                     // simulate an AbortTransaction message occurring during replication by calling
1130                     // the shard directly.
1131                     //
1132                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1133
1134                     return preCommitFuture;
1135                 }
1136             };
1137
1138             MutableCompositeModification modification = new MutableCompositeModification();
1139             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1140                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1141                     modification, preCommit);
1142
1143             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1144                     cohort, modification, true), getRef());
1145             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1146
1147             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1148             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1149                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1150             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1151
1152             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1153             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1154
1155             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1156
1157             // Since we're simulating an abort occurring during replication and before finish commit,
1158             // the data should still get written to the in-memory store since we've gotten past
1159             // canCommit and preCommit and persisted the data.
1160             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1161
1162             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1163         }};
1164     }
1165
1166     @Test
1167     public void testTransactionCommitTimeout() throws Throwable {
1168         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1169
1170         new ShardTestKit(getSystem()) {{
1171             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1172                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1173                     "testTransactionCommitTimeout");
1174
1175             waitUntilLeader(shard);
1176
1177             final FiniteDuration duration = duration("5 seconds");
1178
1179             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1180
1181             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1182             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1183                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1184
1185             // Create 1st Tx - will timeout
1186
1187             String transactionID1 = "tx1";
1188             MutableCompositeModification modification1 = new MutableCompositeModification();
1189             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1190                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1191                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1192                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1193                     modification1);
1194
1195             // Create 2nd Tx
1196
1197             String transactionID2 = "tx3";
1198             MutableCompositeModification modification2 = new MutableCompositeModification();
1199             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1200                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1201             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1202                     listNodePath,
1203                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1204                     modification2);
1205
1206             // Ready the Tx's
1207
1208             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1209                     cohort1, modification1, true), getRef());
1210             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1211
1212             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1213                     cohort2, modification2, true), getRef());
1214             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1215
1216             // canCommit 1st Tx. We don't send the commit so it should timeout.
1217
1218             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1219             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1220
1221             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1222
1223             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1224             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1225
1226             // Commit the 2nd Tx.
1227
1228             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1229             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1230
1231             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1232             assertNotNull(listNodePath + " not found", node);
1233
1234             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1235         }};
1236     }
1237
1238     @Test
1239     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1240         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1241
1242         new ShardTestKit(getSystem()) {{
1243             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1244                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1245                     "testTransactionCommitQueueCapacityExceeded");
1246
1247             waitUntilLeader(shard);
1248
1249             final FiniteDuration duration = duration("5 seconds");
1250
1251             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1252
1253             String transactionID1 = "tx1";
1254             MutableCompositeModification modification1 = new MutableCompositeModification();
1255             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1256                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1257
1258             String transactionID2 = "tx2";
1259             MutableCompositeModification modification2 = new MutableCompositeModification();
1260             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1261                     TestModel.OUTER_LIST_PATH,
1262                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1263                     modification2);
1264
1265             String transactionID3 = "tx3";
1266             MutableCompositeModification modification3 = new MutableCompositeModification();
1267             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1268                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1269
1270             // Ready the Tx's
1271
1272             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1273                     cohort1, modification1, true), getRef());
1274             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1275
1276             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1277                     cohort2, modification2, true), getRef());
1278             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1279
1280             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1281                     cohort3, modification3, true), getRef());
1282             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1283
1284             // canCommit 1st Tx.
1285
1286             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1287             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1288
1289             // canCommit the 2nd Tx - it should get queued.
1290
1291             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1292
1293             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1294
1295             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1296             expectMsgClass(duration, akka.actor.Status.Failure.class);
1297
1298             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1299         }};
1300     }
1301
1302     @Test
1303     public void testCanCommitBeforeReadyFailure() throws Throwable {
1304         new ShardTestKit(getSystem()) {{
1305             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1306                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1307                     "testCanCommitBeforeReadyFailure");
1308
1309             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1310             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1311
1312             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1313         }};
1314     }
1315
1316     @Test
1317     public void testAbortTransaction() throws Throwable {
1318         new ShardTestKit(getSystem()) {{
1319             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1320                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1321                     "testAbortTransaction");
1322
1323             waitUntilLeader(shard);
1324
1325             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1326
1327             String transactionID1 = "tx1";
1328             MutableCompositeModification modification1 = new MutableCompositeModification();
1329             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1330             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1331             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1332
1333             String transactionID2 = "tx2";
1334             MutableCompositeModification modification2 = new MutableCompositeModification();
1335             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1336             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1337
1338             FiniteDuration duration = duration("5 seconds");
1339             final Timeout timeout = new Timeout(duration);
1340
1341             // Simulate the ForwardedReadyTransaction messages that would be sent
1342             // by the ShardTransaction.
1343
1344             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1345                     cohort1, modification1, true), getRef());
1346             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1347
1348             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1349                     cohort2, modification2, true), getRef());
1350             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1351
1352             // Send the CanCommitTransaction message for the first Tx.
1353
1354             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1355             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1356                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1357             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1358
1359             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1360             // processed after the first Tx completes.
1361
1362             Future<Object> canCommitFuture = Patterns.ask(shard,
1363                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1364
1365             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1366             // Tx to proceed.
1367
1368             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1369             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1370
1371             // Wait for the 2nd Tx to complete the canCommit phase.
1372
1373             Await.ready(canCommitFuture, duration);
1374
1375             InOrder inOrder = inOrder(cohort1, cohort2);
1376             inOrder.verify(cohort1).canCommit();
1377             inOrder.verify(cohort2).canCommit();
1378
1379             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1380         }};
1381     }
1382
1383     @Test
1384     public void testCreateSnapshot() throws Exception {
1385         testCreateSnapshot(true, "testCreateSnapshot");
1386     }
1387
1388     @Test
1389     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1390         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1391     }
1392
1393     @SuppressWarnings("serial")
1394     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1395
1396         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1397         class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1398             DataPersistenceProvider delegate;
1399
1400             DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1401                 this.delegate = delegate;
1402             }
1403
1404             @Override
1405             public boolean isRecoveryApplicable() {
1406                 return delegate.isRecoveryApplicable();
1407             }
1408
1409             @Override
1410             public <T> void persist(T o, Procedure<T> procedure) {
1411                 delegate.persist(o, procedure);
1412             }
1413
1414             @Override
1415             public void saveSnapshot(Object o) {
1416                 savedSnapshot.set(o);
1417                 delegate.saveSnapshot(o);
1418             }
1419
1420             @Override
1421             public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1422                 delegate.deleteSnapshots(criteria);
1423             }
1424
1425             @Override
1426             public void deleteMessages(long sequenceNumber) {
1427                 delegate.deleteMessages(sequenceNumber);
1428             }
1429         }
1430
1431         dataStoreContextBuilder.persistent(persistent);
1432
1433         new ShardTestKit(getSystem()) {{
1434             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1435             Creator<Shard> creator = new Creator<Shard>() {
1436                 @Override
1437                 public Shard create() throws Exception {
1438                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1439                             newDatastoreContext(), SCHEMA_CONTEXT) {
1440
1441                         DelegatingPersistentDataProvider delegating;
1442
1443                         @Override
1444                         protected DataPersistenceProvider persistence() {
1445                             if(delegating == null) {
1446                                 delegating = new DelegatingPersistentDataProvider(super.persistence());
1447                             }
1448
1449                             return delegating;
1450                         }
1451
1452                         @Override
1453                         protected void commitSnapshot(final long sequenceNumber) {
1454                             super.commitSnapshot(sequenceNumber);
1455                             latch.get().countDown();
1456                         }
1457                     };
1458                 }
1459             };
1460
1461             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1462                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1463
1464             waitUntilLeader(shard);
1465
1466             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1467
1468             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1469
1470             CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
1471             shard.tell(capture, getRef());
1472
1473             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1474
1475             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1476                     savedSnapshot.get() instanceof Snapshot);
1477
1478             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1479
1480             latch.set(new CountDownLatch(1));
1481             savedSnapshot.set(null);
1482
1483             shard.tell(capture, getRef());
1484
1485             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1486
1487             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1488                     savedSnapshot.get() instanceof Snapshot);
1489
1490             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1491
1492             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1493         }
1494
1495         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1496
1497             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1498             assertEquals("Root node", expectedRoot, actual);
1499
1500         }};
1501     }
1502
1503     /**
1504      * This test simply verifies that the applySnapShot logic will work
1505      * @throws ReadFailedException
1506      */
1507     @Test
1508     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1509         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1510
1511         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1512
1513         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1514         putTransaction.write(TestModel.TEST_PATH,
1515             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1516         commitTransaction(putTransaction);
1517
1518
1519         NormalizedNode<?, ?> expected = readStore(store);
1520
1521         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1522
1523         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1524         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1525
1526         commitTransaction(writeTransaction);
1527
1528         NormalizedNode<?, ?> actual = readStore(store);
1529
1530         assertEquals(expected, actual);
1531     }
1532
1533     @Test
1534     public void testRecoveryApplicable(){
1535
1536         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1537                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1538
1539         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1540                 persistentContext, SCHEMA_CONTEXT);
1541
1542         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1543                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1544
1545         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1546                 nonPersistentContext, SCHEMA_CONTEXT);
1547
1548         new ShardTestKit(getSystem()) {{
1549             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1550                     persistentProps, "testPersistence1");
1551
1552             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1553
1554             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1555
1556             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1557                     nonPersistentProps, "testPersistence2");
1558
1559             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1560
1561             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1562
1563         }};
1564
1565     }
1566
1567
1568     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1569         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1570         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1571             transaction.read(YangInstanceIdentifier.builder().build());
1572
1573         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1574
1575         NormalizedNode<?, ?> normalizedNode = optional.get();
1576
1577         transaction.close();
1578
1579         return normalizedNode;
1580     }
1581
1582     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1583         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1584         ListenableFuture<Void> future =
1585             commitCohort.preCommit();
1586         try {
1587             future.get();
1588             future = commitCohort.commit();
1589             future.get();
1590         } catch (InterruptedException | ExecutionException e) {
1591         }
1592     }
1593
1594     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1595         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1596             @Override
1597             public void onDataChanged(
1598                 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1599
1600             }
1601         };
1602     }
1603
1604     static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1605             throws ExecutionException, InterruptedException {
1606         return readStore(shard.underlyingActor().getDataStore(), id);
1607     }
1608
1609     public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
1610             throws ExecutionException, InterruptedException {
1611         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1612
1613         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1614             transaction.read(id);
1615
1616         Optional<NormalizedNode<?, ?>> optional = future.get();
1617         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1618
1619         transaction.close();
1620
1621         return node;
1622     }
1623
1624     static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
1625             final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1626         writeToStore(shard.underlyingActor().getDataStore(), id, node);
1627     }
1628
1629     public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
1630             final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1631         DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
1632
1633         transaction.write(id, node);
1634
1635         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1636         commitCohort.preCommit().get();
1637         commitCohort.commit().get();
1638     }
1639
1640     @SuppressWarnings("serial")
1641     private static final class DelegatingShardCreator implements Creator<Shard> {
1642         private final Creator<Shard> delegate;
1643
1644         DelegatingShardCreator(final Creator<Shard> delegate) {
1645             this.delegate = delegate;
1646         }
1647
1648         @Override
1649         public Shard create() throws Exception {
1650             return delegate.create();
1651         }
1652     }
1653 }