Merge "Fixed build-failure because of undeclared transitive dependency."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.japi.Procedure;
21 import akka.pattern.Patterns;
22 import akka.persistence.SnapshotSelectionCriteria;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.concurrent.atomic.AtomicReference;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.mockito.invocation.InvocationOnMock;
48 import org.mockito.stubbing.Answer;
49 import org.opendaylight.controller.cluster.DataPersistenceProvider;
50 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
51 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
52 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
61 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
65 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
66 import org.opendaylight.controller.cluster.datastore.modification.Modification;
67 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
68 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
69 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
70 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
71 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
74 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
75 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
76 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
77 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
78 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
79 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
80 import org.opendaylight.controller.cluster.raft.Snapshot;
81 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
82 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
84 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
85 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
86 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
88 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
89 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
90 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
91 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
92 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
93 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
94 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
95 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
96 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
97 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
98 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
99 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
100 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
101 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
102 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
103 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
104 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
105 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
106 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
107 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
108 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
109 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
110 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
111 import scala.concurrent.Await;
112 import scala.concurrent.Future;
113 import scala.concurrent.duration.FiniteDuration;
114
115 public class ShardTest extends AbstractActorTest {
116
117     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
118
119     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
120
121     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
122             .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
123
124     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
125             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
126             shardHeartbeatIntervalInMillis(100);
127
128     @Before
129     public void setUp() {
130         Builder newBuilder = DatastoreContext.newBuilder();
131         InMemorySnapshotStore.clear();
132         InMemoryJournal.clear();
133     }
134
135     @After
136     public void tearDown() {
137         InMemorySnapshotStore.clear();
138         InMemoryJournal.clear();
139     }
140
141     private DatastoreContext newDatastoreContext() {
142         return dataStoreContextBuilder.build();
143     }
144
145     private Props newShardProps() {
146         return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
147                 newDatastoreContext(), SCHEMA_CONTEXT);
148     }
149
150     @Test
151     public void testRegisterChangeListener() throws Exception {
152         new ShardTestKit(getSystem()) {{
153             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
154                     newShardProps(),  "testRegisterChangeListener");
155
156             waitUntilLeader(shard);
157
158             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
159
160             MockDataChangeListener listener = new MockDataChangeListener(1);
161             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
162                     "testRegisterChangeListener-DataChangeListener");
163
164             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
165                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
166
167             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
168                     RegisterChangeListenerReply.class);
169             String replyPath = reply.getListenerRegistrationPath().toString();
170             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
171                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
172
173             YangInstanceIdentifier path = TestModel.TEST_PATH;
174             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
175
176             listener.waitForChangeEvents(path);
177
178             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
179             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
180         }};
181     }
182
183     @SuppressWarnings("serial")
184     @Test
185     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
186         // This test tests the timing window in which a change listener is registered before the
187         // shard becomes the leader. We verify that the listener is registered and notified of the
188         // existing data when the shard becomes the leader.
189         new ShardTestKit(getSystem()) {{
190             // For this test, we want to send the RegisterChangeListener message after the shard
191             // has recovered from persistence and before it becomes the leader. So we subclass
192             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
193             // we know that the shard has been initialized to a follower and has started the
194             // election process. The following 2 CountDownLatches are used to coordinate the
195             // ElectionTimeout with the sending of the RegisterChangeListener message.
196             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
197             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
198             Creator<Shard> creator = new Creator<Shard>() {
199                 boolean firstElectionTimeout = true;
200
201                 @Override
202                 public Shard create() throws Exception {
203                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
204                             newDatastoreContext(), SCHEMA_CONTEXT) {
205                         @Override
206                         public void onReceiveCommand(final Object message) throws Exception {
207                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
208                                 // Got the first ElectionTimeout. We don't forward it to the
209                                 // base Shard yet until we've sent the RegisterChangeListener
210                                 // message. So we signal the onFirstElectionTimeout latch to tell
211                                 // the main thread to send the RegisterChangeListener message and
212                                 // start a thread to wait on the onChangeListenerRegistered latch,
213                                 // which the main thread signals after it has sent the message.
214                                 // After the onChangeListenerRegistered is triggered, we send the
215                                 // original ElectionTimeout message to proceed with the election.
216                                 firstElectionTimeout = false;
217                                 final ActorRef self = getSelf();
218                                 new Thread() {
219                                     @Override
220                                     public void run() {
221                                         Uninterruptibles.awaitUninterruptibly(
222                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
223                                         self.tell(message, self);
224                                     }
225                                 }.start();
226
227                                 onFirstElectionTimeout.countDown();
228                             } else {
229                                 super.onReceiveCommand(message);
230                             }
231                         }
232                     };
233                 }
234             };
235
236             MockDataChangeListener listener = new MockDataChangeListener(1);
237             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
238                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
239
240             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
241                     Props.create(new DelegatingShardCreator(creator)),
242                     "testRegisterChangeListenerWhenNotLeaderInitially");
243
244             // Write initial data into the in-memory store.
245             YangInstanceIdentifier path = TestModel.TEST_PATH;
246             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
247
248             // Wait until the shard receives the first ElectionTimeout message.
249             assertEquals("Got first ElectionTimeout", true,
250                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
251
252             // Now send the RegisterChangeListener and wait for the reply.
253             shard.tell(new RegisterChangeListener(path, dclActor.path(),
254                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
255
256             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
257                     RegisterChangeListenerReply.class);
258             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
259
260             // Sanity check - verify the shard is not the leader yet.
261             shard.tell(new FindLeader(), getRef());
262             FindLeaderReply findLeadeReply =
263                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
264             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
265
266             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
267             // with the election process.
268             onChangeListenerRegistered.countDown();
269
270             // Wait for the shard to become the leader and notify our listener with the existing
271             // data in the store.
272             listener.waitForChangeEvents(path);
273
274             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
275             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
276         }};
277     }
278
279     @Test
280     public void testCreateTransaction(){
281         new ShardTestKit(getSystem()) {{
282             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
283
284             waitUntilLeader(shard);
285
286             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
287
288             shard.tell(new CreateTransaction("txn-1",
289                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
290
291             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
292                     CreateTransactionReply.class);
293
294             String path = reply.getTransactionActorPath().toString();
295             assertTrue("Unexpected transaction path " + path,
296                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
297
298             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
299         }};
300     }
301
302     @Test
303     public void testCreateTransactionOnChain(){
304         new ShardTestKit(getSystem()) {{
305             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
306
307             waitUntilLeader(shard);
308
309             shard.tell(new CreateTransaction("txn-1",
310                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
311                     getRef());
312
313             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
314                     CreateTransactionReply.class);
315
316             String path = reply.getTransactionActorPath().toString();
317             assertTrue("Unexpected transaction path " + path,
318                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
319
320             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
321         }};
322     }
323
324     @SuppressWarnings("serial")
325     @Test
326     public void testPeerAddressResolved() throws Exception {
327         new ShardTestKit(getSystem()) {{
328             final CountDownLatch recoveryComplete = new CountDownLatch(1);
329             class TestShard extends Shard {
330                 TestShard() {
331                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
332                             newDatastoreContext(), SCHEMA_CONTEXT);
333                 }
334
335                 Map<String, String> getPeerAddresses() {
336                     return getRaftActorContext().getPeerAddresses();
337                 }
338
339                 @Override
340                 protected void onRecoveryComplete() {
341                     try {
342                         super.onRecoveryComplete();
343                     } finally {
344                         recoveryComplete.countDown();
345                     }
346                 }
347             }
348
349             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
350                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
351                         @Override
352                         public TestShard create() throws Exception {
353                             return new TestShard();
354                         }
355                     })), "testPeerAddressResolved");
356
357             //waitUntilLeader(shard);
358             assertEquals("Recovery complete", true,
359                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
360
361             String address = "akka://foobar";
362             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
363
364             assertEquals("getPeerAddresses", address,
365                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
366
367             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
368         }};
369     }
370
371     @Test
372     public void testApplySnapshot() throws Exception {
373         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
374                 "testApplySnapshot");
375
376         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
377         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
378
379         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
380
381         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
382         NormalizedNode<?,?> expected = readStore(store, root);
383
384         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
385                 SerializationUtils.serializeNormalizedNode(expected),
386                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
387
388         shard.underlyingActor().onReceiveCommand(applySnapshot);
389
390         NormalizedNode<?,?> actual = readStore(shard, root);
391
392         assertEquals("Root node", expected, actual);
393
394         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
395     }
396
397     @Test
398     public void testApplyHelium2VersionSnapshot() throws Exception {
399         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
400                 "testApplySnapshot");
401
402         NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
403
404         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
405         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
406
407         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
408
409         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
410         NormalizedNode<?,?> expected = readStore(store, root);
411
412         NormalizedNodeMessages.Container encode = codec.encode(expected);
413
414         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
415                 encode.getNormalizedNode().toByteString().toByteArray(),
416                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
417
418         shard.underlyingActor().onReceiveCommand(applySnapshot);
419
420         NormalizedNode<?,?> actual = readStore(shard, root);
421
422         assertEquals("Root node", expected, actual);
423
424         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
425     }
426
427     @Test
428     public void testApplyState() throws Exception {
429
430         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
431
432         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
433
434         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
435                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
436
437         shard.underlyingActor().onReceiveCommand(applyState);
438
439         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
440         assertEquals("Applied state", node, actual);
441
442         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
443     }
444
445     @Test
446     public void testApplyStateLegacy() throws Exception {
447
448         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
449
450         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
451
452         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
453                 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
454
455         shard.underlyingActor().onReceiveCommand(applyState);
456
457         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
458         assertEquals("Applied state", node, actual);
459
460         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
461     }
462
463     @Test
464     public void testRecovery() throws Exception {
465
466         // Set up the InMemorySnapshotStore.
467
468         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
469         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
470
471         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
472
473         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
474
475         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
476                 SerializationUtils.serializeNormalizedNode(root),
477                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
478
479         // Set up the InMemoryJournal.
480
481         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
482                   new WriteModification(TestModel.OUTER_LIST_PATH,
483                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
484
485         int nListEntries = 16;
486         Set<Integer> listEntryKeys = new HashSet<>();
487
488         // Add some ModificationPayload entries
489         for(int i = 1; i <= nListEntries; i++) {
490             listEntryKeys.add(Integer.valueOf(i));
491             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
492                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
493             Modification mod = new MergeModification(path,
494                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
495             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
496                     newModificationPayload(mod)));
497         }
498
499         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
500                 new ApplyJournalEntries(nListEntries));
501
502         testRecovery(listEntryKeys);
503     }
504
505     @Test
506     public void testHelium2VersionRecovery() throws Exception {
507
508         // Set up the InMemorySnapshotStore.
509
510         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
511         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
512
513         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
514
515         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
516
517         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
518                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
519                                 getNormalizedNode().toByteString().toByteArray(),
520                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
521
522         // Set up the InMemoryJournal.
523
524         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
525                   new WriteModification(TestModel.OUTER_LIST_PATH,
526                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
527
528         int nListEntries = 16;
529         Set<Integer> listEntryKeys = new HashSet<>();
530         int i = 1;
531
532         // Add some CompositeModificationPayload entries
533         for(; i <= 8; i++) {
534             listEntryKeys.add(Integer.valueOf(i));
535             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
536                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
537             Modification mod = new MergeModification(path,
538                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
539             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
540                     newLegacyPayload(mod)));
541         }
542
543         // Add some CompositeModificationByteStringPayload entries
544         for(; i <= nListEntries; i++) {
545             listEntryKeys.add(Integer.valueOf(i));
546             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
547                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
548             Modification mod = new MergeModification(path,
549                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
550             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
551                     newLegacyByteStringPayload(mod)));
552         }
553
554         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
555
556         testRecovery(listEntryKeys);
557     }
558
559     private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
560         // Create the actor and wait for recovery complete.
561
562         int nListEntries = listEntryKeys.size();
563
564         final CountDownLatch recoveryComplete = new CountDownLatch(1);
565
566         @SuppressWarnings("serial")
567         Creator<Shard> creator = new Creator<Shard>() {
568             @Override
569             public Shard create() throws Exception {
570                 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
571                         newDatastoreContext(), SCHEMA_CONTEXT) {
572                     @Override
573                     protected void onRecoveryComplete() {
574                         try {
575                             super.onRecoveryComplete();
576                         } finally {
577                             recoveryComplete.countDown();
578                         }
579                     }
580                 };
581             }
582         };
583
584         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
585                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
586
587         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
588
589         // Verify data in the data store.
590
591         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
592         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
593         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
594                 outerList.getValue() instanceof Iterable);
595         for(Object entry: (Iterable<?>) outerList.getValue()) {
596             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
597                     entry instanceof MapEntryNode);
598             MapEntryNode mapEntry = (MapEntryNode)entry;
599             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
600                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
601             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
602             Object value = idLeaf.get().getValue();
603             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
604                     listEntryKeys.remove(value));
605         }
606
607         if(!listEntryKeys.isEmpty()) {
608             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
609                     listEntryKeys);
610         }
611
612         assertEquals("Last log index", nListEntries,
613                 shard.underlyingActor().getShardMBean().getLastLogIndex());
614         assertEquals("Commit index", nListEntries,
615                 shard.underlyingActor().getShardMBean().getCommitIndex());
616         assertEquals("Last applied", nListEntries,
617                 shard.underlyingActor().getShardMBean().getLastApplied());
618
619         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
620     }
621
622     private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
623         MutableCompositeModification compMod = new MutableCompositeModification();
624         for(Modification mod: mods) {
625             compMod.addModification(mod);
626         }
627
628         return new CompositeModificationPayload(compMod.toSerializable());
629     }
630
631     private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
632         MutableCompositeModification compMod = new MutableCompositeModification();
633         for(Modification mod: mods) {
634             compMod.addModification(mod);
635         }
636
637         return new CompositeModificationByteStringPayload(compMod.toSerializable());
638     }
639
640     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
641         MutableCompositeModification compMod = new MutableCompositeModification();
642         for(Modification mod: mods) {
643             compMod.addModification(mod);
644         }
645
646         return new ModificationPayload(compMod);
647     }
648
649     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
650             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
651             final MutableCompositeModification modification) {
652         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
653     }
654
655     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
656             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
657             final MutableCompositeModification modification,
658             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
659
660         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
661         tx.write(path, data);
662         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
663         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
664
665         doAnswer(new Answer<ListenableFuture<Boolean>>() {
666             @Override
667             public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
668                 return realCohort.canCommit();
669             }
670         }).when(cohort).canCommit();
671
672         doAnswer(new Answer<ListenableFuture<Void>>() {
673             @Override
674             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
675                 if(preCommit != null) {
676                     return preCommit.apply(realCohort);
677                 } else {
678                     return realCohort.preCommit();
679                 }
680             }
681         }).when(cohort).preCommit();
682
683         doAnswer(new Answer<ListenableFuture<Void>>() {
684             @Override
685             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
686                 return realCohort.commit();
687             }
688         }).when(cohort).commit();
689
690         doAnswer(new Answer<ListenableFuture<Void>>() {
691             @Override
692             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
693                 return realCohort.abort();
694             }
695         }).when(cohort).abort();
696
697         modification.addModification(new WriteModification(path, data));
698
699         return cohort;
700     }
701
702     @SuppressWarnings({ "unchecked" })
703     @Test
704     public void testConcurrentThreePhaseCommits() throws Throwable {
705         new ShardTestKit(getSystem()) {{
706             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
707                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
708                     "testConcurrentThreePhaseCommits");
709
710             waitUntilLeader(shard);
711
712             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
713
714             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
715
716             String transactionID1 = "tx1";
717             MutableCompositeModification modification1 = new MutableCompositeModification();
718             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
719                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
720
721             String transactionID2 = "tx2";
722             MutableCompositeModification modification2 = new MutableCompositeModification();
723             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
724                     TestModel.OUTER_LIST_PATH,
725                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
726                     modification2);
727
728             String transactionID3 = "tx3";
729             MutableCompositeModification modification3 = new MutableCompositeModification();
730             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
731                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
732                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
733                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
734                     modification3);
735
736             long timeoutSec = 5;
737             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
738             final Timeout timeout = new Timeout(duration);
739
740             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
741             // by the ShardTransaction.
742
743             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
744                     cohort1, modification1, true), getRef());
745             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
746                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
747             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
748
749             // Send the CanCommitTransaction message for the first Tx.
750
751             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
752             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
753                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
754             assertEquals("Can commit", true, canCommitReply.getCanCommit());
755
756             // Send the ForwardedReadyTransaction for the next 2 Tx's.
757
758             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
759                     cohort2, modification2, true), getRef());
760             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
761
762             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
763                     cohort3, modification3, true), getRef());
764             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
765
766             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
767             // processed after the first Tx completes.
768
769             Future<Object> canCommitFuture1 = Patterns.ask(shard,
770                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
771
772             Future<Object> canCommitFuture2 = Patterns.ask(shard,
773                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
774
775             // Send the CommitTransaction message for the first Tx. After it completes, it should
776             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
777
778             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
779             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
780
781             // Wait for the next 2 Tx's to complete.
782
783             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
784             final CountDownLatch commitLatch = new CountDownLatch(2);
785
786             class OnFutureComplete extends OnComplete<Object> {
787                 private final Class<?> expRespType;
788
789                 OnFutureComplete(final Class<?> expRespType) {
790                     this.expRespType = expRespType;
791                 }
792
793                 @Override
794                 public void onComplete(final Throwable error, final Object resp) {
795                     if(error != null) {
796                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
797                     } else {
798                         try {
799                             assertEquals("Commit response type", expRespType, resp.getClass());
800                             onSuccess(resp);
801                         } catch (Exception e) {
802                             caughtEx.set(e);
803                         }
804                     }
805                 }
806
807                 void onSuccess(final Object resp) throws Exception {
808                 }
809             }
810
811             class OnCommitFutureComplete extends OnFutureComplete {
812                 OnCommitFutureComplete() {
813                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
814                 }
815
816                 @Override
817                 public void onComplete(final Throwable error, final Object resp) {
818                     super.onComplete(error, resp);
819                     commitLatch.countDown();
820                 }
821             }
822
823             class OnCanCommitFutureComplete extends OnFutureComplete {
824                 private final String transactionID;
825
826                 OnCanCommitFutureComplete(final String transactionID) {
827                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
828                     this.transactionID = transactionID;
829                 }
830
831                 @Override
832                 void onSuccess(final Object resp) throws Exception {
833                     CanCommitTransactionReply canCommitReply =
834                             CanCommitTransactionReply.fromSerializable(resp);
835                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
836
837                     Future<Object> commitFuture = Patterns.ask(shard,
838                             new CommitTransaction(transactionID).toSerializable(), timeout);
839                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
840                 }
841             }
842
843             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
844                     getSystem().dispatcher());
845
846             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
847                     getSystem().dispatcher());
848
849             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
850
851             if(caughtEx.get() != null) {
852                 throw caughtEx.get();
853             }
854
855             assertEquals("Commits complete", true, done);
856
857             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
858             inOrder.verify(cohort1).canCommit();
859             inOrder.verify(cohort1).preCommit();
860             inOrder.verify(cohort1).commit();
861             inOrder.verify(cohort2).canCommit();
862             inOrder.verify(cohort2).preCommit();
863             inOrder.verify(cohort2).commit();
864             inOrder.verify(cohort3).canCommit();
865             inOrder.verify(cohort3).preCommit();
866             inOrder.verify(cohort3).commit();
867
868             // Verify data in the data store.
869
870             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
871             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
872             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
873                     outerList.getValue() instanceof Iterable);
874             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
875             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
876                        entry instanceof MapEntryNode);
877             MapEntryNode mapEntry = (MapEntryNode)entry;
878             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
879                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
880             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
881             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
882
883             verifyLastLogIndex(shard, 2);
884
885             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
886         }};
887     }
888
889     private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
890         for(int i = 0; i < 20 * 5; i++) {
891             long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
892             if(lastLogIndex == expectedValue) {
893                 break;
894             }
895             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
896         }
897
898         assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
899     }
900
901     @Test
902     public void testCommitWithPersistenceDisabled() throws Throwable {
903         dataStoreContextBuilder.persistent(false);
904         new ShardTestKit(getSystem()) {{
905             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
906                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
907                     "testCommitPhaseFailure");
908
909             waitUntilLeader(shard);
910
911             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
912
913             // Setup a simulated transactions with a mock cohort.
914
915             String transactionID = "tx";
916             MutableCompositeModification modification = new MutableCompositeModification();
917             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
918             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
919                     TestModel.TEST_PATH, containerNode, modification);
920
921             FiniteDuration duration = duration("5 seconds");
922
923             // Simulate the ForwardedReadyTransaction messages that would be sent
924             // by the ShardTransaction.
925
926             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
927                     cohort, modification, true), getRef());
928             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
929
930             // Send the CanCommitTransaction message.
931
932             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
933             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
934                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
935             assertEquals("Can commit", true, canCommitReply.getCanCommit());
936
937             // Send the CanCommitTransaction message.
938
939             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
940             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
941
942             InOrder inOrder = inOrder(cohort);
943             inOrder.verify(cohort).canCommit();
944             inOrder.verify(cohort).preCommit();
945             inOrder.verify(cohort).commit();
946
947             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
948             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
949
950             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
951         }};
952     }
953
954     @Test
955     public void testCommitPhaseFailure() throws Throwable {
956         new ShardTestKit(getSystem()) {{
957             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
958                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
959                     "testCommitPhaseFailure");
960
961             waitUntilLeader(shard);
962
963             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
964             // commit phase.
965
966             String transactionID1 = "tx1";
967             MutableCompositeModification modification1 = new MutableCompositeModification();
968             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
969             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
970             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
971             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
972
973             String transactionID2 = "tx2";
974             MutableCompositeModification modification2 = new MutableCompositeModification();
975             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
976             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
977
978             FiniteDuration duration = duration("5 seconds");
979             final Timeout timeout = new Timeout(duration);
980
981             // Simulate the ForwardedReadyTransaction messages that would be sent
982             // by the ShardTransaction.
983
984             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
985                     cohort1, modification1, true), getRef());
986             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
987
988             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
989                     cohort2, modification2, true), getRef());
990             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
991
992             // Send the CanCommitTransaction message for the first Tx.
993
994             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
995             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
996                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
997             assertEquals("Can commit", true, canCommitReply.getCanCommit());
998
999             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1000             // processed after the first Tx completes.
1001
1002             Future<Object> canCommitFuture = Patterns.ask(shard,
1003                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1004
1005             // Send the CommitTransaction message for the first Tx. This should send back an error
1006             // and trigger the 2nd Tx to proceed.
1007
1008             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1009             expectMsgClass(duration, akka.actor.Status.Failure.class);
1010
1011             // Wait for the 2nd Tx to complete the canCommit phase.
1012
1013             final CountDownLatch latch = new CountDownLatch(1);
1014             canCommitFuture.onComplete(new OnComplete<Object>() {
1015                 @Override
1016                 public void onComplete(final Throwable t, final Object resp) {
1017                     latch.countDown();
1018                 }
1019             }, getSystem().dispatcher());
1020
1021             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1022
1023             InOrder inOrder = inOrder(cohort1, cohort2);
1024             inOrder.verify(cohort1).canCommit();
1025             inOrder.verify(cohort1).preCommit();
1026             inOrder.verify(cohort1).commit();
1027             inOrder.verify(cohort2).canCommit();
1028
1029             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1030         }};
1031     }
1032
1033     @Test
1034     public void testPreCommitPhaseFailure() throws Throwable {
1035         new ShardTestKit(getSystem()) {{
1036             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1037                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1038                     "testPreCommitPhaseFailure");
1039
1040             waitUntilLeader(shard);
1041
1042             String transactionID = "tx1";
1043             MutableCompositeModification modification = new MutableCompositeModification();
1044             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1045             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1046             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1047
1048             FiniteDuration duration = duration("5 seconds");
1049
1050             // Simulate the ForwardedReadyTransaction messages that would be sent
1051             // by the ShardTransaction.
1052
1053             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1054                     cohort, modification, true), getRef());
1055             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1056
1057             // Send the CanCommitTransaction message.
1058
1059             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1060             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1061                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1062             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1063
1064             // Send the CommitTransaction message. This should send back an error
1065             // for preCommit failure.
1066
1067             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1068             expectMsgClass(duration, akka.actor.Status.Failure.class);
1069
1070             InOrder inOrder = inOrder(cohort);
1071             inOrder.verify(cohort).canCommit();
1072             inOrder.verify(cohort).preCommit();
1073
1074             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1075         }};
1076     }
1077
1078     @Test
1079     public void testCanCommitPhaseFailure() throws Throwable {
1080         new ShardTestKit(getSystem()) {{
1081             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1082                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1083                     "testCanCommitPhaseFailure");
1084
1085             waitUntilLeader(shard);
1086
1087             final FiniteDuration duration = duration("5 seconds");
1088
1089             String transactionID = "tx1";
1090             MutableCompositeModification modification = new MutableCompositeModification();
1091             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1092             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1093
1094             // Simulate the ForwardedReadyTransaction messages that would be sent
1095             // by the ShardTransaction.
1096
1097             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1098                     cohort, modification, true), getRef());
1099             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1100
1101             // Send the CanCommitTransaction message.
1102
1103             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1104             expectMsgClass(duration, akka.actor.Status.Failure.class);
1105
1106             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1107         }};
1108     }
1109
1110     @Test
1111     public void testAbortBeforeFinishCommit() throws Throwable {
1112         new ShardTestKit(getSystem()) {{
1113             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1114                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1115                     "testAbortBeforeFinishCommit");
1116
1117             waitUntilLeader(shard);
1118
1119             final FiniteDuration duration = duration("5 seconds");
1120             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1121
1122             final String transactionID = "tx1";
1123             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1124                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1125                 @Override
1126                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1127                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1128
1129                     // Simulate an AbortTransaction message occurring during replication, after
1130                     // persisting and before finishing the commit to the in-memory store.
1131                     // We have no followers so due to optimizations in the RaftActor, it does not
1132                     // attempt replication and thus we can't send an AbortTransaction message b/c
1133                     // it would be processed too late after CommitTransaction completes. So we'll
1134                     // simulate an AbortTransaction message occurring during replication by calling
1135                     // the shard directly.
1136                     //
1137                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1138
1139                     return preCommitFuture;
1140                 }
1141             };
1142
1143             MutableCompositeModification modification = new MutableCompositeModification();
1144             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1145                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1146                     modification, preCommit);
1147
1148             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1149                     cohort, modification, true), getRef());
1150             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1151
1152             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1153             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1154                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1155             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1156
1157             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1158             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1159
1160             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1161
1162             // Since we're simulating an abort occurring during replication and before finish commit,
1163             // the data should still get written to the in-memory store since we've gotten past
1164             // canCommit and preCommit and persisted the data.
1165             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1166
1167             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1168         }};
1169     }
1170
1171     @Test
1172     public void testTransactionCommitTimeout() throws Throwable {
1173         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1174
1175         new ShardTestKit(getSystem()) {{
1176             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1177                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1178                     "testTransactionCommitTimeout");
1179
1180             waitUntilLeader(shard);
1181
1182             final FiniteDuration duration = duration("5 seconds");
1183
1184             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1185
1186             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1187             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1188                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1189
1190             // Create 1st Tx - will timeout
1191
1192             String transactionID1 = "tx1";
1193             MutableCompositeModification modification1 = new MutableCompositeModification();
1194             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1195                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1196                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1197                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1198                     modification1);
1199
1200             // Create 2nd Tx
1201
1202             String transactionID2 = "tx3";
1203             MutableCompositeModification modification2 = new MutableCompositeModification();
1204             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1205                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1206             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1207                     listNodePath,
1208                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1209                     modification2);
1210
1211             // Ready the Tx's
1212
1213             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1214                     cohort1, modification1, true), getRef());
1215             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1216
1217             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1218                     cohort2, modification2, true), getRef());
1219             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1220
1221             // canCommit 1st Tx. We don't send the commit so it should timeout.
1222
1223             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1224             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1225
1226             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1227
1228             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1229             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1230
1231             // Commit the 2nd Tx.
1232
1233             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1234             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1235
1236             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1237             assertNotNull(listNodePath + " not found", node);
1238
1239             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1240         }};
1241     }
1242
1243     @Test
1244     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1245         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1246
1247         new ShardTestKit(getSystem()) {{
1248             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1249                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1250                     "testTransactionCommitQueueCapacityExceeded");
1251
1252             waitUntilLeader(shard);
1253
1254             final FiniteDuration duration = duration("5 seconds");
1255
1256             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1257
1258             String transactionID1 = "tx1";
1259             MutableCompositeModification modification1 = new MutableCompositeModification();
1260             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1261                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1262
1263             String transactionID2 = "tx2";
1264             MutableCompositeModification modification2 = new MutableCompositeModification();
1265             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1266                     TestModel.OUTER_LIST_PATH,
1267                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1268                     modification2);
1269
1270             String transactionID3 = "tx3";
1271             MutableCompositeModification modification3 = new MutableCompositeModification();
1272             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1273                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1274
1275             // Ready the Tx's
1276
1277             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1278                     cohort1, modification1, true), getRef());
1279             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1280
1281             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1282                     cohort2, modification2, true), getRef());
1283             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1284
1285             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1286                     cohort3, modification3, true), getRef());
1287             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1288
1289             // canCommit 1st Tx.
1290
1291             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1292             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1293
1294             // canCommit the 2nd Tx - it should get queued.
1295
1296             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1297
1298             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1299
1300             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1301             expectMsgClass(duration, akka.actor.Status.Failure.class);
1302
1303             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1304         }};
1305     }
1306
1307     @Test
1308     public void testCanCommitBeforeReadyFailure() throws Throwable {
1309         new ShardTestKit(getSystem()) {{
1310             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1311                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1312                     "testCanCommitBeforeReadyFailure");
1313
1314             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1315             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1316
1317             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1318         }};
1319     }
1320
1321     @Test
1322     public void testAbortTransaction() throws Throwable {
1323         new ShardTestKit(getSystem()) {{
1324             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1325                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1326                     "testAbortTransaction");
1327
1328             waitUntilLeader(shard);
1329
1330             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1331
1332             String transactionID1 = "tx1";
1333             MutableCompositeModification modification1 = new MutableCompositeModification();
1334             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1335             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1336             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1337
1338             String transactionID2 = "tx2";
1339             MutableCompositeModification modification2 = new MutableCompositeModification();
1340             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1341             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1342
1343             FiniteDuration duration = duration("5 seconds");
1344             final Timeout timeout = new Timeout(duration);
1345
1346             // Simulate the ForwardedReadyTransaction messages that would be sent
1347             // by the ShardTransaction.
1348
1349             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1350                     cohort1, modification1, true), getRef());
1351             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1352
1353             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1354                     cohort2, modification2, true), getRef());
1355             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1356
1357             // Send the CanCommitTransaction message for the first Tx.
1358
1359             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1360             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1361                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1362             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1363
1364             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1365             // processed after the first Tx completes.
1366
1367             Future<Object> canCommitFuture = Patterns.ask(shard,
1368                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1369
1370             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1371             // Tx to proceed.
1372
1373             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1374             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1375
1376             // Wait for the 2nd Tx to complete the canCommit phase.
1377
1378             Await.ready(canCommitFuture, duration);
1379
1380             InOrder inOrder = inOrder(cohort1, cohort2);
1381             inOrder.verify(cohort1).canCommit();
1382             inOrder.verify(cohort2).canCommit();
1383
1384             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1385         }};
1386     }
1387
1388     @Test
1389     public void testCreateSnapshot() throws Exception {
1390         testCreateSnapshot(true, "testCreateSnapshot");
1391     }
1392
1393     @Test
1394     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1395         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1396     }
1397
1398     @SuppressWarnings("serial")
1399     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1400
1401         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1402         class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1403             DataPersistenceProvider delegate;
1404
1405             DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1406                 this.delegate = delegate;
1407             }
1408
1409             @Override
1410             public boolean isRecoveryApplicable() {
1411                 return delegate.isRecoveryApplicable();
1412             }
1413
1414             @Override
1415             public <T> void persist(T o, Procedure<T> procedure) {
1416                 delegate.persist(o, procedure);
1417             }
1418
1419             @Override
1420             public void saveSnapshot(Object o) {
1421                 savedSnapshot.set(o);
1422                 delegate.saveSnapshot(o);
1423             }
1424
1425             @Override
1426             public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1427                 delegate.deleteSnapshots(criteria);
1428             }
1429
1430             @Override
1431             public void deleteMessages(long sequenceNumber) {
1432                 delegate.deleteMessages(sequenceNumber);
1433             }
1434         }
1435
1436         dataStoreContextBuilder.persistent(persistent);
1437
1438         new ShardTestKit(getSystem()) {{
1439             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1440             Creator<Shard> creator = new Creator<Shard>() {
1441                 @Override
1442                 public Shard create() throws Exception {
1443                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1444                             newDatastoreContext(), SCHEMA_CONTEXT) {
1445
1446                         DelegatingPersistentDataProvider delegating;
1447
1448                         @Override
1449                         protected DataPersistenceProvider persistence() {
1450                             if(delegating == null) {
1451                                 delegating = new DelegatingPersistentDataProvider(super.persistence());
1452                             }
1453
1454                             return delegating;
1455                         }
1456
1457                         @Override
1458                         protected void commitSnapshot(final long sequenceNumber) {
1459                             super.commitSnapshot(sequenceNumber);
1460                             latch.get().countDown();
1461                         }
1462                     };
1463                 }
1464             };
1465
1466             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1467                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1468
1469             waitUntilLeader(shard);
1470
1471             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1472
1473             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1474
1475             CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
1476             shard.tell(capture, getRef());
1477
1478             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1479
1480             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1481                     savedSnapshot.get() instanceof Snapshot);
1482
1483             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1484
1485             latch.set(new CountDownLatch(1));
1486             savedSnapshot.set(null);
1487
1488             shard.tell(capture, getRef());
1489
1490             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1491
1492             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1493                     savedSnapshot.get() instanceof Snapshot);
1494
1495             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1496
1497             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1498         }
1499
1500         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1501
1502             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1503             assertEquals("Root node", expectedRoot, actual);
1504
1505         }};
1506     }
1507
1508     /**
1509      * This test simply verifies that the applySnapShot logic will work
1510      * @throws ReadFailedException
1511      */
1512     @Test
1513     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1514         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1515
1516         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1517
1518         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1519         putTransaction.write(TestModel.TEST_PATH,
1520             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1521         commitTransaction(putTransaction);
1522
1523
1524         NormalizedNode<?, ?> expected = readStore(store);
1525
1526         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1527
1528         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1529         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1530
1531         commitTransaction(writeTransaction);
1532
1533         NormalizedNode<?, ?> actual = readStore(store);
1534
1535         assertEquals(expected, actual);
1536     }
1537
1538     @Test
1539     public void testRecoveryApplicable(){
1540
1541         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1542                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1543
1544         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1545                 persistentContext, SCHEMA_CONTEXT);
1546
1547         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1548                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1549
1550         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1551                 nonPersistentContext, SCHEMA_CONTEXT);
1552
1553         new ShardTestKit(getSystem()) {{
1554             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1555                     persistentProps, "testPersistence1");
1556
1557             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1558
1559             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1560
1561             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1562                     nonPersistentProps, "testPersistence2");
1563
1564             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1565
1566             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1567
1568         }};
1569
1570     }
1571
1572     @Test
1573     public void testOnDatastoreContext() {
1574         new ShardTestKit(getSystem()) {{
1575             dataStoreContextBuilder.persistent(true);
1576
1577             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1578
1579             assertEquals("isRecoveryApplicable", true,
1580                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1581
1582             waitUntilLeader(shard);
1583
1584             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1585
1586             assertEquals("isRecoveryApplicable", false,
1587                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1588
1589             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1590
1591             assertEquals("isRecoveryApplicable", true,
1592                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1593
1594             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1595         }};
1596     }
1597
1598     @Test
1599     public void testRegisterRoleChangeListener() throws Exception {
1600         new ShardTestKit(getSystem()) {
1601             {
1602                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1603                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1604                         "testRegisterRoleChangeListener");
1605
1606                 waitUntilLeader(shard);
1607
1608                 TestActorRef<MessageCollectorActor> listener =
1609                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1610
1611                 shard.tell(new RegisterRoleChangeListener(), listener);
1612
1613                 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1614                 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1615                 // sleep.
1616                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1617
1618                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1619
1620                 assertEquals(1, allMatching.size());
1621             }};
1622     }
1623
1624
1625     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1626         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1627         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1628             transaction.read(YangInstanceIdentifier.builder().build());
1629
1630         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1631
1632         NormalizedNode<?, ?> normalizedNode = optional.get();
1633
1634         transaction.close();
1635
1636         return normalizedNode;
1637     }
1638
1639     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1640         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1641         ListenableFuture<Void> future =
1642             commitCohort.preCommit();
1643         try {
1644             future.get();
1645             future = commitCohort.commit();
1646             future.get();
1647         } catch (InterruptedException | ExecutionException e) {
1648         }
1649     }
1650
1651     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1652         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1653             @Override
1654             public void onDataChanged(
1655                 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1656
1657             }
1658         };
1659     }
1660
1661     static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1662             throws ExecutionException, InterruptedException {
1663         return readStore(shard.underlyingActor().getDataStore(), id);
1664     }
1665
1666     public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
1667             throws ExecutionException, InterruptedException {
1668         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1669
1670         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1671             transaction.read(id);
1672
1673         Optional<NormalizedNode<?, ?>> optional = future.get();
1674         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1675
1676         transaction.close();
1677
1678         return node;
1679     }
1680
1681     static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
1682             final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1683         writeToStore(shard.underlyingActor().getDataStore(), id, node);
1684     }
1685
1686     public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
1687             final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1688         DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
1689
1690         transaction.write(id, node);
1691
1692         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1693         commitCohort.preCommit().get();
1694         commitCohort.commit().get();
1695     }
1696
1697     @SuppressWarnings("serial")
1698     private static final class DelegatingShardCreator implements Creator<Shard> {
1699         private final Creator<Shard> delegate;
1700
1701         DelegatingShardCreator(final Creator<Shard> delegate) {
1702             this.delegate = delegate;
1703         }
1704
1705         @Override
1706         public Shard create() throws Exception {
1707             return delegate.create();
1708         }
1709     }
1710 }