Merge "Switch opendaylight-karaf-empty to using karaf-parent"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.japi.Procedure;
21 import akka.pattern.Patterns;
22 import akka.persistence.SnapshotSelectionCriteria;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.HashSet;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.mockito.InOrder;
46 import org.mockito.invocation.InvocationOnMock;
47 import org.mockito.stubbing.Answer;
48 import org.opendaylight.controller.cluster.DataPersistenceProvider;
49 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
50 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
51 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
58 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
63 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
64 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
65 import org.opendaylight.controller.cluster.datastore.modification.Modification;
66 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
67 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
68 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
69 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
70 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
71 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
72 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
73 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
74 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
75 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
76 import org.opendaylight.controller.cluster.raft.Snapshot;
77 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
78 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
79 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
80 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
81 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
82 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
83 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
84 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
85 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
86 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
87 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
88 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
89 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
90 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
91 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
92 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
93 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
94 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
95 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
96 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
97 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
98 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
99 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
101 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
102 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
103 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
105 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
106 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
107 import scala.concurrent.Await;
108 import scala.concurrent.Future;
109 import scala.concurrent.duration.FiniteDuration;
110
111 public class ShardTest extends AbstractActorTest {
112
113     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
114
115     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
116
117     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
118             .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
119
120     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
121             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
122             shardHeartbeatIntervalInMillis(100);
123
124     @Before
125     public void setUp() {
126         Builder newBuilder = DatastoreContext.newBuilder();
127         InMemorySnapshotStore.clear();
128         InMemoryJournal.clear();
129     }
130
131     @After
132     public void tearDown() {
133         InMemorySnapshotStore.clear();
134         InMemoryJournal.clear();
135     }
136
137     private DatastoreContext newDatastoreContext() {
138         return dataStoreContextBuilder.build();
139     }
140
141     private Props newShardProps() {
142         return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
143                 newDatastoreContext(), SCHEMA_CONTEXT);
144     }
145
146     @Test
147     public void testRegisterChangeListener() throws Exception {
148         new ShardTestKit(getSystem()) {{
149             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
150                     newShardProps(),  "testRegisterChangeListener");
151
152             waitUntilLeader(shard);
153
154             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
155
156             MockDataChangeListener listener = new MockDataChangeListener(1);
157             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
158                     "testRegisterChangeListener-DataChangeListener");
159
160             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
161                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
162
163             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
164                     RegisterChangeListenerReply.class);
165             String replyPath = reply.getListenerRegistrationPath().toString();
166             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
167                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
168
169             YangInstanceIdentifier path = TestModel.TEST_PATH;
170             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
171
172             listener.waitForChangeEvents(path);
173
174             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
175             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
176         }};
177     }
178
179     @SuppressWarnings("serial")
180     @Test
181     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
182         // This test tests the timing window in which a change listener is registered before the
183         // shard becomes the leader. We verify that the listener is registered and notified of the
184         // existing data when the shard becomes the leader.
185         new ShardTestKit(getSystem()) {{
186             // For this test, we want to send the RegisterChangeListener message after the shard
187             // has recovered from persistence and before it becomes the leader. So we subclass
188             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
189             // we know that the shard has been initialized to a follower and has started the
190             // election process. The following 2 CountDownLatches are used to coordinate the
191             // ElectionTimeout with the sending of the RegisterChangeListener message.
192             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
193             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
194             Creator<Shard> creator = new Creator<Shard>() {
195                 boolean firstElectionTimeout = true;
196
197                 @Override
198                 public Shard create() throws Exception {
199                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
200                             newDatastoreContext(), SCHEMA_CONTEXT) {
201                         @Override
202                         public void onReceiveCommand(final Object message) throws Exception {
203                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
204                                 // Got the first ElectionTimeout. We don't forward it to the
205                                 // base Shard yet until we've sent the RegisterChangeListener
206                                 // message. So we signal the onFirstElectionTimeout latch to tell
207                                 // the main thread to send the RegisterChangeListener message and
208                                 // start a thread to wait on the onChangeListenerRegistered latch,
209                                 // which the main thread signals after it has sent the message.
210                                 // After the onChangeListenerRegistered is triggered, we send the
211                                 // original ElectionTimeout message to proceed with the election.
212                                 firstElectionTimeout = false;
213                                 final ActorRef self = getSelf();
214                                 new Thread() {
215                                     @Override
216                                     public void run() {
217                                         Uninterruptibles.awaitUninterruptibly(
218                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
219                                         self.tell(message, self);
220                                     }
221                                 }.start();
222
223                                 onFirstElectionTimeout.countDown();
224                             } else {
225                                 super.onReceiveCommand(message);
226                             }
227                         }
228                     };
229                 }
230             };
231
232             MockDataChangeListener listener = new MockDataChangeListener(1);
233             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
234                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
235
236             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
237                     Props.create(new DelegatingShardCreator(creator)),
238                     "testRegisterChangeListenerWhenNotLeaderInitially");
239
240             // Write initial data into the in-memory store.
241             YangInstanceIdentifier path = TestModel.TEST_PATH;
242             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
243
244             // Wait until the shard receives the first ElectionTimeout message.
245             assertEquals("Got first ElectionTimeout", true,
246                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
247
248             // Now send the RegisterChangeListener and wait for the reply.
249             shard.tell(new RegisterChangeListener(path, dclActor.path(),
250                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
251
252             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
253                     RegisterChangeListenerReply.class);
254             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
255
256             // Sanity check - verify the shard is not the leader yet.
257             shard.tell(new FindLeader(), getRef());
258             FindLeaderReply findLeadeReply =
259                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
260             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
261
262             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
263             // with the election process.
264             onChangeListenerRegistered.countDown();
265
266             // Wait for the shard to become the leader and notify our listener with the existing
267             // data in the store.
268             listener.waitForChangeEvents(path);
269
270             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
271             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
272         }};
273     }
274
275     @Test
276     public void testCreateTransaction(){
277         new ShardTestKit(getSystem()) {{
278             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
279
280             waitUntilLeader(shard);
281
282             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
283
284             shard.tell(new CreateTransaction("txn-1",
285                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
286
287             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
288                     CreateTransactionReply.class);
289
290             String path = reply.getTransactionActorPath().toString();
291             assertTrue("Unexpected transaction path " + path,
292                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
293
294             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
295         }};
296     }
297
298     @Test
299     public void testCreateTransactionOnChain(){
300         new ShardTestKit(getSystem()) {{
301             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
302
303             waitUntilLeader(shard);
304
305             shard.tell(new CreateTransaction("txn-1",
306                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
307                     getRef());
308
309             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
310                     CreateTransactionReply.class);
311
312             String path = reply.getTransactionActorPath().toString();
313             assertTrue("Unexpected transaction path " + path,
314                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
315
316             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
317         }};
318     }
319
320     @SuppressWarnings("serial")
321     @Test
322     public void testPeerAddressResolved() throws Exception {
323         new ShardTestKit(getSystem()) {{
324             final CountDownLatch recoveryComplete = new CountDownLatch(1);
325             class TestShard extends Shard {
326                 TestShard() {
327                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
328                             newDatastoreContext(), SCHEMA_CONTEXT);
329                 }
330
331                 Map<String, String> getPeerAddresses() {
332                     return getRaftActorContext().getPeerAddresses();
333                 }
334
335                 @Override
336                 protected void onRecoveryComplete() {
337                     try {
338                         super.onRecoveryComplete();
339                     } finally {
340                         recoveryComplete.countDown();
341                     }
342                 }
343             }
344
345             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
346                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
347                         @Override
348                         public TestShard create() throws Exception {
349                             return new TestShard();
350                         }
351                     })), "testPeerAddressResolved");
352
353             //waitUntilLeader(shard);
354             assertEquals("Recovery complete", true,
355                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
356
357             String address = "akka://foobar";
358             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
359
360             assertEquals("getPeerAddresses", address,
361                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
362
363             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
364         }};
365     }
366
367     @Test
368     public void testApplySnapshot() throws Exception {
369         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
370                 "testApplySnapshot");
371
372         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
373         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
374
375         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
376
377         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
378         NormalizedNode<?,?> expected = readStore(store, root);
379
380         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
381                 SerializationUtils.serializeNormalizedNode(expected),
382                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
383
384         shard.underlyingActor().onReceiveCommand(applySnapshot);
385
386         NormalizedNode<?,?> actual = readStore(shard, root);
387
388         assertEquals("Root node", expected, actual);
389
390         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
391     }
392
393     @Test
394     public void testApplyHelium2VersionSnapshot() throws Exception {
395         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
396                 "testApplySnapshot");
397
398         NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
399
400         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
401         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
402
403         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
404
405         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
406         NormalizedNode<?,?> expected = readStore(store, root);
407
408         NormalizedNodeMessages.Container encode = codec.encode(expected);
409
410         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
411                 encode.getNormalizedNode().toByteString().toByteArray(),
412                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
413
414         shard.underlyingActor().onReceiveCommand(applySnapshot);
415
416         NormalizedNode<?,?> actual = readStore(shard, root);
417
418         assertEquals("Root node", expected, actual);
419
420         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
421     }
422
423     @Test
424     public void testApplyState() throws Exception {
425
426         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
427
428         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
429
430         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
431                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
432
433         shard.underlyingActor().onReceiveCommand(applyState);
434
435         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
436         assertEquals("Applied state", node, actual);
437
438         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
439     }
440
441     @Test
442     public void testApplyStateLegacy() throws Exception {
443
444         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
445
446         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
447
448         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
449                 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
450
451         shard.underlyingActor().onReceiveCommand(applyState);
452
453         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
454         assertEquals("Applied state", node, actual);
455
456         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
457     }
458
459     @Test
460     public void testRecovery() throws Exception {
461
462         // Set up the InMemorySnapshotStore.
463
464         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
465         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
466
467         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
468
469         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
470
471         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
472                 SerializationUtils.serializeNormalizedNode(root),
473                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
474
475         // Set up the InMemoryJournal.
476
477         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
478                   new WriteModification(TestModel.OUTER_LIST_PATH,
479                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
480
481         int nListEntries = 16;
482         Set<Integer> listEntryKeys = new HashSet<>();
483
484         // Add some ModificationPayload entries
485         for(int i = 1; i <= nListEntries; i++) {
486             listEntryKeys.add(Integer.valueOf(i));
487             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
488                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
489             Modification mod = new MergeModification(path,
490                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
491             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
492                     newModificationPayload(mod)));
493         }
494
495         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
496                 new ApplyJournalEntries(nListEntries));
497
498         testRecovery(listEntryKeys);
499     }
500
501     @Test
502     public void testHelium2VersionRecovery() throws Exception {
503
504         // Set up the InMemorySnapshotStore.
505
506         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
507         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
508
509         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
510
511         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
512
513         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
514                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
515                                 getNormalizedNode().toByteString().toByteArray(),
516                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
517
518         // Set up the InMemoryJournal.
519
520         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
521                   new WriteModification(TestModel.OUTER_LIST_PATH,
522                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
523
524         int nListEntries = 16;
525         Set<Integer> listEntryKeys = new HashSet<>();
526         int i = 1;
527
528         // Add some CompositeModificationPayload entries
529         for(; i <= 8; i++) {
530             listEntryKeys.add(Integer.valueOf(i));
531             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
532                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
533             Modification mod = new MergeModification(path,
534                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
535             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
536                     newLegacyPayload(mod)));
537         }
538
539         // Add some CompositeModificationByteStringPayload entries
540         for(; i <= nListEntries; i++) {
541             listEntryKeys.add(Integer.valueOf(i));
542             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
543                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
544             Modification mod = new MergeModification(path,
545                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
546             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
547                     newLegacyByteStringPayload(mod)));
548         }
549
550         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
551
552         testRecovery(listEntryKeys);
553     }
554
555     private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
556         // Create the actor and wait for recovery complete.
557
558         int nListEntries = listEntryKeys.size();
559
560         final CountDownLatch recoveryComplete = new CountDownLatch(1);
561
562         @SuppressWarnings("serial")
563         Creator<Shard> creator = new Creator<Shard>() {
564             @Override
565             public Shard create() throws Exception {
566                 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
567                         newDatastoreContext(), SCHEMA_CONTEXT) {
568                     @Override
569                     protected void onRecoveryComplete() {
570                         try {
571                             super.onRecoveryComplete();
572                         } finally {
573                             recoveryComplete.countDown();
574                         }
575                     }
576                 };
577             }
578         };
579
580         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
581                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
582
583         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
584
585         // Verify data in the data store.
586
587         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
588         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
589         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
590                 outerList.getValue() instanceof Iterable);
591         for(Object entry: (Iterable<?>) outerList.getValue()) {
592             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
593                     entry instanceof MapEntryNode);
594             MapEntryNode mapEntry = (MapEntryNode)entry;
595             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
596                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
597             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
598             Object value = idLeaf.get().getValue();
599             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
600                     listEntryKeys.remove(value));
601         }
602
603         if(!listEntryKeys.isEmpty()) {
604             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
605                     listEntryKeys);
606         }
607
608         assertEquals("Last log index", nListEntries,
609                 shard.underlyingActor().getShardMBean().getLastLogIndex());
610         assertEquals("Commit index", nListEntries,
611                 shard.underlyingActor().getShardMBean().getCommitIndex());
612         assertEquals("Last applied", nListEntries,
613                 shard.underlyingActor().getShardMBean().getLastApplied());
614
615         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
616     }
617
618     private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
619         MutableCompositeModification compMod = new MutableCompositeModification();
620         for(Modification mod: mods) {
621             compMod.addModification(mod);
622         }
623
624         return new CompositeModificationPayload(compMod.toSerializable());
625     }
626
627     private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
628         MutableCompositeModification compMod = new MutableCompositeModification();
629         for(Modification mod: mods) {
630             compMod.addModification(mod);
631         }
632
633         return new CompositeModificationByteStringPayload(compMod.toSerializable());
634     }
635
636     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
637         MutableCompositeModification compMod = new MutableCompositeModification();
638         for(Modification mod: mods) {
639             compMod.addModification(mod);
640         }
641
642         return new ModificationPayload(compMod);
643     }
644
645     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
646             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
647             final MutableCompositeModification modification) {
648         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
649     }
650
651     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
652             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
653             final MutableCompositeModification modification,
654             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
655
656         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
657         tx.write(path, data);
658         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
659         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
660
661         doAnswer(new Answer<ListenableFuture<Boolean>>() {
662             @Override
663             public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
664                 return realCohort.canCommit();
665             }
666         }).when(cohort).canCommit();
667
668         doAnswer(new Answer<ListenableFuture<Void>>() {
669             @Override
670             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
671                 if(preCommit != null) {
672                     return preCommit.apply(realCohort);
673                 } else {
674                     return realCohort.preCommit();
675                 }
676             }
677         }).when(cohort).preCommit();
678
679         doAnswer(new Answer<ListenableFuture<Void>>() {
680             @Override
681             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
682                 return realCohort.commit();
683             }
684         }).when(cohort).commit();
685
686         doAnswer(new Answer<ListenableFuture<Void>>() {
687             @Override
688             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
689                 return realCohort.abort();
690             }
691         }).when(cohort).abort();
692
693         modification.addModification(new WriteModification(path, data));
694
695         return cohort;
696     }
697
698     @SuppressWarnings({ "unchecked" })
699     @Test
700     public void testConcurrentThreePhaseCommits() throws Throwable {
701         new ShardTestKit(getSystem()) {{
702             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
703                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
704                     "testConcurrentThreePhaseCommits");
705
706             waitUntilLeader(shard);
707
708             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
709
710             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
711
712             String transactionID1 = "tx1";
713             MutableCompositeModification modification1 = new MutableCompositeModification();
714             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
715                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
716
717             String transactionID2 = "tx2";
718             MutableCompositeModification modification2 = new MutableCompositeModification();
719             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
720                     TestModel.OUTER_LIST_PATH,
721                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
722                     modification2);
723
724             String transactionID3 = "tx3";
725             MutableCompositeModification modification3 = new MutableCompositeModification();
726             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
727                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
728                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
729                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
730                     modification3);
731
732             long timeoutSec = 5;
733             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
734             final Timeout timeout = new Timeout(duration);
735
736             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
737             // by the ShardTransaction.
738
739             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
740                     cohort1, modification1, true), getRef());
741             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
742                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
743             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
744
745             // Send the CanCommitTransaction message for the first Tx.
746
747             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
748             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
749                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
750             assertEquals("Can commit", true, canCommitReply.getCanCommit());
751
752             // Send the ForwardedReadyTransaction for the next 2 Tx's.
753
754             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
755                     cohort2, modification2, true), getRef());
756             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
757
758             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
759                     cohort3, modification3, true), getRef());
760             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
761
762             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
763             // processed after the first Tx completes.
764
765             Future<Object> canCommitFuture1 = Patterns.ask(shard,
766                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
767
768             Future<Object> canCommitFuture2 = Patterns.ask(shard,
769                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
770
771             // Send the CommitTransaction message for the first Tx. After it completes, it should
772             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
773
774             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
775             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
776
777             // Wait for the next 2 Tx's to complete.
778
779             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
780             final CountDownLatch commitLatch = new CountDownLatch(2);
781
782             class OnFutureComplete extends OnComplete<Object> {
783                 private final Class<?> expRespType;
784
785                 OnFutureComplete(final Class<?> expRespType) {
786                     this.expRespType = expRespType;
787                 }
788
789                 @Override
790                 public void onComplete(final Throwable error, final Object resp) {
791                     if(error != null) {
792                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
793                     } else {
794                         try {
795                             assertEquals("Commit response type", expRespType, resp.getClass());
796                             onSuccess(resp);
797                         } catch (Exception e) {
798                             caughtEx.set(e);
799                         }
800                     }
801                 }
802
803                 void onSuccess(final Object resp) throws Exception {
804                 }
805             }
806
807             class OnCommitFutureComplete extends OnFutureComplete {
808                 OnCommitFutureComplete() {
809                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
810                 }
811
812                 @Override
813                 public void onComplete(final Throwable error, final Object resp) {
814                     super.onComplete(error, resp);
815                     commitLatch.countDown();
816                 }
817             }
818
819             class OnCanCommitFutureComplete extends OnFutureComplete {
820                 private final String transactionID;
821
822                 OnCanCommitFutureComplete(final String transactionID) {
823                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
824                     this.transactionID = transactionID;
825                 }
826
827                 @Override
828                 void onSuccess(final Object resp) throws Exception {
829                     CanCommitTransactionReply canCommitReply =
830                             CanCommitTransactionReply.fromSerializable(resp);
831                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
832
833                     Future<Object> commitFuture = Patterns.ask(shard,
834                             new CommitTransaction(transactionID).toSerializable(), timeout);
835                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
836                 }
837             }
838
839             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
840                     getSystem().dispatcher());
841
842             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
843                     getSystem().dispatcher());
844
845             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
846
847             if(caughtEx.get() != null) {
848                 throw caughtEx.get();
849             }
850
851             assertEquals("Commits complete", true, done);
852
853             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
854             inOrder.verify(cohort1).canCommit();
855             inOrder.verify(cohort1).preCommit();
856             inOrder.verify(cohort1).commit();
857             inOrder.verify(cohort2).canCommit();
858             inOrder.verify(cohort2).preCommit();
859             inOrder.verify(cohort2).commit();
860             inOrder.verify(cohort3).canCommit();
861             inOrder.verify(cohort3).preCommit();
862             inOrder.verify(cohort3).commit();
863
864             // Verify data in the data store.
865
866             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
867             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
868             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
869                     outerList.getValue() instanceof Iterable);
870             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
871             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
872                        entry instanceof MapEntryNode);
873             MapEntryNode mapEntry = (MapEntryNode)entry;
874             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
875                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
876             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
877             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
878
879             verifyLastLogIndex(shard, 2);
880
881             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
882         }};
883     }
884
885     private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
886         for(int i = 0; i < 20 * 5; i++) {
887             long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
888             if(lastLogIndex == expectedValue) {
889                 break;
890             }
891             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
892         }
893
894         assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
895     }
896
897     @Test
898     public void testCommitWithPersistenceDisabled() throws Throwable {
899         dataStoreContextBuilder.persistent(false);
900         new ShardTestKit(getSystem()) {{
901             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
902                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
903                     "testCommitPhaseFailure");
904
905             waitUntilLeader(shard);
906
907             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
908
909             // Setup a simulated transactions with a mock cohort.
910
911             String transactionID = "tx";
912             MutableCompositeModification modification = new MutableCompositeModification();
913             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
914             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
915                     TestModel.TEST_PATH, containerNode, modification);
916
917             FiniteDuration duration = duration("5 seconds");
918
919             // Simulate the ForwardedReadyTransaction messages that would be sent
920             // by the ShardTransaction.
921
922             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
923                     cohort, modification, true), getRef());
924             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
925
926             // Send the CanCommitTransaction message.
927
928             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
929             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
930                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
931             assertEquals("Can commit", true, canCommitReply.getCanCommit());
932
933             // Send the CanCommitTransaction message.
934
935             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
936             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
937
938             InOrder inOrder = inOrder(cohort);
939             inOrder.verify(cohort).canCommit();
940             inOrder.verify(cohort).preCommit();
941             inOrder.verify(cohort).commit();
942
943             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
944             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
945
946             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
947         }};
948     }
949
950     @Test
951     public void testCommitPhaseFailure() throws Throwable {
952         new ShardTestKit(getSystem()) {{
953             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
954                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
955                     "testCommitPhaseFailure");
956
957             waitUntilLeader(shard);
958
959             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
960             // commit phase.
961
962             String transactionID1 = "tx1";
963             MutableCompositeModification modification1 = new MutableCompositeModification();
964             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
965             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
966             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
967             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
968
969             String transactionID2 = "tx2";
970             MutableCompositeModification modification2 = new MutableCompositeModification();
971             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
972             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
973
974             FiniteDuration duration = duration("5 seconds");
975             final Timeout timeout = new Timeout(duration);
976
977             // Simulate the ForwardedReadyTransaction messages that would be sent
978             // by the ShardTransaction.
979
980             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
981                     cohort1, modification1, true), getRef());
982             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
983
984             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
985                     cohort2, modification2, true), getRef());
986             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
987
988             // Send the CanCommitTransaction message for the first Tx.
989
990             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
991             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
992                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
993             assertEquals("Can commit", true, canCommitReply.getCanCommit());
994
995             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
996             // processed after the first Tx completes.
997
998             Future<Object> canCommitFuture = Patterns.ask(shard,
999                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1000
1001             // Send the CommitTransaction message for the first Tx. This should send back an error
1002             // and trigger the 2nd Tx to proceed.
1003
1004             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1005             expectMsgClass(duration, akka.actor.Status.Failure.class);
1006
1007             // Wait for the 2nd Tx to complete the canCommit phase.
1008
1009             final CountDownLatch latch = new CountDownLatch(1);
1010             canCommitFuture.onComplete(new OnComplete<Object>() {
1011                 @Override
1012                 public void onComplete(final Throwable t, final Object resp) {
1013                     latch.countDown();
1014                 }
1015             }, getSystem().dispatcher());
1016
1017             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1018
1019             InOrder inOrder = inOrder(cohort1, cohort2);
1020             inOrder.verify(cohort1).canCommit();
1021             inOrder.verify(cohort1).preCommit();
1022             inOrder.verify(cohort1).commit();
1023             inOrder.verify(cohort2).canCommit();
1024
1025             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1026         }};
1027     }
1028
1029     @Test
1030     public void testPreCommitPhaseFailure() throws Throwable {
1031         new ShardTestKit(getSystem()) {{
1032             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1033                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1034                     "testPreCommitPhaseFailure");
1035
1036             waitUntilLeader(shard);
1037
1038             String transactionID = "tx1";
1039             MutableCompositeModification modification = new MutableCompositeModification();
1040             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1041             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1042             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1043
1044             FiniteDuration duration = duration("5 seconds");
1045
1046             // Simulate the ForwardedReadyTransaction messages that would be sent
1047             // by the ShardTransaction.
1048
1049             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1050                     cohort, modification, true), getRef());
1051             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1052
1053             // Send the CanCommitTransaction message.
1054
1055             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1056             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1057                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1058             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1059
1060             // Send the CommitTransaction message. This should send back an error
1061             // for preCommit failure.
1062
1063             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1064             expectMsgClass(duration, akka.actor.Status.Failure.class);
1065
1066             InOrder inOrder = inOrder(cohort);
1067             inOrder.verify(cohort).canCommit();
1068             inOrder.verify(cohort).preCommit();
1069
1070             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1071         }};
1072     }
1073
1074     @Test
1075     public void testCanCommitPhaseFailure() throws Throwable {
1076         new ShardTestKit(getSystem()) {{
1077             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1078                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1079                     "testCanCommitPhaseFailure");
1080
1081             waitUntilLeader(shard);
1082
1083             final FiniteDuration duration = duration("5 seconds");
1084
1085             String transactionID = "tx1";
1086             MutableCompositeModification modification = new MutableCompositeModification();
1087             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1088             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1089
1090             // Simulate the ForwardedReadyTransaction messages that would be sent
1091             // by the ShardTransaction.
1092
1093             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1094                     cohort, modification, true), getRef());
1095             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1096
1097             // Send the CanCommitTransaction message.
1098
1099             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1100             expectMsgClass(duration, akka.actor.Status.Failure.class);
1101
1102             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1103         }};
1104     }
1105
1106     @Test
1107     public void testAbortBeforeFinishCommit() throws Throwable {
1108         new ShardTestKit(getSystem()) {{
1109             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1110                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1111                     "testAbortBeforeFinishCommit");
1112
1113             waitUntilLeader(shard);
1114
1115             final FiniteDuration duration = duration("5 seconds");
1116             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1117
1118             final String transactionID = "tx1";
1119             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1120                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1121                 @Override
1122                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1123                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1124
1125                     // Simulate an AbortTransaction message occurring during replication, after
1126                     // persisting and before finishing the commit to the in-memory store.
1127                     // We have no followers so due to optimizations in the RaftActor, it does not
1128                     // attempt replication and thus we can't send an AbortTransaction message b/c
1129                     // it would be processed too late after CommitTransaction completes. So we'll
1130                     // simulate an AbortTransaction message occurring during replication by calling
1131                     // the shard directly.
1132                     //
1133                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1134
1135                     return preCommitFuture;
1136                 }
1137             };
1138
1139             MutableCompositeModification modification = new MutableCompositeModification();
1140             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1141                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1142                     modification, preCommit);
1143
1144             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1145                     cohort, modification, true), getRef());
1146             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1147
1148             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1149             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1150                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1151             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1152
1153             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1154             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1155
1156             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1157
1158             // Since we're simulating an abort occurring during replication and before finish commit,
1159             // the data should still get written to the in-memory store since we've gotten past
1160             // canCommit and preCommit and persisted the data.
1161             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1162
1163             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1164         }};
1165     }
1166
1167     @Test
1168     public void testTransactionCommitTimeout() throws Throwable {
1169         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1170
1171         new ShardTestKit(getSystem()) {{
1172             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1173                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1174                     "testTransactionCommitTimeout");
1175
1176             waitUntilLeader(shard);
1177
1178             final FiniteDuration duration = duration("5 seconds");
1179
1180             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1181
1182             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1183             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1184                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1185
1186             // Create 1st Tx - will timeout
1187
1188             String transactionID1 = "tx1";
1189             MutableCompositeModification modification1 = new MutableCompositeModification();
1190             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1191                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1192                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1193                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1194                     modification1);
1195
1196             // Create 2nd Tx
1197
1198             String transactionID2 = "tx3";
1199             MutableCompositeModification modification2 = new MutableCompositeModification();
1200             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1201                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1202             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1203                     listNodePath,
1204                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1205                     modification2);
1206
1207             // Ready the Tx's
1208
1209             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1210                     cohort1, modification1, true), getRef());
1211             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1212
1213             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1214                     cohort2, modification2, true), getRef());
1215             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1216
1217             // canCommit 1st Tx. We don't send the commit so it should timeout.
1218
1219             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1220             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1221
1222             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1223
1224             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1225             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1226
1227             // Commit the 2nd Tx.
1228
1229             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1230             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1231
1232             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1233             assertNotNull(listNodePath + " not found", node);
1234
1235             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1236         }};
1237     }
1238
1239     @Test
1240     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1241         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1242
1243         new ShardTestKit(getSystem()) {{
1244             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1245                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1246                     "testTransactionCommitQueueCapacityExceeded");
1247
1248             waitUntilLeader(shard);
1249
1250             final FiniteDuration duration = duration("5 seconds");
1251
1252             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1253
1254             String transactionID1 = "tx1";
1255             MutableCompositeModification modification1 = new MutableCompositeModification();
1256             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1257                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1258
1259             String transactionID2 = "tx2";
1260             MutableCompositeModification modification2 = new MutableCompositeModification();
1261             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1262                     TestModel.OUTER_LIST_PATH,
1263                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1264                     modification2);
1265
1266             String transactionID3 = "tx3";
1267             MutableCompositeModification modification3 = new MutableCompositeModification();
1268             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1269                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1270
1271             // Ready the Tx's
1272
1273             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1274                     cohort1, modification1, true), getRef());
1275             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1276
1277             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1278                     cohort2, modification2, true), getRef());
1279             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1280
1281             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1282                     cohort3, modification3, true), getRef());
1283             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1284
1285             // canCommit 1st Tx.
1286
1287             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1288             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1289
1290             // canCommit the 2nd Tx - it should get queued.
1291
1292             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1293
1294             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1295
1296             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1297             expectMsgClass(duration, akka.actor.Status.Failure.class);
1298
1299             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1300         }};
1301     }
1302
1303     @Test
1304     public void testCanCommitBeforeReadyFailure() throws Throwable {
1305         new ShardTestKit(getSystem()) {{
1306             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1307                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1308                     "testCanCommitBeforeReadyFailure");
1309
1310             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1311             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1312
1313             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1314         }};
1315     }
1316
1317     @Test
1318     public void testAbortTransaction() throws Throwable {
1319         new ShardTestKit(getSystem()) {{
1320             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1321                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1322                     "testAbortTransaction");
1323
1324             waitUntilLeader(shard);
1325
1326             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1327
1328             String transactionID1 = "tx1";
1329             MutableCompositeModification modification1 = new MutableCompositeModification();
1330             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1331             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1332             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1333
1334             String transactionID2 = "tx2";
1335             MutableCompositeModification modification2 = new MutableCompositeModification();
1336             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1337             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1338
1339             FiniteDuration duration = duration("5 seconds");
1340             final Timeout timeout = new Timeout(duration);
1341
1342             // Simulate the ForwardedReadyTransaction messages that would be sent
1343             // by the ShardTransaction.
1344
1345             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1346                     cohort1, modification1, true), getRef());
1347             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1348
1349             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1350                     cohort2, modification2, true), getRef());
1351             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1352
1353             // Send the CanCommitTransaction message for the first Tx.
1354
1355             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1356             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1357                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1358             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1359
1360             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1361             // processed after the first Tx completes.
1362
1363             Future<Object> canCommitFuture = Patterns.ask(shard,
1364                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1365
1366             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1367             // Tx to proceed.
1368
1369             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1370             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1371
1372             // Wait for the 2nd Tx to complete the canCommit phase.
1373
1374             Await.ready(canCommitFuture, duration);
1375
1376             InOrder inOrder = inOrder(cohort1, cohort2);
1377             inOrder.verify(cohort1).canCommit();
1378             inOrder.verify(cohort2).canCommit();
1379
1380             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1381         }};
1382     }
1383
1384     @Test
1385     public void testCreateSnapshot() throws Exception {
1386         testCreateSnapshot(true, "testCreateSnapshot");
1387     }
1388
1389     @Test
1390     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1391         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1392     }
1393
1394     @SuppressWarnings("serial")
1395     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1396
1397         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1398         class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1399             DataPersistenceProvider delegate;
1400
1401             DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1402                 this.delegate = delegate;
1403             }
1404
1405             @Override
1406             public boolean isRecoveryApplicable() {
1407                 return delegate.isRecoveryApplicable();
1408             }
1409
1410             @Override
1411             public <T> void persist(T o, Procedure<T> procedure) {
1412                 delegate.persist(o, procedure);
1413             }
1414
1415             @Override
1416             public void saveSnapshot(Object o) {
1417                 savedSnapshot.set(o);
1418                 delegate.saveSnapshot(o);
1419             }
1420
1421             @Override
1422             public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1423                 delegate.deleteSnapshots(criteria);
1424             }
1425
1426             @Override
1427             public void deleteMessages(long sequenceNumber) {
1428                 delegate.deleteMessages(sequenceNumber);
1429             }
1430         }
1431
1432         dataStoreContextBuilder.persistent(persistent);
1433
1434         new ShardTestKit(getSystem()) {{
1435             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1436             Creator<Shard> creator = new Creator<Shard>() {
1437                 @Override
1438                 public Shard create() throws Exception {
1439                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1440                             newDatastoreContext(), SCHEMA_CONTEXT) {
1441
1442                         DelegatingPersistentDataProvider delegating;
1443
1444                         @Override
1445                         protected DataPersistenceProvider persistence() {
1446                             if(delegating == null) {
1447                                 delegating = new DelegatingPersistentDataProvider(super.persistence());
1448                             }
1449
1450                             return delegating;
1451                         }
1452
1453                         @Override
1454                         protected void commitSnapshot(final long sequenceNumber) {
1455                             super.commitSnapshot(sequenceNumber);
1456                             latch.get().countDown();
1457                         }
1458                     };
1459                 }
1460             };
1461
1462             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1463                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1464
1465             waitUntilLeader(shard);
1466
1467             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1468
1469             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1470
1471             CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
1472             shard.tell(capture, getRef());
1473
1474             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1475
1476             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1477                     savedSnapshot.get() instanceof Snapshot);
1478
1479             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1480
1481             latch.set(new CountDownLatch(1));
1482             savedSnapshot.set(null);
1483
1484             shard.tell(capture, getRef());
1485
1486             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1487
1488             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1489                     savedSnapshot.get() instanceof Snapshot);
1490
1491             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1492
1493             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1494         }
1495
1496         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1497
1498             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1499             assertEquals("Root node", expectedRoot, actual);
1500
1501         }};
1502     }
1503
1504     /**
1505      * This test simply verifies that the applySnapShot logic will work
1506      * @throws ReadFailedException
1507      */
1508     @Test
1509     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1510         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1511
1512         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1513
1514         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1515         putTransaction.write(TestModel.TEST_PATH,
1516             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1517         commitTransaction(putTransaction);
1518
1519
1520         NormalizedNode<?, ?> expected = readStore(store);
1521
1522         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1523
1524         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1525         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1526
1527         commitTransaction(writeTransaction);
1528
1529         NormalizedNode<?, ?> actual = readStore(store);
1530
1531         assertEquals(expected, actual);
1532     }
1533
1534     @Test
1535     public void testRecoveryApplicable(){
1536
1537         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1538                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1539
1540         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1541                 persistentContext, SCHEMA_CONTEXT);
1542
1543         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1544                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1545
1546         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1547                 nonPersistentContext, SCHEMA_CONTEXT);
1548
1549         new ShardTestKit(getSystem()) {{
1550             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1551                     persistentProps, "testPersistence1");
1552
1553             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1554
1555             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1556
1557             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1558                     nonPersistentProps, "testPersistence2");
1559
1560             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1561
1562             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1563
1564         }};
1565
1566     }
1567
1568
1569     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1570         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1571         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1572             transaction.read(YangInstanceIdentifier.builder().build());
1573
1574         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1575
1576         NormalizedNode<?, ?> normalizedNode = optional.get();
1577
1578         transaction.close();
1579
1580         return normalizedNode;
1581     }
1582
1583     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1584         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1585         ListenableFuture<Void> future =
1586             commitCohort.preCommit();
1587         try {
1588             future.get();
1589             future = commitCohort.commit();
1590             future.get();
1591         } catch (InterruptedException | ExecutionException e) {
1592         }
1593     }
1594
1595     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1596         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1597             @Override
1598             public void onDataChanged(
1599                 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1600
1601             }
1602         };
1603     }
1604
1605     static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1606             throws ExecutionException, InterruptedException {
1607         return readStore(shard.underlyingActor().getDataStore(), id);
1608     }
1609
1610     public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
1611             throws ExecutionException, InterruptedException {
1612         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1613
1614         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1615             transaction.read(id);
1616
1617         Optional<NormalizedNode<?, ?>> optional = future.get();
1618         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1619
1620         transaction.close();
1621
1622         return node;
1623     }
1624
1625     static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
1626             final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1627         writeToStore(shard.underlyingActor().getDataStore(), id, node);
1628     }
1629
1630     public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
1631             final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1632         DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
1633
1634         transaction.write(id, node);
1635
1636         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1637         commitCohort.preCommit().get();
1638         commitCohort.commit().get();
1639     }
1640
1641     @SuppressWarnings("serial")
1642     private static final class DelegatingShardCreator implements Creator<Shard> {
1643         private final Creator<Shard> delegate;
1644
1645         DelegatingShardCreator(final Creator<Shard> delegate) {
1646             this.delegate = delegate;
1647         }
1648
1649         @Override
1650         public Shard create() throws Exception {
1651             return delegate.create();
1652         }
1653     }
1654 }