Merge "BUG 2954 : Thread local DocumentBuilder's for XmlUtil"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.dispatch.Dispatchers;
19 import akka.dispatch.OnComplete;
20 import akka.japi.Creator;
21 import akka.pattern.Patterns;
22 import akka.persistence.SaveSnapshotSuccess;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.Futures;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.InOrder;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
43 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
44 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
45 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
46 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
48 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
55 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
62 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
63 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
64 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
65 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
66 import org.opendaylight.controller.cluster.datastore.modification.Modification;
67 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
68 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
69 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
70 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
71 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
72 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
73 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
74 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
75 import org.opendaylight.controller.cluster.raft.RaftActorContext;
76 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
77 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
78 import org.opendaylight.controller.cluster.raft.Snapshot;
79 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
80 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
81 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
82 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
83 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
84 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
85 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
86 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
87 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
88 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
89 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
90 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
91 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
92 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
93 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
94 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
95 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
96 import org.opendaylight.yangtools.yang.common.QName;
97 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
98 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
99 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
100 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
101 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
102 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
103 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
104 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
105 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
111 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
112 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
113 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
114 import scala.concurrent.Await;
115 import scala.concurrent.Future;
116 import scala.concurrent.duration.FiniteDuration;
117
118 public class ShardTest extends AbstractShardTest {
119     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
120
121     @Test
122     public void testRegisterChangeListener() throws Exception {
123         new ShardTestKit(getSystem()) {{
124             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
125                     newShardProps(),  "testRegisterChangeListener");
126
127             waitUntilLeader(shard);
128
129             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
130
131             MockDataChangeListener listener = new MockDataChangeListener(1);
132             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
133                     "testRegisterChangeListener-DataChangeListener");
134
135             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
136                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
137
138             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
139                     RegisterChangeListenerReply.class);
140             String replyPath = reply.getListenerRegistrationPath().toString();
141             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
142                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
143
144             YangInstanceIdentifier path = TestModel.TEST_PATH;
145             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
146
147             listener.waitForChangeEvents(path);
148
149             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
150             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
151         }};
152     }
153
154     @SuppressWarnings("serial")
155     @Test
156     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
157         // This test tests the timing window in which a change listener is registered before the
158         // shard becomes the leader. We verify that the listener is registered and notified of the
159         // existing data when the shard becomes the leader.
160         new ShardTestKit(getSystem()) {{
161             // For this test, we want to send the RegisterChangeListener message after the shard
162             // has recovered from persistence and before it becomes the leader. So we subclass
163             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
164             // we know that the shard has been initialized to a follower and has started the
165             // election process. The following 2 CountDownLatches are used to coordinate the
166             // ElectionTimeout with the sending of the RegisterChangeListener message.
167             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
168             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
169             Creator<Shard> creator = new Creator<Shard>() {
170                 boolean firstElectionTimeout = true;
171
172                 @Override
173                 public Shard create() throws Exception {
174                     // Use a non persistent provider because this test actually invokes persist on the journal
175                     // this will cause all other messages to not be queued properly after that.
176                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
177                     // it does do a persist)
178                     return new Shard(shardID, Collections.<String,String>emptyMap(),
179                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
180                         @Override
181                         public void onReceiveCommand(final Object message) throws Exception {
182                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
183                                 // Got the first ElectionTimeout. We don't forward it to the
184                                 // base Shard yet until we've sent the RegisterChangeListener
185                                 // message. So we signal the onFirstElectionTimeout latch to tell
186                                 // the main thread to send the RegisterChangeListener message and
187                                 // start a thread to wait on the onChangeListenerRegistered latch,
188                                 // which the main thread signals after it has sent the message.
189                                 // After the onChangeListenerRegistered is triggered, we send the
190                                 // original ElectionTimeout message to proceed with the election.
191                                 firstElectionTimeout = false;
192                                 final ActorRef self = getSelf();
193                                 new Thread() {
194                                     @Override
195                                     public void run() {
196                                         Uninterruptibles.awaitUninterruptibly(
197                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
198                                         self.tell(message, self);
199                                     }
200                                 }.start();
201
202                                 onFirstElectionTimeout.countDown();
203                             } else {
204                                 super.onReceiveCommand(message);
205                             }
206                         }
207                     };
208                 }
209             };
210
211             MockDataChangeListener listener = new MockDataChangeListener(1);
212             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
213                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
214
215             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
216                     Props.create(new DelegatingShardCreator(creator)),
217                     "testRegisterChangeListenerWhenNotLeaderInitially");
218
219             // Write initial data into the in-memory store.
220             YangInstanceIdentifier path = TestModel.TEST_PATH;
221             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
222
223             // Wait until the shard receives the first ElectionTimeout message.
224             assertEquals("Got first ElectionTimeout", true,
225                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
226
227             // Now send the RegisterChangeListener and wait for the reply.
228             shard.tell(new RegisterChangeListener(path, dclActor,
229                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
230
231             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
232                     RegisterChangeListenerReply.class);
233             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
234
235             // Sanity check - verify the shard is not the leader yet.
236             shard.tell(new FindLeader(), getRef());
237             FindLeaderReply findLeadeReply =
238                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
239             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
240
241             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
242             // with the election process.
243             onChangeListenerRegistered.countDown();
244
245             // Wait for the shard to become the leader and notify our listener with the existing
246             // data in the store.
247             listener.waitForChangeEvents(path);
248
249             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
250             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
251         }};
252     }
253
254     @Test
255     public void testRegisterDataTreeChangeListener() throws Exception {
256         new ShardTestKit(getSystem()) {{
257             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
258                     newShardProps(), "testRegisterDataTreeChangeListener");
259
260             waitUntilLeader(shard);
261
262             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
263
264             MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
265             ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
266                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
267
268             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
269
270             RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
271                     RegisterDataTreeChangeListenerReply.class);
272             String replyPath = reply.getListenerRegistrationPath().toString();
273             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
274                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
275
276             YangInstanceIdentifier path = TestModel.TEST_PATH;
277             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
278
279             listener.waitForChangeEvents();
280
281             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
282             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
283         }};
284     }
285
286     @SuppressWarnings("serial")
287     @Test
288     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
289         new ShardTestKit(getSystem()) {{
290             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
291             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
292             Creator<Shard> creator = new Creator<Shard>() {
293                 boolean firstElectionTimeout = true;
294
295                 @Override
296                 public Shard create() throws Exception {
297                     return new Shard(shardID, Collections.<String,String>emptyMap(),
298                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
299                         @Override
300                         public void onReceiveCommand(final Object message) throws Exception {
301                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
302                                 firstElectionTimeout = false;
303                                 final ActorRef self = getSelf();
304                                 new Thread() {
305                                     @Override
306                                     public void run() {
307                                         Uninterruptibles.awaitUninterruptibly(
308                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
309                                         self.tell(message, self);
310                                     }
311                                 }.start();
312
313                                 onFirstElectionTimeout.countDown();
314                             } else {
315                                 super.onReceiveCommand(message);
316                             }
317                         }
318                     };
319                 }
320             };
321
322             MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
323             ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
324                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
325
326             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
327                     Props.create(new DelegatingShardCreator(creator)),
328                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
329
330             YangInstanceIdentifier path = TestModel.TEST_PATH;
331             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
332
333             assertEquals("Got first ElectionTimeout", true,
334                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
335
336             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
337             RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
338                     RegisterDataTreeChangeListenerReply.class);
339             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
340
341             shard.tell(new FindLeader(), getRef());
342             FindLeaderReply findLeadeReply =
343                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
344             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
345
346             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
347
348             onChangeListenerRegistered.countDown();
349
350             // TODO: investigate why we do not receive data chage events
351             listener.waitForChangeEvents();
352
353             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
354             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
355         }};
356     }
357
358     @Test
359     public void testCreateTransaction(){
360         new ShardTestKit(getSystem()) {{
361             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
362
363             waitUntilLeader(shard);
364
365             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
366
367             shard.tell(new CreateTransaction("txn-1",
368                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
369
370             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
371                     CreateTransactionReply.class);
372
373             String path = reply.getTransactionActorPath().toString();
374             assertTrue("Unexpected transaction path " + path,
375                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
376
377             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
378         }};
379     }
380
381     @Test
382     public void testCreateTransactionOnChain(){
383         new ShardTestKit(getSystem()) {{
384             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
385
386             waitUntilLeader(shard);
387
388             shard.tell(new CreateTransaction("txn-1",
389                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
390                     getRef());
391
392             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
393                     CreateTransactionReply.class);
394
395             String path = reply.getTransactionActorPath().toString();
396             assertTrue("Unexpected transaction path " + path,
397                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
398
399             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
400         }};
401     }
402
403     @SuppressWarnings("serial")
404     @Test
405     public void testPeerAddressResolved() throws Exception {
406         new ShardTestKit(getSystem()) {{
407             final CountDownLatch recoveryComplete = new CountDownLatch(1);
408             class TestShard extends Shard {
409                 TestShard() {
410                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
411                             newDatastoreContext(), SCHEMA_CONTEXT);
412                 }
413
414                 Map<String, String> getPeerAddresses() {
415                     return getRaftActorContext().getPeerAddresses();
416                 }
417
418                 @Override
419                 protected void onRecoveryComplete() {
420                     try {
421                         super.onRecoveryComplete();
422                     } finally {
423                         recoveryComplete.countDown();
424                     }
425                 }
426             }
427
428             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
429                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
430                         @Override
431                         public TestShard create() throws Exception {
432                             return new TestShard();
433                         }
434                     })), "testPeerAddressResolved");
435
436             //waitUntilLeader(shard);
437             assertEquals("Recovery complete", true,
438                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
439
440             String address = "akka://foobar";
441             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
442
443             assertEquals("getPeerAddresses", address,
444                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
445
446             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
447         }};
448     }
449
450     @Test
451     public void testApplySnapshot() throws Exception {
452         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
453                 "testApplySnapshot");
454
455         DataTree store = InMemoryDataTreeFactory.getInstance().create();
456         store.setSchemaContext(SCHEMA_CONTEXT);
457
458         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
459
460         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
461         NormalizedNode<?,?> expected = readStore(store, root);
462
463         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
464                 SerializationUtils.serializeNormalizedNode(expected),
465                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
466
467         shard.underlyingActor().onReceiveCommand(applySnapshot);
468
469         NormalizedNode<?,?> actual = readStore(shard, root);
470
471         assertEquals("Root node", expected, actual);
472
473         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
474     }
475
476     @Test
477     public void testApplyState() throws Exception {
478
479         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
480
481         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
482
483         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
484                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
485
486         shard.underlyingActor().onReceiveCommand(applyState);
487
488         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
489         assertEquals("Applied state", node, actual);
490
491         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
492     }
493
494     @Test
495     public void testApplyStateWithCandidatePayload() throws Exception {
496
497         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
498
499         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
500         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
501
502         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
503                 DataTreeCandidatePayload.create(candidate)));
504
505         shard.underlyingActor().onReceiveCommand(applyState);
506
507         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
508         assertEquals("Applied state", node, actual);
509
510         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
511     }
512
513     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
514         DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
515         testStore.setSchemaContext(SCHEMA_CONTEXT);
516
517         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
518
519         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
520
521         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
522                 SerializationUtils.serializeNormalizedNode(root),
523                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
524         return testStore;
525     }
526
527     private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
528         source.validate(mod);
529         final DataTreeCandidate candidate = source.prepare(mod);
530         source.commit(candidate);
531         return DataTreeCandidatePayload.create(candidate);
532     }
533
534     @Test
535     public void testDataTreeCandidateRecovery() throws Exception {
536         // Set up the InMemorySnapshotStore.
537         final DataTree source = setupInMemorySnapshotStore();
538
539         final DataTreeModification writeMod = source.takeSnapshot().newModification();
540         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
541
542         // Set up the InMemoryJournal.
543         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
544
545         int nListEntries = 16;
546         Set<Integer> listEntryKeys = new HashSet<>();
547
548         // Add some ModificationPayload entries
549         for (int i = 1; i <= nListEntries; i++) {
550             listEntryKeys.add(Integer.valueOf(i));
551
552             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
553                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
554
555             final DataTreeModification mod = source.takeSnapshot().newModification();
556             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
557
558             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
559                 payloadForModification(source, mod)));
560         }
561
562         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
563                 new ApplyJournalEntries(nListEntries));
564
565         testRecovery(listEntryKeys);
566     }
567
568     @Test
569     public void testModicationRecovery() throws Exception {
570
571         // Set up the InMemorySnapshotStore.
572         setupInMemorySnapshotStore();
573
574         // Set up the InMemoryJournal.
575
576         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
577                   new WriteModification(TestModel.OUTER_LIST_PATH,
578                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
579
580         int nListEntries = 16;
581         Set<Integer> listEntryKeys = new HashSet<>();
582
583         // Add some ModificationPayload entries
584         for(int i = 1; i <= nListEntries; i++) {
585             listEntryKeys.add(Integer.valueOf(i));
586             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
587                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
588             Modification mod = new MergeModification(path,
589                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
590             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
591                     newModificationPayload(mod)));
592         }
593
594         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
595                 new ApplyJournalEntries(nListEntries));
596
597         testRecovery(listEntryKeys);
598     }
599
600     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
601         MutableCompositeModification compMod = new MutableCompositeModification();
602         for(Modification mod: mods) {
603             compMod.addModification(mod);
604         }
605
606         return new ModificationPayload(compMod);
607     }
608
609     @Test
610     public void testConcurrentThreePhaseCommits() throws Throwable {
611         new ShardTestKit(getSystem()) {{
612             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
613                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
614                     "testConcurrentThreePhaseCommits");
615
616             waitUntilLeader(shard);
617
618          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
619
620             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
621
622             String transactionID1 = "tx1";
623             MutableCompositeModification modification1 = new MutableCompositeModification();
624             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
625                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
626
627             String transactionID2 = "tx2";
628             MutableCompositeModification modification2 = new MutableCompositeModification();
629             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
630                     TestModel.OUTER_LIST_PATH,
631                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
632                     modification2);
633
634             String transactionID3 = "tx3";
635             MutableCompositeModification modification3 = new MutableCompositeModification();
636             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
637                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
638                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
639                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
640                     modification3);
641
642             long timeoutSec = 5;
643             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
644             final Timeout timeout = new Timeout(duration);
645
646             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
647             // by the ShardTransaction.
648
649             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
650                     cohort1, modification1, true, false), getRef());
651             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
652                     expectMsgClass(duration, ReadyTransactionReply.class));
653             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
654
655             // Send the CanCommitTransaction message for the first Tx.
656
657             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
658             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
659                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
660             assertEquals("Can commit", true, canCommitReply.getCanCommit());
661
662             // Send the ForwardedReadyTransaction for the next 2 Tx's.
663
664             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
665                     cohort2, modification2, true, false), getRef());
666             expectMsgClass(duration, ReadyTransactionReply.class);
667
668             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
669                     cohort3, modification3, true, false), getRef());
670             expectMsgClass(duration, ReadyTransactionReply.class);
671
672             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
673             // processed after the first Tx completes.
674
675             Future<Object> canCommitFuture1 = Patterns.ask(shard,
676                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
677
678             Future<Object> canCommitFuture2 = Patterns.ask(shard,
679                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
680
681             // Send the CommitTransaction message for the first Tx. After it completes, it should
682             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
683
684             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
685             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
686
687             // Wait for the next 2 Tx's to complete.
688
689             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
690             final CountDownLatch commitLatch = new CountDownLatch(2);
691
692             class OnFutureComplete extends OnComplete<Object> {
693                 private final Class<?> expRespType;
694
695                 OnFutureComplete(final Class<?> expRespType) {
696                     this.expRespType = expRespType;
697                 }
698
699                 @Override
700                 public void onComplete(final Throwable error, final Object resp) {
701                     if(error != null) {
702                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
703                     } else {
704                         try {
705                             assertEquals("Commit response type", expRespType, resp.getClass());
706                             onSuccess(resp);
707                         } catch (Exception e) {
708                             caughtEx.set(e);
709                         }
710                     }
711                 }
712
713                 void onSuccess(final Object resp) throws Exception {
714                 }
715             }
716
717             class OnCommitFutureComplete extends OnFutureComplete {
718                 OnCommitFutureComplete() {
719                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
720                 }
721
722                 @Override
723                 public void onComplete(final Throwable error, final Object resp) {
724                     super.onComplete(error, resp);
725                     commitLatch.countDown();
726                 }
727             }
728
729             class OnCanCommitFutureComplete extends OnFutureComplete {
730                 private final String transactionID;
731
732                 OnCanCommitFutureComplete(final String transactionID) {
733                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
734                     this.transactionID = transactionID;
735                 }
736
737                 @Override
738                 void onSuccess(final Object resp) throws Exception {
739                     CanCommitTransactionReply canCommitReply =
740                             CanCommitTransactionReply.fromSerializable(resp);
741                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
742
743                     Future<Object> commitFuture = Patterns.ask(shard,
744                             new CommitTransaction(transactionID).toSerializable(), timeout);
745                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
746                 }
747             }
748
749             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
750                     getSystem().dispatcher());
751
752             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
753                     getSystem().dispatcher());
754
755             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
756
757             if(caughtEx.get() != null) {
758                 throw caughtEx.get();
759             }
760
761             assertEquals("Commits complete", true, done);
762
763             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
764             inOrder.verify(cohort1).canCommit();
765             inOrder.verify(cohort1).preCommit();
766             inOrder.verify(cohort1).commit();
767             inOrder.verify(cohort2).canCommit();
768             inOrder.verify(cohort2).preCommit();
769             inOrder.verify(cohort2).commit();
770             inOrder.verify(cohort3).canCommit();
771             inOrder.verify(cohort3).preCommit();
772             inOrder.verify(cohort3).commit();
773
774             // Verify data in the data store.
775
776             verifyOuterListEntry(shard, 1);
777
778             verifyLastApplied(shard, 2);
779
780             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
781         }};
782     }
783
784     private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
785             NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
786         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
787     }
788
789     private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
790             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
791         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
792         batched.addModification(new WriteModification(path, data));
793         batched.setReady(ready);
794         batched.setDoCommitOnReady(doCommitOnReady);
795         return batched;
796     }
797
798     @Test
799     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
800         new ShardTestKit(getSystem()) {{
801             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
802                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
803                     "testBatchedModificationsWithNoCommitOnReady");
804
805             waitUntilLeader(shard);
806
807             final String transactionID = "tx";
808             FiniteDuration duration = duration("5 seconds");
809
810             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
811             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
812                 @Override
813                 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
814                     if(mockCohort.get() == null) {
815                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
816                     }
817
818                     return mockCohort.get();
819                 }
820             };
821
822             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
823
824             // Send a BatchedModifications to start a transaction.
825
826             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
827                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
828             expectMsgClass(duration, BatchedModificationsReply.class);
829
830             // Send a couple more BatchedModifications.
831
832             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
833                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
834             expectMsgClass(duration, BatchedModificationsReply.class);
835
836             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
837                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
838                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
839             expectMsgClass(duration, ReadyTransactionReply.class);
840
841             // Send the CanCommitTransaction message.
842
843             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
844             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
845                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
846             assertEquals("Can commit", true, canCommitReply.getCanCommit());
847
848             // Send the CanCommitTransaction message.
849
850             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
851             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
852
853             InOrder inOrder = inOrder(mockCohort.get());
854             inOrder.verify(mockCohort.get()).canCommit();
855             inOrder.verify(mockCohort.get()).preCommit();
856             inOrder.verify(mockCohort.get()).commit();
857
858             // Verify data in the data store.
859
860             verifyOuterListEntry(shard, 1);
861
862             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
863         }};
864     }
865
866     @Test
867     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
868         new ShardTestKit(getSystem()) {{
869             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
870                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
871                     "testBatchedModificationsWithCommitOnReady");
872
873             waitUntilLeader(shard);
874
875             final String transactionID = "tx";
876             FiniteDuration duration = duration("5 seconds");
877
878             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
879             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
880                 @Override
881                 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
882                     if(mockCohort.get() == null) {
883                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
884                     }
885
886                     return mockCohort.get();
887                 }
888             };
889
890             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
891
892             // Send a BatchedModifications to start a transaction.
893
894             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
895                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
896             expectMsgClass(duration, BatchedModificationsReply.class);
897
898             // Send a couple more BatchedModifications.
899
900             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
901                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
902             expectMsgClass(duration, BatchedModificationsReply.class);
903
904             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
905                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
906                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
907
908             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
909
910             InOrder inOrder = inOrder(mockCohort.get());
911             inOrder.verify(mockCohort.get()).canCommit();
912             inOrder.verify(mockCohort.get()).preCommit();
913             inOrder.verify(mockCohort.get()).commit();
914
915             // Verify data in the data store.
916
917             verifyOuterListEntry(shard, 1);
918
919             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
920         }};
921     }
922
923     @SuppressWarnings("unchecked")
924     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
925         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
926         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
927         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
928                 outerList.getValue() instanceof Iterable);
929         Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
930         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
931                 entry instanceof MapEntryNode);
932         MapEntryNode mapEntry = (MapEntryNode)entry;
933         Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
934                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
935         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
936         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
937     }
938
939     @Test
940     public void testBatchedModificationsOnTransactionChain() throws Throwable {
941         new ShardTestKit(getSystem()) {{
942             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
943                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
944                     "testBatchedModificationsOnTransactionChain");
945
946             waitUntilLeader(shard);
947
948             String transactionChainID = "txChain";
949             String transactionID1 = "tx1";
950             String transactionID2 = "tx2";
951
952             FiniteDuration duration = duration("5 seconds");
953
954             // Send a BatchedModifications to start a chained write transaction and ready it.
955
956             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
957             YangInstanceIdentifier path = TestModel.TEST_PATH;
958             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
959                     containerNode, true, false), getRef());
960             expectMsgClass(duration, ReadyTransactionReply.class);
961
962             // Create a read Tx on the same chain.
963
964             shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
965                     transactionChainID).toSerializable(), getRef());
966
967             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
968
969             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
970             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
971             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
972
973             // Commit the write transaction.
974
975             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
976             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
977                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
978             assertEquals("Can commit", true, canCommitReply.getCanCommit());
979
980             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
981             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
982
983             // Verify data in the data store.
984
985             NormalizedNode<?, ?> actualNode = readStore(shard, path);
986             assertEquals("Stored node", containerNode, actualNode);
987
988             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
989         }};
990     }
991
992     @Test
993     public void testOnBatchedModificationsWhenNotLeader() {
994         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
995         new ShardTestKit(getSystem()) {{
996             Creator<Shard> creator = new Creator<Shard>() {
997                 private static final long serialVersionUID = 1L;
998
999                 @Override
1000                 public Shard create() throws Exception {
1001                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1002                             newDatastoreContext(), SCHEMA_CONTEXT) {
1003                         @Override
1004                         protected boolean isLeader() {
1005                             return overrideLeaderCalls.get() ? false : super.isLeader();
1006                         }
1007
1008                         @Override
1009                         protected ActorSelection getLeader() {
1010                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1011                                 super.getLeader();
1012                         }
1013                     };
1014                 }
1015             };
1016
1017             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1018                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1019
1020             waitUntilLeader(shard);
1021
1022             overrideLeaderCalls.set(true);
1023
1024             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1025
1026             shard.tell(batched, ActorRef.noSender());
1027
1028             expectMsgEquals(batched);
1029
1030             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1031         }};
1032     }
1033
1034     @Test
1035     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1036         new ShardTestKit(getSystem()) {{
1037             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1038                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1039                     "testForwardedReadyTransactionWithImmediateCommit");
1040
1041             waitUntilLeader(shard);
1042
1043             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1044
1045             String transactionID = "tx1";
1046             MutableCompositeModification modification = new MutableCompositeModification();
1047             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1048             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1049                     TestModel.TEST_PATH, containerNode, modification);
1050
1051             FiniteDuration duration = duration("5 seconds");
1052
1053             // Simulate the ForwardedReadyTransaction messages that would be sent
1054             // by the ShardTransaction.
1055
1056             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1057                     cohort, modification, true, true), getRef());
1058
1059             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1060
1061             InOrder inOrder = inOrder(cohort);
1062             inOrder.verify(cohort).canCommit();
1063             inOrder.verify(cohort).preCommit();
1064             inOrder.verify(cohort).commit();
1065
1066             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1067             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1068
1069             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1070         }};
1071     }
1072
1073     @Test
1074     public void testCommitWithPersistenceDisabled() throws Throwable {
1075         dataStoreContextBuilder.persistent(false);
1076         new ShardTestKit(getSystem()) {{
1077             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1078                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1079                     "testCommitWithPersistenceDisabled");
1080
1081             waitUntilLeader(shard);
1082
1083             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1084
1085             // Setup a simulated transactions with a mock cohort.
1086
1087             String transactionID = "tx";
1088             MutableCompositeModification modification = new MutableCompositeModification();
1089             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1090             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1091                     TestModel.TEST_PATH, containerNode, modification);
1092
1093             FiniteDuration duration = duration("5 seconds");
1094
1095             // Simulate the ForwardedReadyTransaction messages that would be sent
1096             // by the ShardTransaction.
1097
1098             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1099                     cohort, modification, true, false), getRef());
1100             expectMsgClass(duration, ReadyTransactionReply.class);
1101
1102             // Send the CanCommitTransaction message.
1103
1104             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1105             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1106                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1107             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1108
1109             // Send the CanCommitTransaction message.
1110
1111             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1112             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1113
1114             InOrder inOrder = inOrder(cohort);
1115             inOrder.verify(cohort).canCommit();
1116             inOrder.verify(cohort).preCommit();
1117             inOrder.verify(cohort).commit();
1118
1119             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1120             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1121
1122             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1123         }};
1124     }
1125
1126     private static DataTreeCandidateTip mockCandidate(final String name) {
1127         DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1128         DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1129         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1130         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1131         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1132         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1133         return mockCandidate;
1134     }
1135
1136     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1137         DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1138         DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1139         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1140         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1141         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1142         return mockCandidate;
1143     }
1144
1145     @Test
1146     public void testCommitWhenTransactionHasNoModifications(){
1147         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1148         // but here that need not happen
1149         new ShardTestKit(getSystem()) {
1150             {
1151                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1152                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1153                         "testCommitWhenTransactionHasNoModifications");
1154
1155                 waitUntilLeader(shard);
1156
1157                 String transactionID = "tx1";
1158                 MutableCompositeModification modification = new MutableCompositeModification();
1159                 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1160                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1161                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1162                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1163                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1164
1165                 FiniteDuration duration = duration("5 seconds");
1166
1167                 // Simulate the ForwardedReadyTransaction messages that would be sent
1168                 // by the ShardTransaction.
1169
1170                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1171                         cohort, modification, true, false), getRef());
1172                 expectMsgClass(duration, ReadyTransactionReply.class);
1173
1174                 // Send the CanCommitTransaction message.
1175
1176                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1177                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1178                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1179                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1180
1181                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1182                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1183
1184                 InOrder inOrder = inOrder(cohort);
1185                 inOrder.verify(cohort).canCommit();
1186                 inOrder.verify(cohort).preCommit();
1187                 inOrder.verify(cohort).commit();
1188
1189                 // Use MBean for verification
1190                 // Committed transaction count should increase as usual
1191                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
1192
1193                 // Commit index should not advance because this does not go into the journal
1194                 assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
1195
1196                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1197
1198             }
1199         };
1200     }
1201
1202     @Test
1203     public void testCommitWhenTransactionHasModifications(){
1204         new ShardTestKit(getSystem()) {
1205             {
1206                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1207                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1208                         "testCommitWhenTransactionHasModifications");
1209
1210                 waitUntilLeader(shard);
1211
1212                 String transactionID = "tx1";
1213                 MutableCompositeModification modification = new MutableCompositeModification();
1214                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1215                 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1216                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1217                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1218                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1219                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1220
1221                 FiniteDuration duration = duration("5 seconds");
1222
1223                 // Simulate the ForwardedReadyTransaction messages that would be sent
1224                 // by the ShardTransaction.
1225
1226                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1227                         cohort, modification, true, false), getRef());
1228                 expectMsgClass(duration, ReadyTransactionReply.class);
1229
1230                 // Send the CanCommitTransaction message.
1231
1232                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1233                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1234                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1235                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1236
1237                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1238                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1239
1240                 InOrder inOrder = inOrder(cohort);
1241                 inOrder.verify(cohort).canCommit();
1242                 inOrder.verify(cohort).preCommit();
1243                 inOrder.verify(cohort).commit();
1244
1245                 // Use MBean for verification
1246                 // Committed transaction count should increase as usual
1247                 assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
1248
1249                 // Commit index should advance as we do not have an empty modification
1250                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
1251
1252                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1253
1254             }
1255         };
1256     }
1257
1258     @Test
1259     public void testCommitPhaseFailure() throws Throwable {
1260         new ShardTestKit(getSystem()) {{
1261             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1262                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1263                     "testCommitPhaseFailure");
1264
1265             waitUntilLeader(shard);
1266
1267             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1268             // commit phase.
1269
1270             String transactionID1 = "tx1";
1271             MutableCompositeModification modification1 = new MutableCompositeModification();
1272             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1273             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1274             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1275             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1276             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1277
1278             String transactionID2 = "tx2";
1279             MutableCompositeModification modification2 = new MutableCompositeModification();
1280             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1281             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1282
1283             FiniteDuration duration = duration("5 seconds");
1284             final Timeout timeout = new Timeout(duration);
1285
1286             // Simulate the ForwardedReadyTransaction messages that would be sent
1287             // by the ShardTransaction.
1288
1289             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1290                     cohort1, modification1, true, false), getRef());
1291             expectMsgClass(duration, ReadyTransactionReply.class);
1292
1293             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1294                     cohort2, modification2, true, false), getRef());
1295             expectMsgClass(duration, ReadyTransactionReply.class);
1296
1297             // Send the CanCommitTransaction message for the first Tx.
1298
1299             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1300             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1301                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1302             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1303
1304             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1305             // processed after the first Tx completes.
1306
1307             Future<Object> canCommitFuture = Patterns.ask(shard,
1308                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1309
1310             // Send the CommitTransaction message for the first Tx. This should send back an error
1311             // and trigger the 2nd Tx to proceed.
1312
1313             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1314             expectMsgClass(duration, akka.actor.Status.Failure.class);
1315
1316             // Wait for the 2nd Tx to complete the canCommit phase.
1317
1318             final CountDownLatch latch = new CountDownLatch(1);
1319             canCommitFuture.onComplete(new OnComplete<Object>() {
1320                 @Override
1321                 public void onComplete(final Throwable t, final Object resp) {
1322                     latch.countDown();
1323                 }
1324             }, getSystem().dispatcher());
1325
1326             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1327
1328             InOrder inOrder = inOrder(cohort1, cohort2);
1329             inOrder.verify(cohort1).canCommit();
1330             inOrder.verify(cohort1).preCommit();
1331             inOrder.verify(cohort1).commit();
1332             inOrder.verify(cohort2).canCommit();
1333
1334             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1335         }};
1336     }
1337
1338     @Test
1339     public void testPreCommitPhaseFailure() throws Throwable {
1340         new ShardTestKit(getSystem()) {{
1341             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1342                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1343                     "testPreCommitPhaseFailure");
1344
1345             waitUntilLeader(shard);
1346
1347             String transactionID1 = "tx1";
1348             MutableCompositeModification modification1 = new MutableCompositeModification();
1349             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1350             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1351             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1352
1353             String transactionID2 = "tx2";
1354             MutableCompositeModification modification2 = new MutableCompositeModification();
1355             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1356             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1357
1358             FiniteDuration duration = duration("5 seconds");
1359             final Timeout timeout = new Timeout(duration);
1360
1361             // Simulate the ForwardedReadyTransaction messages that would be sent
1362             // by the ShardTransaction.
1363
1364             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1365                     cohort1, modification1, true, false), getRef());
1366             expectMsgClass(duration, ReadyTransactionReply.class);
1367
1368             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1369                     cohort2, modification2, true, false), getRef());
1370             expectMsgClass(duration, ReadyTransactionReply.class);
1371
1372             // Send the CanCommitTransaction message for the first Tx.
1373
1374             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1375             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1376                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1377             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1378
1379             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1380             // processed after the first Tx completes.
1381
1382             Future<Object> canCommitFuture = Patterns.ask(shard,
1383                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1384
1385             // Send the CommitTransaction message for the first Tx. This should send back an error
1386             // and trigger the 2nd Tx to proceed.
1387
1388             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1389             expectMsgClass(duration, akka.actor.Status.Failure.class);
1390
1391             // Wait for the 2nd Tx to complete the canCommit phase.
1392
1393             final CountDownLatch latch = new CountDownLatch(1);
1394             canCommitFuture.onComplete(new OnComplete<Object>() {
1395                 @Override
1396                 public void onComplete(final Throwable t, final Object resp) {
1397                     latch.countDown();
1398                 }
1399             }, getSystem().dispatcher());
1400
1401             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1402
1403             InOrder inOrder = inOrder(cohort1, cohort2);
1404             inOrder.verify(cohort1).canCommit();
1405             inOrder.verify(cohort1).preCommit();
1406             inOrder.verify(cohort2).canCommit();
1407
1408             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1409         }};
1410     }
1411
1412     @Test
1413     public void testCanCommitPhaseFailure() throws Throwable {
1414         new ShardTestKit(getSystem()) {{
1415             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1416                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1417                     "testCanCommitPhaseFailure");
1418
1419             waitUntilLeader(shard);
1420
1421             final FiniteDuration duration = duration("5 seconds");
1422
1423             String transactionID1 = "tx1";
1424             MutableCompositeModification modification = new MutableCompositeModification();
1425             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1426             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1427
1428             // Simulate the ForwardedReadyTransaction messages that would be sent
1429             // by the ShardTransaction.
1430
1431             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1432                     cohort, modification, true, false), getRef());
1433             expectMsgClass(duration, ReadyTransactionReply.class);
1434
1435             // Send the CanCommitTransaction message.
1436
1437             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1438             expectMsgClass(duration, akka.actor.Status.Failure.class);
1439
1440             // Send another can commit to ensure the failed one got cleaned up.
1441
1442             reset(cohort);
1443
1444             String transactionID2 = "tx2";
1445             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1446
1447             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1448                     cohort, modification, true, false), getRef());
1449             expectMsgClass(duration, ReadyTransactionReply.class);
1450
1451             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1452             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1453                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1454             assertEquals("getCanCommit", true, reply.getCanCommit());
1455
1456             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1457         }};
1458     }
1459
1460     @Test
1461     public void testCanCommitPhaseFalseResponse() throws Throwable {
1462         new ShardTestKit(getSystem()) {{
1463             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1464                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1465                     "testCanCommitPhaseFalseResponse");
1466
1467             waitUntilLeader(shard);
1468
1469             final FiniteDuration duration = duration("5 seconds");
1470
1471             String transactionID1 = "tx1";
1472             MutableCompositeModification modification = new MutableCompositeModification();
1473             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1474             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1475
1476             // Simulate the ForwardedReadyTransaction messages that would be sent
1477             // by the ShardTransaction.
1478
1479             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1480                     cohort, modification, true, false), getRef());
1481             expectMsgClass(duration, ReadyTransactionReply.class);
1482
1483             // Send the CanCommitTransaction message.
1484
1485             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1486             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1487                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1488             assertEquals("getCanCommit", false, reply.getCanCommit());
1489
1490             // Send another can commit to ensure the failed one got cleaned up.
1491
1492             reset(cohort);
1493
1494             String transactionID2 = "tx2";
1495             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1496
1497             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1498                     cohort, modification, true, false), getRef());
1499             expectMsgClass(duration, ReadyTransactionReply.class);
1500
1501             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1502             reply = CanCommitTransactionReply.fromSerializable(
1503                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1504             assertEquals("getCanCommit", true, reply.getCanCommit());
1505
1506             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1507         }};
1508     }
1509
1510     @Test
1511     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1512         new ShardTestKit(getSystem()) {{
1513             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1514                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1515                     "testImmediateCommitWithCanCommitPhaseFailure");
1516
1517             waitUntilLeader(shard);
1518
1519             final FiniteDuration duration = duration("5 seconds");
1520
1521             String transactionID1 = "tx1";
1522             MutableCompositeModification modification = new MutableCompositeModification();
1523             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1524             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1525
1526             // Simulate the ForwardedReadyTransaction messages that would be sent
1527             // by the ShardTransaction.
1528
1529             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1530                     cohort, modification, true, true), getRef());
1531
1532             expectMsgClass(duration, akka.actor.Status.Failure.class);
1533
1534             // Send another can commit to ensure the failed one got cleaned up.
1535
1536             reset(cohort);
1537
1538             String transactionID2 = "tx2";
1539             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1540             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1541             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1542             DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1543             DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1544             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1545             doReturn(candidateRoot).when(candidate).getRootNode();
1546             doReturn(candidate).when(cohort).getCandidate();
1547
1548             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1549                     cohort, modification, true, true), getRef());
1550
1551             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1552
1553             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1554         }};
1555     }
1556
1557     @Test
1558     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1559         new ShardTestKit(getSystem()) {{
1560             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1561                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1562                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1563
1564             waitUntilLeader(shard);
1565
1566             final FiniteDuration duration = duration("5 seconds");
1567
1568             String transactionID = "tx1";
1569             MutableCompositeModification modification = new MutableCompositeModification();
1570             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1571             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1572
1573             // Simulate the ForwardedReadyTransaction messages that would be sent
1574             // by the ShardTransaction.
1575
1576             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1577                     cohort, modification, true, true), getRef());
1578
1579             expectMsgClass(duration, akka.actor.Status.Failure.class);
1580
1581             // Send another can commit to ensure the failed one got cleaned up.
1582
1583             reset(cohort);
1584
1585             String transactionID2 = "tx2";
1586             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1587             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1588             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1589             DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1590             DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1591             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1592             doReturn(candidateRoot).when(candidate).getRootNode();
1593             doReturn(candidate).when(cohort).getCandidate();
1594
1595             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1596                     cohort, modification, true, true), getRef());
1597
1598             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1599
1600             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1601         }};
1602     }
1603
1604     @Test
1605     public void testAbortBeforeFinishCommit() throws Throwable {
1606         new ShardTestKit(getSystem()) {{
1607             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1608                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1609                     "testAbortBeforeFinishCommit");
1610
1611             waitUntilLeader(shard);
1612
1613             final FiniteDuration duration = duration("5 seconds");
1614             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1615
1616             final String transactionID = "tx1";
1617             Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1618                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1619                 @Override
1620                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1621                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1622
1623                     // Simulate an AbortTransaction message occurring during replication, after
1624                     // persisting and before finishing the commit to the in-memory store.
1625                     // We have no followers so due to optimizations in the RaftActor, it does not
1626                     // attempt replication and thus we can't send an AbortTransaction message b/c
1627                     // it would be processed too late after CommitTransaction completes. So we'll
1628                     // simulate an AbortTransaction message occurring during replication by calling
1629                     // the shard directly.
1630                     //
1631                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1632
1633                     return preCommitFuture;
1634                 }
1635             };
1636
1637             MutableCompositeModification modification = new MutableCompositeModification();
1638             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1639                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1640                     modification, preCommit);
1641
1642             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1643                     cohort, modification, true, false), getRef());
1644             expectMsgClass(duration, ReadyTransactionReply.class);
1645
1646             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1647             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1648                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1649             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1650
1651             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1652             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1653
1654             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1655
1656             // Since we're simulating an abort occurring during replication and before finish commit,
1657             // the data should still get written to the in-memory store since we've gotten past
1658             // canCommit and preCommit and persisted the data.
1659             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1660
1661             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1662         }};
1663     }
1664
1665     @Test
1666     public void testTransactionCommitTimeout() throws Throwable {
1667         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1668
1669         new ShardTestKit(getSystem()) {{
1670             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1671                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1672                     "testTransactionCommitTimeout");
1673
1674             waitUntilLeader(shard);
1675
1676             final FiniteDuration duration = duration("5 seconds");
1677
1678             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1679
1680             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1681             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1682                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1683
1684             // Create 1st Tx - will timeout
1685
1686             String transactionID1 = "tx1";
1687             MutableCompositeModification modification1 = new MutableCompositeModification();
1688             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1689                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1690                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1691                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1692                     modification1);
1693
1694             // Create 2nd Tx
1695
1696             String transactionID2 = "tx3";
1697             MutableCompositeModification modification2 = new MutableCompositeModification();
1698             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1699                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1700             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1701                     listNodePath,
1702                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1703                     modification2);
1704
1705             // Ready the Tx's
1706
1707             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1708                     cohort1, modification1, true, false), getRef());
1709             expectMsgClass(duration, ReadyTransactionReply.class);
1710
1711             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1712                     cohort2, modification2, true, false), getRef());
1713             expectMsgClass(duration, ReadyTransactionReply.class);
1714
1715             // canCommit 1st Tx. We don't send the commit so it should timeout.
1716
1717             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1718             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1719
1720             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1721
1722             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1723             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1724
1725             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1726
1727             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1728             expectMsgClass(duration, akka.actor.Status.Failure.class);
1729
1730             // Commit the 2nd Tx.
1731
1732             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1733             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1734
1735             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1736             assertNotNull(listNodePath + " not found", node);
1737
1738             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1739         }};
1740     }
1741
1742     @Test
1743     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1744         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1745
1746         new ShardTestKit(getSystem()) {{
1747             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1748                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1749                     "testTransactionCommitQueueCapacityExceeded");
1750
1751             waitUntilLeader(shard);
1752
1753             final FiniteDuration duration = duration("5 seconds");
1754
1755             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1756
1757             String transactionID1 = "tx1";
1758             MutableCompositeModification modification1 = new MutableCompositeModification();
1759             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1760                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1761
1762             String transactionID2 = "tx2";
1763             MutableCompositeModification modification2 = new MutableCompositeModification();
1764             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1765                     TestModel.OUTER_LIST_PATH,
1766                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1767                     modification2);
1768
1769             String transactionID3 = "tx3";
1770             MutableCompositeModification modification3 = new MutableCompositeModification();
1771             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1772                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1773
1774             // Ready the Tx's
1775
1776             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1777                     cohort1, modification1, true, false), getRef());
1778             expectMsgClass(duration, ReadyTransactionReply.class);
1779
1780             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1781                     cohort2, modification2, true, false), getRef());
1782             expectMsgClass(duration, ReadyTransactionReply.class);
1783
1784             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1785                     cohort3, modification3, true, false), getRef());
1786             expectMsgClass(duration, ReadyTransactionReply.class);
1787
1788             // canCommit 1st Tx.
1789
1790             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1791             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1792
1793             // canCommit the 2nd Tx - it should get queued.
1794
1795             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1796
1797             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1798
1799             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1800             expectMsgClass(duration, akka.actor.Status.Failure.class);
1801
1802             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1803         }};
1804     }
1805
1806     @Test
1807     public void testCanCommitBeforeReadyFailure() throws Throwable {
1808         new ShardTestKit(getSystem()) {{
1809             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1810                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1811                     "testCanCommitBeforeReadyFailure");
1812
1813             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1814             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1815
1816             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1817         }};
1818     }
1819
1820     @Test
1821     public void testAbortTransaction() throws Throwable {
1822         new ShardTestKit(getSystem()) {{
1823             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1824                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1825                     "testAbortTransaction");
1826
1827             waitUntilLeader(shard);
1828
1829             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1830
1831             String transactionID1 = "tx1";
1832             MutableCompositeModification modification1 = new MutableCompositeModification();
1833             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1834             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1835             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1836
1837             String transactionID2 = "tx2";
1838             MutableCompositeModification modification2 = new MutableCompositeModification();
1839             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1840             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1841
1842             FiniteDuration duration = duration("5 seconds");
1843             final Timeout timeout = new Timeout(duration);
1844
1845             // Simulate the ForwardedReadyTransaction messages that would be sent
1846             // by the ShardTransaction.
1847
1848             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1849                     cohort1, modification1, true, false), getRef());
1850             expectMsgClass(duration, ReadyTransactionReply.class);
1851
1852             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1853                     cohort2, modification2, true, false), getRef());
1854             expectMsgClass(duration, ReadyTransactionReply.class);
1855
1856             // Send the CanCommitTransaction message for the first Tx.
1857
1858             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1859             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1860                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1861             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1862
1863             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1864             // processed after the first Tx completes.
1865
1866             Future<Object> canCommitFuture = Patterns.ask(shard,
1867                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1868
1869             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1870             // Tx to proceed.
1871
1872             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1873             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1874
1875             // Wait for the 2nd Tx to complete the canCommit phase.
1876
1877             Await.ready(canCommitFuture, duration);
1878
1879             InOrder inOrder = inOrder(cohort1, cohort2);
1880             inOrder.verify(cohort1).canCommit();
1881             inOrder.verify(cohort2).canCommit();
1882
1883             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1884         }};
1885     }
1886
1887     @Test
1888     public void testCreateSnapshot() throws Exception {
1889         testCreateSnapshot(true, "testCreateSnapshot");
1890     }
1891
1892     @Test
1893     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1894         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1895     }
1896
1897     @SuppressWarnings("serial")
1898     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1899
1900         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1901
1902         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1903         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1904             TestPersistentDataProvider(DataPersistenceProvider delegate) {
1905                 super(delegate);
1906             }
1907
1908             @Override
1909             public void saveSnapshot(Object o) {
1910                 savedSnapshot.set(o);
1911                 super.saveSnapshot(o);
1912             }
1913         }
1914
1915         dataStoreContextBuilder.persistent(persistent);
1916
1917         new ShardTestKit(getSystem()) {{
1918             class TestShard extends Shard {
1919
1920                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
1921                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
1922                     super(name, peerAddresses, datastoreContext, schemaContext);
1923                     setPersistence(new TestPersistentDataProvider(super.persistence()));
1924                 }
1925
1926                 @Override
1927                 public void handleCommand(Object message) {
1928                     super.handleCommand(message);
1929
1930                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
1931                         latch.get().countDown();
1932                     }
1933                 }
1934
1935                 @Override
1936                 public RaftActorContext getRaftActorContext() {
1937                     return super.getRaftActorContext();
1938                 }
1939             }
1940
1941             Creator<Shard> creator = new Creator<Shard>() {
1942                 @Override
1943                 public Shard create() throws Exception {
1944                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
1945                             newDatastoreContext(), SCHEMA_CONTEXT);
1946                 }
1947             };
1948
1949             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1950                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1951
1952             waitUntilLeader(shard);
1953
1954             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1955
1956             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1957
1958             // Trigger creation of a snapshot by ensuring
1959             RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1960             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1961
1962             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1963
1964             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1965                     savedSnapshot.get() instanceof Snapshot);
1966
1967             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1968
1969             latch.set(new CountDownLatch(1));
1970             savedSnapshot.set(null);
1971
1972             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1973
1974             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1975
1976             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1977                     savedSnapshot.get() instanceof Snapshot);
1978
1979             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1980
1981             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1982         }
1983
1984         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1985
1986             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1987             assertEquals("Root node", expectedRoot, actual);
1988
1989         }};
1990     }
1991
1992     /**
1993      * This test simply verifies that the applySnapShot logic will work
1994      * @throws ReadFailedException
1995      * @throws DataValidationFailedException
1996      */
1997     @Test
1998     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
1999         DataTree store = InMemoryDataTreeFactory.getInstance().create();
2000         store.setSchemaContext(SCHEMA_CONTEXT);
2001
2002         DataTreeModification putTransaction = store.takeSnapshot().newModification();
2003         putTransaction.write(TestModel.TEST_PATH,
2004             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2005         commitTransaction(store, putTransaction);
2006
2007
2008         NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2009
2010         DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2011
2012         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2013         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2014
2015         commitTransaction(store, writeTransaction);
2016
2017         NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2018
2019         assertEquals(expected, actual);
2020     }
2021
2022     @Test
2023     public void testRecoveryApplicable(){
2024
2025         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2026                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2027
2028         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2029                 persistentContext, SCHEMA_CONTEXT);
2030
2031         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2032                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2033
2034         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2035                 nonPersistentContext, SCHEMA_CONTEXT);
2036
2037         new ShardTestKit(getSystem()) {{
2038             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2039                     persistentProps, "testPersistence1");
2040
2041             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2042
2043             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2044
2045             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2046                     nonPersistentProps, "testPersistence2");
2047
2048             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2049
2050             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2051
2052         }};
2053
2054     }
2055
2056     @Test
2057     public void testOnDatastoreContext() {
2058         new ShardTestKit(getSystem()) {{
2059             dataStoreContextBuilder.persistent(true);
2060
2061             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2062
2063             assertEquals("isRecoveryApplicable", true,
2064                     shard.underlyingActor().persistence().isRecoveryApplicable());
2065
2066             waitUntilLeader(shard);
2067
2068             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2069
2070             assertEquals("isRecoveryApplicable", false,
2071                     shard.underlyingActor().persistence().isRecoveryApplicable());
2072
2073             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2074
2075             assertEquals("isRecoveryApplicable", true,
2076                     shard.underlyingActor().persistence().isRecoveryApplicable());
2077
2078             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2079         }};
2080     }
2081
2082     @Test
2083     public void testRegisterRoleChangeListener() throws Exception {
2084         new ShardTestKit(getSystem()) {
2085             {
2086                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2087                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2088                         "testRegisterRoleChangeListener");
2089
2090                 waitUntilLeader(shard);
2091
2092                 TestActorRef<MessageCollectorActor> listener =
2093                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2094
2095                 shard.tell(new RegisterRoleChangeListener(), listener);
2096
2097                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2098
2099                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2100                         ShardLeaderStateChanged.class);
2101                 assertEquals("getLocalShardDataTree present", true,
2102                         leaderStateChanged.getLocalShardDataTree().isPresent());
2103                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2104                         leaderStateChanged.getLocalShardDataTree().get());
2105
2106                 MessageCollectorActor.clearMessages(listener);
2107
2108                 // Force a leader change
2109
2110                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2111
2112                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2113                         ShardLeaderStateChanged.class);
2114                 assertEquals("getLocalShardDataTree present", false,
2115                         leaderStateChanged.getLocalShardDataTree().isPresent());
2116
2117                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2118             }
2119         };
2120     }
2121
2122     @Test
2123     public void testFollowerInitialSyncStatus() throws Exception {
2124         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2125                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2126                 "testFollowerInitialSyncStatus");
2127
2128         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2129
2130         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2131
2132         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2133
2134         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2135
2136         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2137     }
2138
2139     private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2140         modification.ready();
2141         store.validate(modification);
2142         store.commit(store.prepare(modification));
2143     }
2144 }