Merge "Add netconf-ssh as dependency to features-mdsal"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.japi.Procedure;
21 import akka.pattern.Patterns;
22 import akka.persistence.SnapshotSelectionCriteria;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.concurrent.atomic.AtomicReference;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.mockito.invocation.InvocationOnMock;
48 import org.mockito.stubbing.Answer;
49 import org.opendaylight.controller.cluster.DataPersistenceProvider;
50 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
51 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
52 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
61 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
65 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
66 import org.opendaylight.controller.cluster.datastore.modification.Modification;
67 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
68 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
69 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
70 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
71 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
72 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
73 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
74 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
75 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
76 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
77 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
78 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
79 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
80 import org.opendaylight.controller.cluster.raft.Snapshot;
81 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
82 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
84 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
85 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
86 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
87 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
88 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
89 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
90 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
91 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
92 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
93 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
94 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
95 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
96 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
97 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
98 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
99 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
100 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
101 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
102 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
103 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
104 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
105 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
106 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
107 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
108 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
110 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
111 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
112 import scala.concurrent.Await;
113 import scala.concurrent.Future;
114 import scala.concurrent.duration.FiniteDuration;
115
116 public class ShardTest extends AbstractActorTest {
117
118     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
119
120     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
121
122     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
123             .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
124
125     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
126             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
127             shardHeartbeatIntervalInMillis(100);
128
129     @Before
130     public void setUp() {
131         Builder newBuilder = DatastoreContext.newBuilder();
132         InMemorySnapshotStore.clear();
133         InMemoryJournal.clear();
134     }
135
136     @After
137     public void tearDown() {
138         InMemorySnapshotStore.clear();
139         InMemoryJournal.clear();
140     }
141
142     private DatastoreContext newDatastoreContext() {
143         return dataStoreContextBuilder.build();
144     }
145
146     private Props newShardProps() {
147         return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
148                 newDatastoreContext(), SCHEMA_CONTEXT);
149     }
150
151     @Test
152     public void testRegisterChangeListener() throws Exception {
153         new ShardTestKit(getSystem()) {{
154             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
155                     newShardProps(),  "testRegisterChangeListener");
156
157             waitUntilLeader(shard);
158
159             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
160
161             MockDataChangeListener listener = new MockDataChangeListener(1);
162             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
163                     "testRegisterChangeListener-DataChangeListener");
164
165             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
166                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
167
168             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
169                     RegisterChangeListenerReply.class);
170             String replyPath = reply.getListenerRegistrationPath().toString();
171             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
172                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
173
174             YangInstanceIdentifier path = TestModel.TEST_PATH;
175             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
176
177             listener.waitForChangeEvents(path);
178
179             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
180             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
181         }};
182     }
183
184     @SuppressWarnings("serial")
185     @Test
186     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
187         // This test tests the timing window in which a change listener is registered before the
188         // shard becomes the leader. We verify that the listener is registered and notified of the
189         // existing data when the shard becomes the leader.
190         new ShardTestKit(getSystem()) {{
191             // For this test, we want to send the RegisterChangeListener message after the shard
192             // has recovered from persistence and before it becomes the leader. So we subclass
193             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
194             // we know that the shard has been initialized to a follower and has started the
195             // election process. The following 2 CountDownLatches are used to coordinate the
196             // ElectionTimeout with the sending of the RegisterChangeListener message.
197             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
198             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
199             Creator<Shard> creator = new Creator<Shard>() {
200                 boolean firstElectionTimeout = true;
201
202                 @Override
203                 public Shard create() throws Exception {
204                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
205                             newDatastoreContext(), SCHEMA_CONTEXT) {
206                         @Override
207                         public void onReceiveCommand(final Object message) throws Exception {
208                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
209                                 // Got the first ElectionTimeout. We don't forward it to the
210                                 // base Shard yet until we've sent the RegisterChangeListener
211                                 // message. So we signal the onFirstElectionTimeout latch to tell
212                                 // the main thread to send the RegisterChangeListener message and
213                                 // start a thread to wait on the onChangeListenerRegistered latch,
214                                 // which the main thread signals after it has sent the message.
215                                 // After the onChangeListenerRegistered is triggered, we send the
216                                 // original ElectionTimeout message to proceed with the election.
217                                 firstElectionTimeout = false;
218                                 final ActorRef self = getSelf();
219                                 new Thread() {
220                                     @Override
221                                     public void run() {
222                                         Uninterruptibles.awaitUninterruptibly(
223                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
224                                         self.tell(message, self);
225                                     }
226                                 }.start();
227
228                                 onFirstElectionTimeout.countDown();
229                             } else {
230                                 super.onReceiveCommand(message);
231                             }
232                         }
233                     };
234                 }
235             };
236
237             MockDataChangeListener listener = new MockDataChangeListener(1);
238             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
239                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
240
241             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
242                     Props.create(new DelegatingShardCreator(creator)),
243                     "testRegisterChangeListenerWhenNotLeaderInitially");
244
245             // Write initial data into the in-memory store.
246             YangInstanceIdentifier path = TestModel.TEST_PATH;
247             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
248
249             // Wait until the shard receives the first ElectionTimeout message.
250             assertEquals("Got first ElectionTimeout", true,
251                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
252
253             // Now send the RegisterChangeListener and wait for the reply.
254             shard.tell(new RegisterChangeListener(path, dclActor.path(),
255                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
256
257             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
258                     RegisterChangeListenerReply.class);
259             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
260
261             // Sanity check - verify the shard is not the leader yet.
262             shard.tell(new FindLeader(), getRef());
263             FindLeaderReply findLeadeReply =
264                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
265             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
266
267             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
268             // with the election process.
269             onChangeListenerRegistered.countDown();
270
271             // Wait for the shard to become the leader and notify our listener with the existing
272             // data in the store.
273             listener.waitForChangeEvents(path);
274
275             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
276             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
277         }};
278     }
279
280     @Test
281     public void testCreateTransaction(){
282         new ShardTestKit(getSystem()) {{
283             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
284
285             waitUntilLeader(shard);
286
287             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
288
289             shard.tell(new CreateTransaction("txn-1",
290                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
291
292             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
293                     CreateTransactionReply.class);
294
295             String path = reply.getTransactionActorPath().toString();
296             assertTrue("Unexpected transaction path " + path,
297                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
298
299             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
300         }};
301     }
302
303     @Test
304     public void testCreateTransactionOnChain(){
305         new ShardTestKit(getSystem()) {{
306             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
307
308             waitUntilLeader(shard);
309
310             shard.tell(new CreateTransaction("txn-1",
311                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
312                     getRef());
313
314             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
315                     CreateTransactionReply.class);
316
317             String path = reply.getTransactionActorPath().toString();
318             assertTrue("Unexpected transaction path " + path,
319                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
320
321             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
322         }};
323     }
324
325     @SuppressWarnings("serial")
326     @Test
327     public void testPeerAddressResolved() throws Exception {
328         new ShardTestKit(getSystem()) {{
329             final CountDownLatch recoveryComplete = new CountDownLatch(1);
330             class TestShard extends Shard {
331                 TestShard() {
332                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
333                             newDatastoreContext(), SCHEMA_CONTEXT);
334                 }
335
336                 Map<String, String> getPeerAddresses() {
337                     return getRaftActorContext().getPeerAddresses();
338                 }
339
340                 @Override
341                 protected void onRecoveryComplete() {
342                     try {
343                         super.onRecoveryComplete();
344                     } finally {
345                         recoveryComplete.countDown();
346                     }
347                 }
348             }
349
350             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
351                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
352                         @Override
353                         public TestShard create() throws Exception {
354                             return new TestShard();
355                         }
356                     })), "testPeerAddressResolved");
357
358             //waitUntilLeader(shard);
359             assertEquals("Recovery complete", true,
360                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
361
362             String address = "akka://foobar";
363             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
364
365             assertEquals("getPeerAddresses", address,
366                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
367
368             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
369         }};
370     }
371
372     @Test
373     public void testApplySnapshot() throws Exception {
374         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
375                 "testApplySnapshot");
376
377         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
378         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
379
380         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
381
382         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
383         NormalizedNode<?,?> expected = readStore(store, root);
384
385         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
386                 SerializationUtils.serializeNormalizedNode(expected),
387                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
388
389         shard.underlyingActor().onReceiveCommand(applySnapshot);
390
391         NormalizedNode<?,?> actual = readStore(shard, root);
392
393         assertEquals("Root node", expected, actual);
394
395         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
396     }
397
398     @Test
399     public void testApplyHelium2VersionSnapshot() throws Exception {
400         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
401                 "testApplySnapshot");
402
403         NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
404
405         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
406         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
407
408         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
409
410         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
411         NormalizedNode<?,?> expected = readStore(store, root);
412
413         NormalizedNodeMessages.Container encode = codec.encode(expected);
414
415         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
416                 encode.getNormalizedNode().toByteString().toByteArray(),
417                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
418
419         shard.underlyingActor().onReceiveCommand(applySnapshot);
420
421         NormalizedNode<?,?> actual = readStore(shard, root);
422
423         assertEquals("Root node", expected, actual);
424
425         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
426     }
427
428     @Test
429     public void testApplyState() throws Exception {
430
431         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
432
433         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
434
435         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
436                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
437
438         shard.underlyingActor().onReceiveCommand(applyState);
439
440         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
441         assertEquals("Applied state", node, actual);
442
443         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
444     }
445
446     @Test
447     public void testApplyStateLegacy() throws Exception {
448
449         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
450
451         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
452
453         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
454                 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
455
456         shard.underlyingActor().onReceiveCommand(applyState);
457
458         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
459         assertEquals("Applied state", node, actual);
460
461         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
462     }
463
464     @Test
465     public void testRecovery() throws Exception {
466
467         // Set up the InMemorySnapshotStore.
468
469         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
470         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
471
472         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
473
474         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
475
476         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
477                 SerializationUtils.serializeNormalizedNode(root),
478                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
479
480         // Set up the InMemoryJournal.
481
482         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
483                   new WriteModification(TestModel.OUTER_LIST_PATH,
484                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
485
486         int nListEntries = 16;
487         Set<Integer> listEntryKeys = new HashSet<>();
488
489         // Add some ModificationPayload entries
490         for(int i = 1; i <= nListEntries; i++) {
491             listEntryKeys.add(Integer.valueOf(i));
492             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
493                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
494             Modification mod = new MergeModification(path,
495                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
496             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
497                     newModificationPayload(mod)));
498         }
499
500         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
501                 new ApplyJournalEntries(nListEntries));
502
503         testRecovery(listEntryKeys);
504     }
505
506     @Test
507     public void testHelium2VersionRecovery() throws Exception {
508
509         // Set up the InMemorySnapshotStore.
510
511         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
512         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
513
514         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
515
516         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
517
518         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
519                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
520                                 getNormalizedNode().toByteString().toByteArray(),
521                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
522
523         // Set up the InMemoryJournal.
524
525         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
526                   new WriteModification(TestModel.OUTER_LIST_PATH,
527                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
528
529         int nListEntries = 16;
530         Set<Integer> listEntryKeys = new HashSet<>();
531         int i = 1;
532
533         // Add some CompositeModificationPayload entries
534         for(; i <= 8; i++) {
535             listEntryKeys.add(Integer.valueOf(i));
536             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
537                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
538             Modification mod = new MergeModification(path,
539                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
540             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
541                     newLegacyPayload(mod)));
542         }
543
544         // Add some CompositeModificationByteStringPayload entries
545         for(; i <= nListEntries; i++) {
546             listEntryKeys.add(Integer.valueOf(i));
547             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
548                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
549             Modification mod = new MergeModification(path,
550                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
551             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
552                     newLegacyByteStringPayload(mod)));
553         }
554
555         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
556
557         testRecovery(listEntryKeys);
558     }
559
560     private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
561         // Create the actor and wait for recovery complete.
562
563         int nListEntries = listEntryKeys.size();
564
565         final CountDownLatch recoveryComplete = new CountDownLatch(1);
566
567         @SuppressWarnings("serial")
568         Creator<Shard> creator = new Creator<Shard>() {
569             @Override
570             public Shard create() throws Exception {
571                 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
572                         newDatastoreContext(), SCHEMA_CONTEXT) {
573                     @Override
574                     protected void onRecoveryComplete() {
575                         try {
576                             super.onRecoveryComplete();
577                         } finally {
578                             recoveryComplete.countDown();
579                         }
580                     }
581                 };
582             }
583         };
584
585         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
586                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
587
588         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
589
590         // Verify data in the data store.
591
592         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
593         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
594         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
595                 outerList.getValue() instanceof Iterable);
596         for(Object entry: (Iterable<?>) outerList.getValue()) {
597             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
598                     entry instanceof MapEntryNode);
599             MapEntryNode mapEntry = (MapEntryNode)entry;
600             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
601                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
602             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
603             Object value = idLeaf.get().getValue();
604             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
605                     listEntryKeys.remove(value));
606         }
607
608         if(!listEntryKeys.isEmpty()) {
609             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
610                     listEntryKeys);
611         }
612
613         assertEquals("Last log index", nListEntries,
614                 shard.underlyingActor().getShardMBean().getLastLogIndex());
615         assertEquals("Commit index", nListEntries,
616                 shard.underlyingActor().getShardMBean().getCommitIndex());
617         assertEquals("Last applied", nListEntries,
618                 shard.underlyingActor().getShardMBean().getLastApplied());
619
620         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
621     }
622
623     private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
624         MutableCompositeModification compMod = new MutableCompositeModification();
625         for(Modification mod: mods) {
626             compMod.addModification(mod);
627         }
628
629         return new CompositeModificationPayload(compMod.toSerializable());
630     }
631
632     private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
633         MutableCompositeModification compMod = new MutableCompositeModification();
634         for(Modification mod: mods) {
635             compMod.addModification(mod);
636         }
637
638         return new CompositeModificationByteStringPayload(compMod.toSerializable());
639     }
640
641     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
642         MutableCompositeModification compMod = new MutableCompositeModification();
643         for(Modification mod: mods) {
644             compMod.addModification(mod);
645         }
646
647         return new ModificationPayload(compMod);
648     }
649
650     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
651             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
652             final MutableCompositeModification modification) {
653         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
654     }
655
656     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
657             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
658             final MutableCompositeModification modification,
659             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
660
661         DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
662         tx.write(path, data);
663         final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
664         DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
665
666         doAnswer(new Answer<ListenableFuture<Boolean>>() {
667             @Override
668             public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
669                 return realCohort.canCommit();
670             }
671         }).when(cohort).canCommit();
672
673         doAnswer(new Answer<ListenableFuture<Void>>() {
674             @Override
675             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
676                 if(preCommit != null) {
677                     return preCommit.apply(realCohort);
678                 } else {
679                     return realCohort.preCommit();
680                 }
681             }
682         }).when(cohort).preCommit();
683
684         doAnswer(new Answer<ListenableFuture<Void>>() {
685             @Override
686             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
687                 return realCohort.commit();
688             }
689         }).when(cohort).commit();
690
691         doAnswer(new Answer<ListenableFuture<Void>>() {
692             @Override
693             public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
694                 return realCohort.abort();
695             }
696         }).when(cohort).abort();
697
698         modification.addModification(new WriteModification(path, data));
699
700         return cohort;
701     }
702
703     @SuppressWarnings({ "unchecked" })
704     @Test
705     public void testConcurrentThreePhaseCommits() throws Throwable {
706         new ShardTestKit(getSystem()) {{
707             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
708                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
709                     "testConcurrentThreePhaseCommits");
710
711             waitUntilLeader(shard);
712
713             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
714
715             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
716
717             String transactionID1 = "tx1";
718             MutableCompositeModification modification1 = new MutableCompositeModification();
719             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
720                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
721
722             String transactionID2 = "tx2";
723             MutableCompositeModification modification2 = new MutableCompositeModification();
724             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
725                     TestModel.OUTER_LIST_PATH,
726                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
727                     modification2);
728
729             String transactionID3 = "tx3";
730             MutableCompositeModification modification3 = new MutableCompositeModification();
731             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
732                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
733                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
734                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
735                     modification3);
736
737             long timeoutSec = 5;
738             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
739             final Timeout timeout = new Timeout(duration);
740
741             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
742             // by the ShardTransaction.
743
744             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
745                     cohort1, modification1, true), getRef());
746             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
747                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
748             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
749
750             // Send the CanCommitTransaction message for the first Tx.
751
752             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
753             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
754                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
755             assertEquals("Can commit", true, canCommitReply.getCanCommit());
756
757             // Send the ForwardedReadyTransaction for the next 2 Tx's.
758
759             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
760                     cohort2, modification2, true), getRef());
761             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
762
763             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
764                     cohort3, modification3, true), getRef());
765             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
766
767             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
768             // processed after the first Tx completes.
769
770             Future<Object> canCommitFuture1 = Patterns.ask(shard,
771                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
772
773             Future<Object> canCommitFuture2 = Patterns.ask(shard,
774                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
775
776             // Send the CommitTransaction message for the first Tx. After it completes, it should
777             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
778
779             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
780             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
781
782             // Wait for the next 2 Tx's to complete.
783
784             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
785             final CountDownLatch commitLatch = new CountDownLatch(2);
786
787             class OnFutureComplete extends OnComplete<Object> {
788                 private final Class<?> expRespType;
789
790                 OnFutureComplete(final Class<?> expRespType) {
791                     this.expRespType = expRespType;
792                 }
793
794                 @Override
795                 public void onComplete(final Throwable error, final Object resp) {
796                     if(error != null) {
797                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
798                     } else {
799                         try {
800                             assertEquals("Commit response type", expRespType, resp.getClass());
801                             onSuccess(resp);
802                         } catch (Exception e) {
803                             caughtEx.set(e);
804                         }
805                     }
806                 }
807
808                 void onSuccess(final Object resp) throws Exception {
809                 }
810             }
811
812             class OnCommitFutureComplete extends OnFutureComplete {
813                 OnCommitFutureComplete() {
814                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
815                 }
816
817                 @Override
818                 public void onComplete(final Throwable error, final Object resp) {
819                     super.onComplete(error, resp);
820                     commitLatch.countDown();
821                 }
822             }
823
824             class OnCanCommitFutureComplete extends OnFutureComplete {
825                 private final String transactionID;
826
827                 OnCanCommitFutureComplete(final String transactionID) {
828                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
829                     this.transactionID = transactionID;
830                 }
831
832                 @Override
833                 void onSuccess(final Object resp) throws Exception {
834                     CanCommitTransactionReply canCommitReply =
835                             CanCommitTransactionReply.fromSerializable(resp);
836                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
837
838                     Future<Object> commitFuture = Patterns.ask(shard,
839                             new CommitTransaction(transactionID).toSerializable(), timeout);
840                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
841                 }
842             }
843
844             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
845                     getSystem().dispatcher());
846
847             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
848                     getSystem().dispatcher());
849
850             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
851
852             if(caughtEx.get() != null) {
853                 throw caughtEx.get();
854             }
855
856             assertEquals("Commits complete", true, done);
857
858             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
859             inOrder.verify(cohort1).canCommit();
860             inOrder.verify(cohort1).preCommit();
861             inOrder.verify(cohort1).commit();
862             inOrder.verify(cohort2).canCommit();
863             inOrder.verify(cohort2).preCommit();
864             inOrder.verify(cohort2).commit();
865             inOrder.verify(cohort3).canCommit();
866             inOrder.verify(cohort3).preCommit();
867             inOrder.verify(cohort3).commit();
868
869             // Verify data in the data store.
870
871             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
872             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
873             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
874                     outerList.getValue() instanceof Iterable);
875             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
876             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
877                        entry instanceof MapEntryNode);
878             MapEntryNode mapEntry = (MapEntryNode)entry;
879             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
880                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
881             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
882             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
883
884             verifyLastLogIndex(shard, 2);
885
886             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
887         }};
888     }
889
890     private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
891         for(int i = 0; i < 20 * 5; i++) {
892             long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
893             if(lastLogIndex == expectedValue) {
894                 break;
895             }
896             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
897         }
898
899         assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
900     }
901
902     @Test
903     public void testCommitWithPersistenceDisabled() throws Throwable {
904         dataStoreContextBuilder.persistent(false);
905         new ShardTestKit(getSystem()) {{
906             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
907                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
908                     "testCommitPhaseFailure");
909
910             waitUntilLeader(shard);
911
912             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
913
914             // Setup a simulated transactions with a mock cohort.
915
916             String transactionID = "tx";
917             MutableCompositeModification modification = new MutableCompositeModification();
918             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
919             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
920                     TestModel.TEST_PATH, containerNode, modification);
921
922             FiniteDuration duration = duration("5 seconds");
923
924             // Simulate the ForwardedReadyTransaction messages that would be sent
925             // by the ShardTransaction.
926
927             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
928                     cohort, modification, true), getRef());
929             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
930
931             // Send the CanCommitTransaction message.
932
933             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
934             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
935                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
936             assertEquals("Can commit", true, canCommitReply.getCanCommit());
937
938             // Send the CanCommitTransaction message.
939
940             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
941             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
942
943             InOrder inOrder = inOrder(cohort);
944             inOrder.verify(cohort).canCommit();
945             inOrder.verify(cohort).preCommit();
946             inOrder.verify(cohort).commit();
947
948             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
949             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
950
951             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
952         }};
953     }
954
955     @Test
956     public void testCommitPhaseFailure() throws Throwable {
957         new ShardTestKit(getSystem()) {{
958             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
959                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
960                     "testCommitPhaseFailure");
961
962             waitUntilLeader(shard);
963
964             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
965             // commit phase.
966
967             String transactionID1 = "tx1";
968             MutableCompositeModification modification1 = new MutableCompositeModification();
969             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
970             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
971             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
972             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
973
974             String transactionID2 = "tx2";
975             MutableCompositeModification modification2 = new MutableCompositeModification();
976             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
977             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
978
979             FiniteDuration duration = duration("5 seconds");
980             final Timeout timeout = new Timeout(duration);
981
982             // Simulate the ForwardedReadyTransaction messages that would be sent
983             // by the ShardTransaction.
984
985             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
986                     cohort1, modification1, true), getRef());
987             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
988
989             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
990                     cohort2, modification2, true), getRef());
991             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
992
993             // Send the CanCommitTransaction message for the first Tx.
994
995             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
996             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
997                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
998             assertEquals("Can commit", true, canCommitReply.getCanCommit());
999
1000             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1001             // processed after the first Tx completes.
1002
1003             Future<Object> canCommitFuture = Patterns.ask(shard,
1004                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1005
1006             // Send the CommitTransaction message for the first Tx. This should send back an error
1007             // and trigger the 2nd Tx to proceed.
1008
1009             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1010             expectMsgClass(duration, akka.actor.Status.Failure.class);
1011
1012             // Wait for the 2nd Tx to complete the canCommit phase.
1013
1014             final CountDownLatch latch = new CountDownLatch(1);
1015             canCommitFuture.onComplete(new OnComplete<Object>() {
1016                 @Override
1017                 public void onComplete(final Throwable t, final Object resp) {
1018                     latch.countDown();
1019                 }
1020             }, getSystem().dispatcher());
1021
1022             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1023
1024             InOrder inOrder = inOrder(cohort1, cohort2);
1025             inOrder.verify(cohort1).canCommit();
1026             inOrder.verify(cohort1).preCommit();
1027             inOrder.verify(cohort1).commit();
1028             inOrder.verify(cohort2).canCommit();
1029
1030             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1031         }};
1032     }
1033
1034     @Test
1035     public void testPreCommitPhaseFailure() throws Throwable {
1036         new ShardTestKit(getSystem()) {{
1037             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1038                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1039                     "testPreCommitPhaseFailure");
1040
1041             waitUntilLeader(shard);
1042
1043             String transactionID = "tx1";
1044             MutableCompositeModification modification = new MutableCompositeModification();
1045             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1046             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1047             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1048
1049             FiniteDuration duration = duration("5 seconds");
1050
1051             // Simulate the ForwardedReadyTransaction messages that would be sent
1052             // by the ShardTransaction.
1053
1054             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1055                     cohort, modification, true), getRef());
1056             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1057
1058             // Send the CanCommitTransaction message.
1059
1060             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1061             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1062                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1063             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1064
1065             // Send the CommitTransaction message. This should send back an error
1066             // for preCommit failure.
1067
1068             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1069             expectMsgClass(duration, akka.actor.Status.Failure.class);
1070
1071             InOrder inOrder = inOrder(cohort);
1072             inOrder.verify(cohort).canCommit();
1073             inOrder.verify(cohort).preCommit();
1074
1075             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1076         }};
1077     }
1078
1079     @Test
1080     public void testCanCommitPhaseFailure() throws Throwable {
1081         new ShardTestKit(getSystem()) {{
1082             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1083                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1084                     "testCanCommitPhaseFailure");
1085
1086             waitUntilLeader(shard);
1087
1088             final FiniteDuration duration = duration("5 seconds");
1089
1090             String transactionID = "tx1";
1091             MutableCompositeModification modification = new MutableCompositeModification();
1092             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1093             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1094
1095             // Simulate the ForwardedReadyTransaction messages that would be sent
1096             // by the ShardTransaction.
1097
1098             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1099                     cohort, modification, true), getRef());
1100             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1101
1102             // Send the CanCommitTransaction message.
1103
1104             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1105             expectMsgClass(duration, akka.actor.Status.Failure.class);
1106
1107             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1108         }};
1109     }
1110
1111     @Test
1112     public void testAbortBeforeFinishCommit() throws Throwable {
1113         new ShardTestKit(getSystem()) {{
1114             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1115                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1116                     "testAbortBeforeFinishCommit");
1117
1118             waitUntilLeader(shard);
1119
1120             final FiniteDuration duration = duration("5 seconds");
1121             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1122
1123             final String transactionID = "tx1";
1124             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1125                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1126                 @Override
1127                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1128                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1129
1130                     // Simulate an AbortTransaction message occurring during replication, after
1131                     // persisting and before finishing the commit to the in-memory store.
1132                     // We have no followers so due to optimizations in the RaftActor, it does not
1133                     // attempt replication and thus we can't send an AbortTransaction message b/c
1134                     // it would be processed too late after CommitTransaction completes. So we'll
1135                     // simulate an AbortTransaction message occurring during replication by calling
1136                     // the shard directly.
1137                     //
1138                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1139
1140                     return preCommitFuture;
1141                 }
1142             };
1143
1144             MutableCompositeModification modification = new MutableCompositeModification();
1145             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1146                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1147                     modification, preCommit);
1148
1149             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1150                     cohort, modification, true), getRef());
1151             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1152
1153             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1154             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1155                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1156             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1157
1158             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1159             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1160
1161             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1162
1163             // Since we're simulating an abort occurring during replication and before finish commit,
1164             // the data should still get written to the in-memory store since we've gotten past
1165             // canCommit and preCommit and persisted the data.
1166             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1167
1168             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1169         }};
1170     }
1171
1172     @Test
1173     public void testTransactionCommitTimeout() throws Throwable {
1174         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1175
1176         new ShardTestKit(getSystem()) {{
1177             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1178                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1179                     "testTransactionCommitTimeout");
1180
1181             waitUntilLeader(shard);
1182
1183             final FiniteDuration duration = duration("5 seconds");
1184
1185             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1186
1187             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1188             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1189                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1190
1191             // Create 1st Tx - will timeout
1192
1193             String transactionID1 = "tx1";
1194             MutableCompositeModification modification1 = new MutableCompositeModification();
1195             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1196                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1197                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1198                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1199                     modification1);
1200
1201             // Create 2nd Tx
1202
1203             String transactionID2 = "tx3";
1204             MutableCompositeModification modification2 = new MutableCompositeModification();
1205             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1206                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1207             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1208                     listNodePath,
1209                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1210                     modification2);
1211
1212             // Ready the Tx's
1213
1214             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1215                     cohort1, modification1, true), getRef());
1216             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1217
1218             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1219                     cohort2, modification2, true), getRef());
1220             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1221
1222             // canCommit 1st Tx. We don't send the commit so it should timeout.
1223
1224             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1225             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1226
1227             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1228
1229             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1230             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1231
1232             // Commit the 2nd Tx.
1233
1234             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1235             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1236
1237             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1238             assertNotNull(listNodePath + " not found", node);
1239
1240             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1241         }};
1242     }
1243
1244     @Test
1245     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1246         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1247
1248         new ShardTestKit(getSystem()) {{
1249             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1250                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1251                     "testTransactionCommitQueueCapacityExceeded");
1252
1253             waitUntilLeader(shard);
1254
1255             final FiniteDuration duration = duration("5 seconds");
1256
1257             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1258
1259             String transactionID1 = "tx1";
1260             MutableCompositeModification modification1 = new MutableCompositeModification();
1261             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1262                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1263
1264             String transactionID2 = "tx2";
1265             MutableCompositeModification modification2 = new MutableCompositeModification();
1266             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1267                     TestModel.OUTER_LIST_PATH,
1268                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1269                     modification2);
1270
1271             String transactionID3 = "tx3";
1272             MutableCompositeModification modification3 = new MutableCompositeModification();
1273             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1274                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1275
1276             // Ready the Tx's
1277
1278             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1279                     cohort1, modification1, true), getRef());
1280             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1281
1282             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1283                     cohort2, modification2, true), getRef());
1284             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1285
1286             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1287                     cohort3, modification3, true), getRef());
1288             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1289
1290             // canCommit 1st Tx.
1291
1292             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1293             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1294
1295             // canCommit the 2nd Tx - it should get queued.
1296
1297             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1298
1299             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1300
1301             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1302             expectMsgClass(duration, akka.actor.Status.Failure.class);
1303
1304             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1305         }};
1306     }
1307
1308     @Test
1309     public void testCanCommitBeforeReadyFailure() throws Throwable {
1310         new ShardTestKit(getSystem()) {{
1311             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1312                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1313                     "testCanCommitBeforeReadyFailure");
1314
1315             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1316             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1317
1318             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1319         }};
1320     }
1321
1322     @Test
1323     public void testAbortTransaction() throws Throwable {
1324         new ShardTestKit(getSystem()) {{
1325             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1326                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1327                     "testAbortTransaction");
1328
1329             waitUntilLeader(shard);
1330
1331             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1332
1333             String transactionID1 = "tx1";
1334             MutableCompositeModification modification1 = new MutableCompositeModification();
1335             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1336             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1337             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1338
1339             String transactionID2 = "tx2";
1340             MutableCompositeModification modification2 = new MutableCompositeModification();
1341             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1342             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1343
1344             FiniteDuration duration = duration("5 seconds");
1345             final Timeout timeout = new Timeout(duration);
1346
1347             // Simulate the ForwardedReadyTransaction messages that would be sent
1348             // by the ShardTransaction.
1349
1350             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1351                     cohort1, modification1, true), getRef());
1352             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1353
1354             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1355                     cohort2, modification2, true), getRef());
1356             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1357
1358             // Send the CanCommitTransaction message for the first Tx.
1359
1360             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1361             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1362                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1363             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1364
1365             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1366             // processed after the first Tx completes.
1367
1368             Future<Object> canCommitFuture = Patterns.ask(shard,
1369                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1370
1371             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1372             // Tx to proceed.
1373
1374             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1375             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1376
1377             // Wait for the 2nd Tx to complete the canCommit phase.
1378
1379             Await.ready(canCommitFuture, duration);
1380
1381             InOrder inOrder = inOrder(cohort1, cohort2);
1382             inOrder.verify(cohort1).canCommit();
1383             inOrder.verify(cohort2).canCommit();
1384
1385             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1386         }};
1387     }
1388
1389     @Test
1390     public void testCreateSnapshot() throws Exception {
1391         testCreateSnapshot(true, "testCreateSnapshot");
1392     }
1393
1394     @Test
1395     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1396         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1397     }
1398
1399     @SuppressWarnings("serial")
1400     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1401
1402         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1403         class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1404             DataPersistenceProvider delegate;
1405
1406             DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1407                 this.delegate = delegate;
1408             }
1409
1410             @Override
1411             public boolean isRecoveryApplicable() {
1412                 return delegate.isRecoveryApplicable();
1413             }
1414
1415             @Override
1416             public <T> void persist(T o, Procedure<T> procedure) {
1417                 delegate.persist(o, procedure);
1418             }
1419
1420             @Override
1421             public void saveSnapshot(Object o) {
1422                 savedSnapshot.set(o);
1423                 delegate.saveSnapshot(o);
1424             }
1425
1426             @Override
1427             public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1428                 delegate.deleteSnapshots(criteria);
1429             }
1430
1431             @Override
1432             public void deleteMessages(long sequenceNumber) {
1433                 delegate.deleteMessages(sequenceNumber);
1434             }
1435         }
1436
1437         dataStoreContextBuilder.persistent(persistent);
1438
1439         new ShardTestKit(getSystem()) {{
1440             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1441             Creator<Shard> creator = new Creator<Shard>() {
1442                 @Override
1443                 public Shard create() throws Exception {
1444                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1445                             newDatastoreContext(), SCHEMA_CONTEXT) {
1446
1447                         DelegatingPersistentDataProvider delegating;
1448
1449                         @Override
1450                         protected DataPersistenceProvider persistence() {
1451                             if(delegating == null) {
1452                                 delegating = new DelegatingPersistentDataProvider(super.persistence());
1453                             }
1454
1455                             return delegating;
1456                         }
1457
1458                         @Override
1459                         protected void commitSnapshot(final long sequenceNumber) {
1460                             super.commitSnapshot(sequenceNumber);
1461                             latch.get().countDown();
1462                         }
1463                     };
1464                 }
1465             };
1466
1467             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1468                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1469
1470             waitUntilLeader(shard);
1471
1472             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1473
1474             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1475
1476             CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
1477             shard.tell(capture, getRef());
1478
1479             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1480
1481             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1482                     savedSnapshot.get() instanceof Snapshot);
1483
1484             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1485
1486             latch.set(new CountDownLatch(1));
1487             savedSnapshot.set(null);
1488
1489             shard.tell(capture, getRef());
1490
1491             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1492
1493             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1494                     savedSnapshot.get() instanceof Snapshot);
1495
1496             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1497
1498             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1499         }
1500
1501         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1502
1503             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1504             assertEquals("Root node", expectedRoot, actual);
1505
1506         }};
1507     }
1508
1509     /**
1510      * This test simply verifies that the applySnapShot logic will work
1511      * @throws ReadFailedException
1512      */
1513     @Test
1514     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1515         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1516
1517         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1518
1519         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1520         putTransaction.write(TestModel.TEST_PATH,
1521             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1522         commitTransaction(putTransaction);
1523
1524
1525         NormalizedNode<?, ?> expected = readStore(store);
1526
1527         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1528
1529         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1530         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1531
1532         commitTransaction(writeTransaction);
1533
1534         NormalizedNode<?, ?> actual = readStore(store);
1535
1536         assertEquals(expected, actual);
1537     }
1538
1539     @Test
1540     public void testRecoveryApplicable(){
1541
1542         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1543                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1544
1545         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1546                 persistentContext, SCHEMA_CONTEXT);
1547
1548         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1549                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1550
1551         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1552                 nonPersistentContext, SCHEMA_CONTEXT);
1553
1554         new ShardTestKit(getSystem()) {{
1555             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1556                     persistentProps, "testPersistence1");
1557
1558             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1559
1560             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1561
1562             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1563                     nonPersistentProps, "testPersistence2");
1564
1565             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1566
1567             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1568
1569         }};
1570
1571     }
1572
1573     @Test
1574     public void testOnDatastoreContext() {
1575         new ShardTestKit(getSystem()) {{
1576             dataStoreContextBuilder.persistent(true);
1577
1578             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1579
1580             assertEquals("isRecoveryApplicable", true,
1581                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1582
1583             waitUntilLeader(shard);
1584
1585             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1586
1587             assertEquals("isRecoveryApplicable", false,
1588                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1589
1590             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1591
1592             assertEquals("isRecoveryApplicable", true,
1593                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1594
1595             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1596         }};
1597     }
1598
1599     @Test
1600     public void testRegisterRoleChangeListener() throws Exception {
1601         new ShardTestKit(getSystem()) {
1602             {
1603                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1604                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1605                         "testRegisterRoleChangeListener");
1606
1607                 waitUntilLeader(shard);
1608
1609                 TestActorRef<MessageCollectorActor> listener =
1610                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1611
1612                 shard.tell(new RegisterRoleChangeListener(), listener);
1613
1614                 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1615                 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1616                 // sleep.
1617                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1618
1619                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1620
1621                 assertEquals(1, allMatching.size());
1622             }
1623         };
1624     }
1625
1626     @Test
1627     public void testFollowerInitialSyncStatus() throws Exception {
1628         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1629                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1630                 "testFollowerInitialSyncStatus");
1631
1632         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1633
1634         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1635
1636         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1637
1638         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1639
1640         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1641     }
1642
1643
1644     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1645         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1646         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1647             transaction.read(YangInstanceIdentifier.builder().build());
1648
1649         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1650
1651         NormalizedNode<?, ?> normalizedNode = optional.get();
1652
1653         transaction.close();
1654
1655         return normalizedNode;
1656     }
1657
1658     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1659         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1660         ListenableFuture<Void> future =
1661             commitCohort.preCommit();
1662         try {
1663             future.get();
1664             future = commitCohort.commit();
1665             future.get();
1666         } catch (InterruptedException | ExecutionException e) {
1667         }
1668     }
1669
1670     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1671         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1672             @Override
1673             public void onDataChanged(
1674                 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1675
1676             }
1677         };
1678     }
1679
1680     static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1681             throws ExecutionException, InterruptedException {
1682         return readStore(shard.underlyingActor().getDataStore(), id);
1683     }
1684
1685     public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
1686             throws ExecutionException, InterruptedException {
1687         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1688
1689         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1690             transaction.read(id);
1691
1692         Optional<NormalizedNode<?, ?>> optional = future.get();
1693         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1694
1695         transaction.close();
1696
1697         return node;
1698     }
1699
1700     static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
1701             final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1702         writeToStore(shard.underlyingActor().getDataStore(), id, node);
1703     }
1704
1705     public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
1706             final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1707         DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
1708
1709         transaction.write(id, node);
1710
1711         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1712         commitCohort.preCommit().get();
1713         commitCohort.commit().get();
1714     }
1715
1716     @SuppressWarnings("serial")
1717     private static final class DelegatingShardCreator implements Creator<Shard> {
1718         private final Creator<Shard> delegate;
1719
1720         DelegatingShardCreator(final Creator<Shard> delegate) {
1721             this.delegate = delegate;
1722         }
1723
1724         @Override
1725         public Shard create() throws Exception {
1726             return delegate.create();
1727         }
1728     }
1729 }