Convert all tests that use ForwardReadyTransaction to use BatchedModifications
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Status.Failure;
28 import akka.dispatch.Dispatchers;
29 import akka.dispatch.OnComplete;
30 import akka.japi.Creator;
31 import akka.pattern.Patterns;
32 import akka.persistence.SaveSnapshotSuccess;
33 import akka.testkit.TestActorRef;
34 import akka.util.Timeout;
35 import com.google.common.base.Function;
36 import com.google.common.base.Optional;
37 import com.google.common.util.concurrent.Futures;
38 import com.google.common.util.concurrent.ListenableFuture;
39 import com.google.common.util.concurrent.Uninterruptibles;
40 import java.io.IOException;
41 import java.util.Collections;
42 import java.util.HashSet;
43 import java.util.Set;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.Test;
49 import org.mockito.InOrder;
50 import org.opendaylight.controller.cluster.DataPersistenceProvider;
51 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
52 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
53 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
54 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
63 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
68 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
69 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
72 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
73 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
74 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
75 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
76 import org.opendaylight.controller.cluster.datastore.modification.Modification;
77 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
78 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
79 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
80 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
81 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
82 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
83 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
84 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
85 import org.opendaylight.controller.cluster.raft.RaftActorContext;
86 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
87 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
88 import org.opendaylight.controller.cluster.raft.Snapshot;
89 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
90 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
91 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
92 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
93 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
94 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
95 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
96 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
97 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
98 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
99 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
100 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
101 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
102 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
103 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
104 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
105 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
106 import org.opendaylight.yangtools.yang.common.QName;
107 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
109 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
111 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
120 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
121 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
122 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
123 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
124 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
125 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
126 import scala.concurrent.Await;
127 import scala.concurrent.Future;
128 import scala.concurrent.duration.FiniteDuration;
129
130 public class ShardTest extends AbstractShardTest {
131     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
132
133     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
134
135     @Test
136     public void testRegisterChangeListener() throws Exception {
137         new ShardTestKit(getSystem()) {{
138             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
139                     newShardProps(),  "testRegisterChangeListener");
140
141             waitUntilLeader(shard);
142
143             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
144
145             final MockDataChangeListener listener = new MockDataChangeListener(1);
146             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
147                     "testRegisterChangeListener-DataChangeListener");
148
149             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
150                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
151
152             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
153                     RegisterChangeListenerReply.class);
154             final String replyPath = reply.getListenerRegistrationPath().toString();
155             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
156                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
157
158             final YangInstanceIdentifier path = TestModel.TEST_PATH;
159             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
160
161             listener.waitForChangeEvents(path);
162
163             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
164             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
165         }};
166     }
167
168     @SuppressWarnings("serial")
169     @Test
170     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
171         // This test tests the timing window in which a change listener is registered before the
172         // shard becomes the leader. We verify that the listener is registered and notified of the
173         // existing data when the shard becomes the leader.
174         new ShardTestKit(getSystem()) {{
175             // For this test, we want to send the RegisterChangeListener message after the shard
176             // has recovered from persistence and before it becomes the leader. So we subclass
177             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
178             // we know that the shard has been initialized to a follower and has started the
179             // election process. The following 2 CountDownLatches are used to coordinate the
180             // ElectionTimeout with the sending of the RegisterChangeListener message.
181             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
182             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
183             final Creator<Shard> creator = new Creator<Shard>() {
184                 boolean firstElectionTimeout = true;
185
186                 @Override
187                 public Shard create() throws Exception {
188                     // Use a non persistent provider because this test actually invokes persist on the journal
189                     // this will cause all other messages to not be queued properly after that.
190                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
191                     // it does do a persist)
192                     return new Shard(newShardBuilder()) {
193                         @Override
194                         public void onReceiveCommand(final Object message) throws Exception {
195                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
196                                 // Got the first ElectionTimeout. We don't forward it to the
197                                 // base Shard yet until we've sent the RegisterChangeListener
198                                 // message. So we signal the onFirstElectionTimeout latch to tell
199                                 // the main thread to send the RegisterChangeListener message and
200                                 // start a thread to wait on the onChangeListenerRegistered latch,
201                                 // which the main thread signals after it has sent the message.
202                                 // After the onChangeListenerRegistered is triggered, we send the
203                                 // original ElectionTimeout message to proceed with the election.
204                                 firstElectionTimeout = false;
205                                 final ActorRef self = getSelf();
206                                 new Thread() {
207                                     @Override
208                                     public void run() {
209                                         Uninterruptibles.awaitUninterruptibly(
210                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
211                                         self.tell(message, self);
212                                     }
213                                 }.start();
214
215                                 onFirstElectionTimeout.countDown();
216                             } else {
217                                 super.onReceiveCommand(message);
218                             }
219                         }
220                     };
221                 }
222             };
223
224             final MockDataChangeListener listener = new MockDataChangeListener(1);
225             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
226                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
227
228             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
229                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
230                     "testRegisterChangeListenerWhenNotLeaderInitially");
231
232             // Write initial data into the in-memory store.
233             final YangInstanceIdentifier path = TestModel.TEST_PATH;
234             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
235
236             // Wait until the shard receives the first ElectionTimeout message.
237             assertEquals("Got first ElectionTimeout", true,
238                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
239
240             // Now send the RegisterChangeListener and wait for the reply.
241             shard.tell(new RegisterChangeListener(path, dclActor,
242                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
243
244             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
245                     RegisterChangeListenerReply.class);
246             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
247
248             // Sanity check - verify the shard is not the leader yet.
249             shard.tell(new FindLeader(), getRef());
250             final FindLeaderReply findLeadeReply =
251                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
252             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
253
254             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
255             // with the election process.
256             onChangeListenerRegistered.countDown();
257
258             // Wait for the shard to become the leader and notify our listener with the existing
259             // data in the store.
260             listener.waitForChangeEvents(path);
261
262             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
263             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
264         }};
265     }
266
267     @Test
268     public void testRegisterDataTreeChangeListener() throws Exception {
269         new ShardTestKit(getSystem()) {{
270             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
271                     newShardProps(), "testRegisterDataTreeChangeListener");
272
273             waitUntilLeader(shard);
274
275             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
276
277             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
278             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
279                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
280
281             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
282
283             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
284                     RegisterDataTreeChangeListenerReply.class);
285             final String replyPath = reply.getListenerRegistrationPath().toString();
286             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
287                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
288
289             final YangInstanceIdentifier path = TestModel.TEST_PATH;
290             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
291
292             listener.waitForChangeEvents();
293
294             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
295             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
296         }};
297     }
298
299     @SuppressWarnings("serial")
300     @Test
301     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
302         new ShardTestKit(getSystem()) {{
303             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
304             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
305             final Creator<Shard> creator = new Creator<Shard>() {
306                 boolean firstElectionTimeout = true;
307
308                 @Override
309                 public Shard create() throws Exception {
310                     return new Shard(Shard.builder().id(shardID).datastoreContext(
311                             dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
312                         @Override
313                         public void onReceiveCommand(final Object message) throws Exception {
314                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
315                                 firstElectionTimeout = false;
316                                 final ActorRef self = getSelf();
317                                 new Thread() {
318                                     @Override
319                                     public void run() {
320                                         Uninterruptibles.awaitUninterruptibly(
321                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
322                                         self.tell(message, self);
323                                     }
324                                 }.start();
325
326                                 onFirstElectionTimeout.countDown();
327                             } else {
328                                 super.onReceiveCommand(message);
329                             }
330                         }
331                     };
332                 }
333             };
334
335             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
336             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
337                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
338
339             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
340                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
341                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
342
343             final YangInstanceIdentifier path = TestModel.TEST_PATH;
344             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
345
346             assertEquals("Got first ElectionTimeout", true,
347                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
348
349             shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
350             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
351                 RegisterDataTreeChangeListenerReply.class);
352             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
353
354             shard.tell(new FindLeader(), getRef());
355             final FindLeaderReply findLeadeReply =
356                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
357             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
358
359             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
360
361             onChangeListenerRegistered.countDown();
362
363             // TODO: investigate why we do not receive data chage events
364             listener.waitForChangeEvents();
365
366             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
367             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
368         }};
369     }
370
371     @Test
372     public void testCreateTransaction(){
373         new ShardTestKit(getSystem()) {{
374             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
375
376             waitUntilLeader(shard);
377
378             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
379
380             shard.tell(new CreateTransaction("txn-1",
381                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
382
383             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
384                     CreateTransactionReply.class);
385
386             final String path = reply.getTransactionActorPath().toString();
387             assertTrue("Unexpected transaction path " + path,
388                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
389
390             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
391         }};
392     }
393
394     @Test
395     public void testCreateTransactionOnChain(){
396         new ShardTestKit(getSystem()) {{
397             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
398
399             waitUntilLeader(shard);
400
401             shard.tell(new CreateTransaction("txn-1",
402                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
403                     getRef());
404
405             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
406                     CreateTransactionReply.class);
407
408             final String path = reply.getTransactionActorPath().toString();
409             assertTrue("Unexpected transaction path " + path,
410                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
411
412             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
413         }};
414     }
415
416     @SuppressWarnings("serial")
417     @Test
418     public void testPeerAddressResolved() throws Exception {
419         new ShardTestKit(getSystem()) {{
420             final CountDownLatch recoveryComplete = new CountDownLatch(1);
421             class TestShard extends Shard {
422                 TestShard() {
423                     super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
424                             peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
425                             schemaContext(SCHEMA_CONTEXT));
426                 }
427
428                 String getPeerAddress(String id) {
429                     return getRaftActorContext().getPeerAddress(id);
430                 }
431
432                 @Override
433                 protected void onRecoveryComplete() {
434                     try {
435                         super.onRecoveryComplete();
436                     } finally {
437                         recoveryComplete.countDown();
438                     }
439                 }
440             }
441
442             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
443                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
444                         @Override
445                         public TestShard create() throws Exception {
446                             return new TestShard();
447                         }
448                     })), "testPeerAddressResolved");
449
450             assertEquals("Recovery complete", true,
451                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
452
453             final String address = "akka://foobar";
454             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
455
456             assertEquals("getPeerAddress", address,
457                 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
458
459             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
460         }};
461     }
462
463     @Test
464     public void testApplySnapshot() throws Exception {
465
466         ShardTestKit testkit = new ShardTestKit(getSystem());
467
468         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
469                 "testApplySnapshot");
470
471         testkit.waitUntilLeader(shard);
472
473         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
474         store.setSchemaContext(SCHEMA_CONTEXT);
475
476         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
477                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
478                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
479                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
480
481         writeToStore(store, TestModel.TEST_PATH, container);
482
483         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
484         final NormalizedNode<?,?> expected = readStore(store, root);
485
486         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
487                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
488
489         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
490
491         final NormalizedNode<?,?> actual = readStore(shard, root);
492
493         assertEquals("Root node", expected, actual);
494
495         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
496     }
497
498     @Test
499     public void testApplyState() throws Exception {
500
501         ShardTestKit testkit = new ShardTestKit(getSystem());
502
503         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
504
505         testkit.waitUntilLeader(shard);
506
507         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
508
509         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
510                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
511
512         shard.underlyingActor().onReceiveCommand(applyState);
513
514         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
515         assertEquals("Applied state", node, actual);
516
517         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
518     }
519
520     @Test
521     public void testApplyStateWithCandidatePayload() throws Exception {
522
523         ShardTestKit testkit = new ShardTestKit(getSystem());
524
525         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
526
527         testkit.waitUntilLeader(shard);
528
529         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
530         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
531
532         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
533                 DataTreeCandidatePayload.create(candidate)));
534
535         shard.underlyingActor().onReceiveCommand(applyState);
536
537         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
538         assertEquals("Applied state", node, actual);
539
540         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
541     }
542
543     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
544         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
545         testStore.setSchemaContext(SCHEMA_CONTEXT);
546
547         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
548
549         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
550
551         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
552                 SerializationUtils.serializeNormalizedNode(root),
553                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
554         return testStore;
555     }
556
557     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
558         source.validate(mod);
559         final DataTreeCandidate candidate = source.prepare(mod);
560         source.commit(candidate);
561         return DataTreeCandidatePayload.create(candidate);
562     }
563
564     @Test
565     public void testDataTreeCandidateRecovery() throws Exception {
566         // Set up the InMemorySnapshotStore.
567         final DataTree source = setupInMemorySnapshotStore();
568
569         final DataTreeModification writeMod = source.takeSnapshot().newModification();
570         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
571         writeMod.ready();
572         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
573
574         // Set up the InMemoryJournal.
575         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
576
577         final int nListEntries = 16;
578         final Set<Integer> listEntryKeys = new HashSet<>();
579
580         // Add some ModificationPayload entries
581         for (int i = 1; i <= nListEntries; i++) {
582             listEntryKeys.add(Integer.valueOf(i));
583
584             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
585                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
586
587             final DataTreeModification mod = source.takeSnapshot().newModification();
588             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
589             mod.ready();
590             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
591                 payloadForModification(source, mod)));
592         }
593
594         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
595             new ApplyJournalEntries(nListEntries));
596
597         testRecovery(listEntryKeys);
598     }
599
600     @Test
601     public void testModicationRecovery() throws Exception {
602
603         // Set up the InMemorySnapshotStore.
604         setupInMemorySnapshotStore();
605
606         // Set up the InMemoryJournal.
607
608         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
609
610         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
611             new WriteModification(TestModel.OUTER_LIST_PATH,
612                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
613
614         final int nListEntries = 16;
615         final Set<Integer> listEntryKeys = new HashSet<>();
616
617         // Add some ModificationPayload entries
618         for(int i = 1; i <= nListEntries; i++) {
619             listEntryKeys.add(Integer.valueOf(i));
620             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
621                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
622             final Modification mod = new MergeModification(path,
623                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
624             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
625                     newModificationPayload(mod)));
626         }
627
628         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
629                 new ApplyJournalEntries(nListEntries));
630
631         testRecovery(listEntryKeys);
632     }
633
634     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
635         final MutableCompositeModification compMod = new MutableCompositeModification();
636         for(final Modification mod: mods) {
637             compMod.addModification(mod);
638         }
639
640         return new ModificationPayload(compMod);
641     }
642
643     @Test
644     public void testConcurrentThreePhaseCommits() throws Throwable {
645         new ShardTestKit(getSystem()) {{
646             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
647                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
648                     "testConcurrentThreePhaseCommits");
649
650             waitUntilLeader(shard);
651
652          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
653
654             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
655
656             final String transactionID1 = "tx1";
657             final MutableCompositeModification modification1 = new MutableCompositeModification();
658             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
659                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
660
661             final String transactionID2 = "tx2";
662             final MutableCompositeModification modification2 = new MutableCompositeModification();
663             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
664                     TestModel.OUTER_LIST_PATH,
665                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
666                     modification2);
667
668             final String transactionID3 = "tx3";
669             final MutableCompositeModification modification3 = new MutableCompositeModification();
670             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
671                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
672                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
673                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
674                     modification3);
675
676             final long timeoutSec = 5;
677             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
678             final Timeout timeout = new Timeout(duration);
679
680             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
681             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
682                     expectMsgClass(duration, ReadyTransactionReply.class));
683             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
684
685             // Send the CanCommitTransaction message for the first Tx.
686
687             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
688             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
689                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
690             assertEquals("Can commit", true, canCommitReply.getCanCommit());
691
692             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
693             expectMsgClass(duration, ReadyTransactionReply.class);
694
695             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
696             expectMsgClass(duration, ReadyTransactionReply.class);
697
698             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
699             // processed after the first Tx completes.
700
701             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
702                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
703
704             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
705                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
706
707             // Send the CommitTransaction message for the first Tx. After it completes, it should
708             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
709
710             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
711             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
712
713             // Wait for the next 2 Tx's to complete.
714
715             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
716             final CountDownLatch commitLatch = new CountDownLatch(2);
717
718             class OnFutureComplete extends OnComplete<Object> {
719                 private final Class<?> expRespType;
720
721                 OnFutureComplete(final Class<?> expRespType) {
722                     this.expRespType = expRespType;
723                 }
724
725                 @Override
726                 public void onComplete(final Throwable error, final Object resp) {
727                     if(error != null) {
728                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
729                     } else {
730                         try {
731                             assertEquals("Commit response type", expRespType, resp.getClass());
732                             onSuccess(resp);
733                         } catch (final Exception e) {
734                             caughtEx.set(e);
735                         }
736                     }
737                 }
738
739                 void onSuccess(final Object resp) throws Exception {
740                 }
741             }
742
743             class OnCommitFutureComplete extends OnFutureComplete {
744                 OnCommitFutureComplete() {
745                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
746                 }
747
748                 @Override
749                 public void onComplete(final Throwable error, final Object resp) {
750                     super.onComplete(error, resp);
751                     commitLatch.countDown();
752                 }
753             }
754
755             class OnCanCommitFutureComplete extends OnFutureComplete {
756                 private final String transactionID;
757
758                 OnCanCommitFutureComplete(final String transactionID) {
759                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
760                     this.transactionID = transactionID;
761                 }
762
763                 @Override
764                 void onSuccess(final Object resp) throws Exception {
765                     final CanCommitTransactionReply canCommitReply =
766                             CanCommitTransactionReply.fromSerializable(resp);
767                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
768
769                     final Future<Object> commitFuture = Patterns.ask(shard,
770                             new CommitTransaction(transactionID).toSerializable(), timeout);
771                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
772                 }
773             }
774
775             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
776                     getSystem().dispatcher());
777
778             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
779                     getSystem().dispatcher());
780
781             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
782
783             if(caughtEx.get() != null) {
784                 throw caughtEx.get();
785             }
786
787             assertEquals("Commits complete", true, done);
788
789             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
790             inOrder.verify(cohort1).canCommit();
791             inOrder.verify(cohort1).preCommit();
792             inOrder.verify(cohort1).commit();
793             inOrder.verify(cohort2).canCommit();
794             inOrder.verify(cohort2).preCommit();
795             inOrder.verify(cohort2).commit();
796             inOrder.verify(cohort3).canCommit();
797             inOrder.verify(cohort3).preCommit();
798             inOrder.verify(cohort3).commit();
799
800             // Verify data in the data store.
801
802             verifyOuterListEntry(shard, 1);
803
804             verifyLastApplied(shard, 2);
805
806             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
807         }};
808     }
809
810     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
811             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
812         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
813     }
814
815     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
816             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
817             final int messagesSent) {
818         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
819         batched.addModification(new WriteModification(path, data));
820         batched.setReady(ready);
821         batched.setDoCommitOnReady(doCommitOnReady);
822         batched.setTotalMessagesSent(messagesSent);
823         return batched;
824     }
825
826     @Test
827     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
828         new ShardTestKit(getSystem()) {{
829             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
830                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
831                     "testBatchedModificationsWithNoCommitOnReady");
832
833             waitUntilLeader(shard);
834
835             final String transactionID = "tx";
836             final FiniteDuration duration = duration("5 seconds");
837
838             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
839             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
840                 @Override
841                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
842                     if(mockCohort.get() == null) {
843                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
844                     }
845
846                     return mockCohort.get();
847                 }
848             };
849
850             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
851
852             // Send a BatchedModifications to start a transaction.
853
854             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
855                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
856             expectMsgClass(duration, BatchedModificationsReply.class);
857
858             // Send a couple more BatchedModifications.
859
860             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
861                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
862             expectMsgClass(duration, BatchedModificationsReply.class);
863
864             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
865                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
866                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
867             expectMsgClass(duration, ReadyTransactionReply.class);
868
869             // Send the CanCommitTransaction message.
870
871             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
872             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
873                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
874             assertEquals("Can commit", true, canCommitReply.getCanCommit());
875
876             // Send the CanCommitTransaction message.
877
878             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
879             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
880
881             final InOrder inOrder = inOrder(mockCohort.get());
882             inOrder.verify(mockCohort.get()).canCommit();
883             inOrder.verify(mockCohort.get()).preCommit();
884             inOrder.verify(mockCohort.get()).commit();
885
886             // Verify data in the data store.
887
888             verifyOuterListEntry(shard, 1);
889
890             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
891         }};
892     }
893
894     @Test
895     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
896         new ShardTestKit(getSystem()) {{
897             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
898                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
899                     "testBatchedModificationsWithCommitOnReady");
900
901             waitUntilLeader(shard);
902
903             final String transactionID = "tx";
904             final FiniteDuration duration = duration("5 seconds");
905
906             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
907             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
908                 @Override
909                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
910                     if(mockCohort.get() == null) {
911                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
912                     }
913
914                     return mockCohort.get();
915                 }
916             };
917
918             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
919
920             // Send a BatchedModifications to start a transaction.
921
922             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
923                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
924             expectMsgClass(duration, BatchedModificationsReply.class);
925
926             // Send a couple more BatchedModifications.
927
928             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
929                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
930             expectMsgClass(duration, BatchedModificationsReply.class);
931
932             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
933                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
934                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
935
936             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
937
938             final InOrder inOrder = inOrder(mockCohort.get());
939             inOrder.verify(mockCohort.get()).canCommit();
940             inOrder.verify(mockCohort.get()).preCommit();
941             inOrder.verify(mockCohort.get()).commit();
942
943             // Verify data in the data store.
944
945             verifyOuterListEntry(shard, 1);
946
947             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
948         }};
949     }
950
951     @Test(expected=IllegalStateException.class)
952     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
953         new ShardTestKit(getSystem()) {{
954             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
955                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
956                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
957
958             waitUntilLeader(shard);
959
960             final String transactionID = "tx1";
961             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
962             batched.setReady(true);
963             batched.setTotalMessagesSent(2);
964
965             shard.tell(batched, getRef());
966
967             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
968
969             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
970
971             if(failure != null) {
972                 throw failure.cause();
973             }
974         }};
975     }
976
977     @Test
978     public void testBatchedModificationsWithOperationFailure() throws Throwable {
979         new ShardTestKit(getSystem()) {{
980             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
981                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
982                     "testBatchedModificationsWithOperationFailure");
983
984             waitUntilLeader(shard);
985
986             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
987             // write will not validate the children for performance reasons.
988
989             String transactionID = "tx1";
990
991             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
992                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
993                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
994
995             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
996             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
997             shard.tell(batched, getRef());
998             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
999
1000             Throwable cause = failure.cause();
1001
1002             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1003             batched.setReady(true);
1004             batched.setTotalMessagesSent(2);
1005
1006             shard.tell(batched, getRef());
1007
1008             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1009             assertEquals("Failure cause", cause, failure.cause());
1010
1011             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1012         }};
1013     }
1014
1015     @SuppressWarnings("unchecked")
1016     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1017         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1018         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1019         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1020                 outerList.getValue() instanceof Iterable);
1021         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1022         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1023                 entry instanceof MapEntryNode);
1024         final MapEntryNode mapEntry = (MapEntryNode)entry;
1025         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1026                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1027         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1028         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1029     }
1030
1031     @Test
1032     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1033         new ShardTestKit(getSystem()) {{
1034             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1035                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1036                     "testBatchedModificationsOnTransactionChain");
1037
1038             waitUntilLeader(shard);
1039
1040             final String transactionChainID = "txChain";
1041             final String transactionID1 = "tx1";
1042             final String transactionID2 = "tx2";
1043
1044             final FiniteDuration duration = duration("5 seconds");
1045
1046             // Send a BatchedModifications to start a chained write transaction and ready it.
1047
1048             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1049             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1050             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1051                     containerNode, true, false, 1), getRef());
1052             expectMsgClass(duration, ReadyTransactionReply.class);
1053
1054             // Create a read Tx on the same chain.
1055
1056             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1057                     transactionChainID).toSerializable(), getRef());
1058
1059             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1060
1061             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1062             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1063             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1064
1065             // Commit the write transaction.
1066
1067             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1068             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1069                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1070             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1071
1072             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1073             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1074
1075             // Verify data in the data store.
1076
1077             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1078             assertEquals("Stored node", containerNode, actualNode);
1079
1080             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1081         }};
1082     }
1083
1084     @Test
1085     public void testOnBatchedModificationsWhenNotLeader() {
1086         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1087         new ShardTestKit(getSystem()) {{
1088             final Creator<Shard> creator = new Creator<Shard>() {
1089                 private static final long serialVersionUID = 1L;
1090
1091                 @Override
1092                 public Shard create() throws Exception {
1093                     return new Shard(newShardBuilder()) {
1094                         @Override
1095                         protected boolean isLeader() {
1096                             return overrideLeaderCalls.get() ? false : super.isLeader();
1097                         }
1098
1099                         @Override
1100                         protected ActorSelection getLeader() {
1101                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1102                                 super.getLeader();
1103                         }
1104                     };
1105                 }
1106             };
1107
1108             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1109                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1110
1111             waitUntilLeader(shard);
1112
1113             overrideLeaderCalls.set(true);
1114
1115             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1116
1117             shard.tell(batched, ActorRef.noSender());
1118
1119             expectMsgEquals(batched);
1120
1121             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1122         }};
1123     }
1124
1125     @Test
1126     public void testReadyWithImmediateCommit() throws Exception{
1127         testReadyWithImmediateCommit(true);
1128         testReadyWithImmediateCommit(false);
1129     }
1130
1131     public void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
1132         new ShardTestKit(getSystem()) {{
1133             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1134                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1135                     "testReadyWithImmediateCommit-" + readWrite);
1136
1137             waitUntilLeader(shard);
1138
1139             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1140
1141             final String transactionID = "tx1";
1142             final MutableCompositeModification modification = new MutableCompositeModification();
1143             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1144             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1145                     TestModel.TEST_PATH, containerNode, modification);
1146
1147             final FiniteDuration duration = duration("5 seconds");
1148
1149             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1150
1151             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1152
1153             final InOrder inOrder = inOrder(cohort);
1154             inOrder.verify(cohort).canCommit();
1155             inOrder.verify(cohort).preCommit();
1156             inOrder.verify(cohort).commit();
1157
1158             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1159             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1160
1161             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1162         }};
1163     }
1164
1165     @Test
1166     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1167         new ShardTestKit(getSystem()) {{
1168             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1169                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1170                     "testReadyLocalTransactionWithImmediateCommit");
1171
1172             waitUntilLeader(shard);
1173
1174             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1175
1176             final DataTreeModification modification = dataStore.newModification();
1177
1178             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1179             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1180             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1181             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1182
1183             final String txId = "tx1";
1184             modification.ready();
1185             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1186
1187             shard.tell(readyMessage, getRef());
1188
1189             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1190
1191             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1192             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1193
1194             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1195         }};
1196     }
1197
1198     @Test
1199     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1200         new ShardTestKit(getSystem()) {{
1201             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1202                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1203                     "testReadyLocalTransactionWithThreePhaseCommit");
1204
1205             waitUntilLeader(shard);
1206
1207             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1208
1209             final DataTreeModification modification = dataStore.newModification();
1210
1211             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1212             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1213             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1214             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1215
1216             final String txId = "tx1";
1217                 modification.ready();
1218             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1219
1220             shard.tell(readyMessage, getRef());
1221
1222             expectMsgClass(ReadyTransactionReply.class);
1223
1224             // Send the CanCommitTransaction message.
1225
1226             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1227             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1228                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1229             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1230
1231             // Send the CanCommitTransaction message.
1232
1233             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1234             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1235
1236             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1237             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1238
1239             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1240         }};
1241     }
1242
1243     @Test
1244     public void testCommitWithPersistenceDisabled() throws Throwable {
1245         testCommitWithPersistenceDisabled(true);
1246         testCommitWithPersistenceDisabled(false);
1247     }
1248
1249     public void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
1250         dataStoreContextBuilder.persistent(false);
1251         new ShardTestKit(getSystem()) {{
1252             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1253                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1254                     "testCommitWithPersistenceDisabled-" + readWrite);
1255
1256             waitUntilLeader(shard);
1257
1258             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1259
1260             // Setup a simulated transactions with a mock cohort.
1261
1262             final String transactionID = "tx";
1263             final MutableCompositeModification modification = new MutableCompositeModification();
1264             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1265             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1266                 TestModel.TEST_PATH, containerNode, modification);
1267
1268             final FiniteDuration duration = duration("5 seconds");
1269
1270             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1271             expectMsgClass(duration, ReadyTransactionReply.class);
1272
1273             // Send the CanCommitTransaction message.
1274
1275             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1276             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1277                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1278             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1279
1280             // Send the CanCommitTransaction message.
1281
1282             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1283             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1284
1285             final InOrder inOrder = inOrder(cohort);
1286             inOrder.verify(cohort).canCommit();
1287             inOrder.verify(cohort).preCommit();
1288             inOrder.verify(cohort).commit();
1289
1290             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1291             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1292
1293             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1294         }};
1295     }
1296
1297     private static DataTreeCandidateTip mockCandidate(final String name) {
1298         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1299         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1300         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1301         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1302         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1303         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1304         return mockCandidate;
1305     }
1306
1307     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1308         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1309         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1310         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1311         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1312         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1313         return mockCandidate;
1314     }
1315
1316     @Test
1317     public void testCommitWhenTransactionHasNoModifications() {
1318         testCommitWhenTransactionHasNoModifications(true);
1319         testCommitWhenTransactionHasNoModifications(false);
1320     }
1321
1322     public void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
1323         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1324         // but here that need not happen
1325         new ShardTestKit(getSystem()) {
1326             {
1327                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1328                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1329                         "testCommitWhenTransactionHasNoModifications-" + readWrite);
1330
1331                 waitUntilLeader(shard);
1332
1333                 final String transactionID = "tx1";
1334                 final MutableCompositeModification modification = new MutableCompositeModification();
1335                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1336                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1337                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1338                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1339                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1340
1341                 final FiniteDuration duration = duration("5 seconds");
1342
1343                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1344                 expectMsgClass(duration, ReadyTransactionReply.class);
1345
1346                 // Send the CanCommitTransaction message.
1347
1348                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1349                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1350                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1351                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1352
1353                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1354                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1355
1356                 final InOrder inOrder = inOrder(cohort);
1357                 inOrder.verify(cohort).canCommit();
1358                 inOrder.verify(cohort).preCommit();
1359                 inOrder.verify(cohort).commit();
1360
1361                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1362                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1363
1364                 // Use MBean for verification
1365                 // Committed transaction count should increase as usual
1366                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1367
1368                 // Commit index should not advance because this does not go into the journal
1369                 assertEquals(-1, shardStats.getCommitIndex());
1370
1371                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1372
1373             }
1374         };
1375     }
1376
1377     @Test
1378     public void testCommitWhenTransactionHasModifications() {
1379         testCommitWhenTransactionHasModifications(true);
1380         testCommitWhenTransactionHasModifications(false);
1381     }
1382
1383     public void testCommitWhenTransactionHasModifications(final boolean readWrite){
1384         new ShardTestKit(getSystem()) {
1385             {
1386                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1387                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1388                         "testCommitWhenTransactionHasModifications-" + readWrite);
1389
1390                 waitUntilLeader(shard);
1391
1392                 final String transactionID = "tx1";
1393                 final MutableCompositeModification modification = new MutableCompositeModification();
1394                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1395                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1396                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1397                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1398                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1399                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1400
1401                 final FiniteDuration duration = duration("5 seconds");
1402
1403                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1404                 expectMsgClass(duration, ReadyTransactionReply.class);
1405
1406                 // Send the CanCommitTransaction message.
1407
1408                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1409                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1410                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1411                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1412
1413                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1414                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1415
1416                 final InOrder inOrder = inOrder(cohort);
1417                 inOrder.verify(cohort).canCommit();
1418                 inOrder.verify(cohort).preCommit();
1419                 inOrder.verify(cohort).commit();
1420
1421                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1422                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1423
1424                 // Use MBean for verification
1425                 // Committed transaction count should increase as usual
1426                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1427
1428                 // Commit index should advance as we do not have an empty modification
1429                 assertEquals(0, shardStats.getCommitIndex());
1430
1431                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1432
1433             }
1434         };
1435     }
1436
1437     @Test
1438     public void testCommitPhaseFailure() throws Throwable {
1439         testCommitPhaseFailure(true);
1440         testCommitPhaseFailure(false);
1441     }
1442
1443     public void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
1444         new ShardTestKit(getSystem()) {{
1445             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1446                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1447                     "testCommitPhaseFailure-" + readWrite);
1448
1449             waitUntilLeader(shard);
1450
1451             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1452             // commit phase.
1453
1454             final String transactionID1 = "tx1";
1455             final MutableCompositeModification modification1 = new MutableCompositeModification();
1456             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1457             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1458             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1459             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1460             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1461
1462             final String transactionID2 = "tx2";
1463             final MutableCompositeModification modification2 = new MutableCompositeModification();
1464             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1465             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1466
1467             final FiniteDuration duration = duration("5 seconds");
1468             final Timeout timeout = new Timeout(duration);
1469
1470             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1471             expectMsgClass(duration, ReadyTransactionReply.class);
1472
1473             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1474             expectMsgClass(duration, ReadyTransactionReply.class);
1475
1476             // Send the CanCommitTransaction message for the first Tx.
1477
1478             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1479             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1480                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1481             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1482
1483             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1484             // processed after the first Tx completes.
1485
1486             final Future<Object> canCommitFuture = Patterns.ask(shard,
1487                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1488
1489             // Send the CommitTransaction message for the first Tx. This should send back an error
1490             // and trigger the 2nd Tx to proceed.
1491
1492             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1493             expectMsgClass(duration, akka.actor.Status.Failure.class);
1494
1495             // Wait for the 2nd Tx to complete the canCommit phase.
1496
1497             final CountDownLatch latch = new CountDownLatch(1);
1498             canCommitFuture.onComplete(new OnComplete<Object>() {
1499                 @Override
1500                 public void onComplete(final Throwable t, final Object resp) {
1501                     latch.countDown();
1502                 }
1503             }, getSystem().dispatcher());
1504
1505             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1506
1507             final InOrder inOrder = inOrder(cohort1, cohort2);
1508             inOrder.verify(cohort1).canCommit();
1509             inOrder.verify(cohort1).preCommit();
1510             inOrder.verify(cohort1).commit();
1511             inOrder.verify(cohort2).canCommit();
1512
1513             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1514         }};
1515     }
1516
1517     @Test
1518     public void testPreCommitPhaseFailure() throws Throwable {
1519         testPreCommitPhaseFailure(true);
1520         testPreCommitPhaseFailure(false);
1521     }
1522
1523     public void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
1524         new ShardTestKit(getSystem()) {{
1525             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1526                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1527                     "testPreCommitPhaseFailure-" + readWrite);
1528
1529             waitUntilLeader(shard);
1530
1531             final String transactionID1 = "tx1";
1532             final MutableCompositeModification modification1 = new MutableCompositeModification();
1533             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1534             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1535             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1536
1537             final String transactionID2 = "tx2";
1538             final MutableCompositeModification modification2 = new MutableCompositeModification();
1539             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1540             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1541
1542             final FiniteDuration duration = duration("5 seconds");
1543             final Timeout timeout = new Timeout(duration);
1544
1545             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1546             expectMsgClass(duration, ReadyTransactionReply.class);
1547
1548             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1549             expectMsgClass(duration, ReadyTransactionReply.class);
1550
1551             // Send the CanCommitTransaction message for the first Tx.
1552
1553             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1554             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1555                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1556             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1557
1558             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1559             // processed after the first Tx completes.
1560
1561             final Future<Object> canCommitFuture = Patterns.ask(shard,
1562                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1563
1564             // Send the CommitTransaction message for the first Tx. This should send back an error
1565             // and trigger the 2nd Tx to proceed.
1566
1567             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1568             expectMsgClass(duration, akka.actor.Status.Failure.class);
1569
1570             // Wait for the 2nd Tx to complete the canCommit phase.
1571
1572             final CountDownLatch latch = new CountDownLatch(1);
1573             canCommitFuture.onComplete(new OnComplete<Object>() {
1574                 @Override
1575                 public void onComplete(final Throwable t, final Object resp) {
1576                     latch.countDown();
1577                 }
1578             }, getSystem().dispatcher());
1579
1580             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1581
1582             final InOrder inOrder = inOrder(cohort1, cohort2);
1583             inOrder.verify(cohort1).canCommit();
1584             inOrder.verify(cohort1).preCommit();
1585             inOrder.verify(cohort2).canCommit();
1586
1587             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1588         }};
1589     }
1590
1591     @Test
1592     public void testCanCommitPhaseFailure() throws Throwable {
1593         testCanCommitPhaseFailure(true);
1594         testCanCommitPhaseFailure(false);
1595     }
1596
1597     public void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1598         new ShardTestKit(getSystem()) {{
1599             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1600                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1601                     "testCanCommitPhaseFailure-" + readWrite);
1602
1603             waitUntilLeader(shard);
1604
1605             final FiniteDuration duration = duration("5 seconds");
1606
1607             final String transactionID1 = "tx1";
1608             final MutableCompositeModification modification = new MutableCompositeModification();
1609             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1610             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1611
1612             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1613             expectMsgClass(duration, ReadyTransactionReply.class);
1614
1615             // Send the CanCommitTransaction message.
1616
1617             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1618             expectMsgClass(duration, akka.actor.Status.Failure.class);
1619
1620             // Send another can commit to ensure the failed one got cleaned up.
1621
1622             reset(cohort);
1623
1624             final String transactionID2 = "tx2";
1625             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1626
1627             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1628             expectMsgClass(duration, ReadyTransactionReply.class);
1629
1630             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1631             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1632                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1633             assertEquals("getCanCommit", true, reply.getCanCommit());
1634
1635             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1636         }};
1637     }
1638
1639     @Test
1640     public void testCanCommitPhaseFalseResponse() throws Throwable {
1641         testCanCommitPhaseFalseResponse(true);
1642         testCanCommitPhaseFalseResponse(false);
1643     }
1644
1645     public void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1646         new ShardTestKit(getSystem()) {{
1647             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1648                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1649                     "testCanCommitPhaseFalseResponse-" + readWrite);
1650
1651             waitUntilLeader(shard);
1652
1653             final FiniteDuration duration = duration("5 seconds");
1654
1655             final String transactionID1 = "tx1";
1656             final MutableCompositeModification modification = new MutableCompositeModification();
1657             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1658             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1659
1660             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1661             expectMsgClass(duration, ReadyTransactionReply.class);
1662
1663             // Send the CanCommitTransaction message.
1664
1665             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1666             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1667                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1668             assertEquals("getCanCommit", false, reply.getCanCommit());
1669
1670             // Send another can commit to ensure the failed one got cleaned up.
1671
1672             reset(cohort);
1673
1674             final String transactionID2 = "tx2";
1675             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1676
1677             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1678             expectMsgClass(duration, ReadyTransactionReply.class);
1679
1680             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1681             reply = CanCommitTransactionReply.fromSerializable(
1682                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1683             assertEquals("getCanCommit", true, reply.getCanCommit());
1684
1685             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1686         }};
1687     }
1688
1689     @Test
1690     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1691         testImmediateCommitWithCanCommitPhaseFailure(true);
1692         testImmediateCommitWithCanCommitPhaseFailure(false);
1693     }
1694
1695     public void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1696         new ShardTestKit(getSystem()) {{
1697             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1698                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1699                     "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1700
1701             waitUntilLeader(shard);
1702
1703             final FiniteDuration duration = duration("5 seconds");
1704
1705             final String transactionID1 = "tx1";
1706             final MutableCompositeModification modification = new MutableCompositeModification();
1707             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1708             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1709
1710             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
1711
1712             expectMsgClass(duration, akka.actor.Status.Failure.class);
1713
1714             // Send another can commit to ensure the failed one got cleaned up.
1715
1716             reset(cohort);
1717
1718             final String transactionID2 = "tx2";
1719             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1720             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1721             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1722             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1723             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1724             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1725             doReturn(candidateRoot).when(candidate).getRootNode();
1726             doReturn(candidate).when(cohort).getCandidate();
1727
1728             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1729
1730             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1731
1732             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1733         }};
1734     }
1735
1736     @Test
1737     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1738         testImmediateCommitWithCanCommitPhaseFalseResponse(true);
1739         testImmediateCommitWithCanCommitPhaseFalseResponse(false);
1740     }
1741
1742     public void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1743         new ShardTestKit(getSystem()) {{
1744             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1745                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1746                     "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
1747
1748             waitUntilLeader(shard);
1749
1750             final FiniteDuration duration = duration("5 seconds");
1751
1752             final String transactionID = "tx1";
1753             final MutableCompositeModification modification = new MutableCompositeModification();
1754             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1755             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1756
1757             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1758
1759             expectMsgClass(duration, akka.actor.Status.Failure.class);
1760
1761             // Send another can commit to ensure the failed one got cleaned up.
1762
1763             reset(cohort);
1764
1765             final String transactionID2 = "tx2";
1766             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1767             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1768             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1769             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1770             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1771             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1772             doReturn(candidateRoot).when(candidate).getRootNode();
1773             doReturn(candidate).when(cohort).getCandidate();
1774
1775             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1776
1777             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1778
1779             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1780         }};
1781     }
1782
1783     @Test
1784     public void testAbortBeforeFinishCommit() throws Throwable {
1785         testAbortBeforeFinishCommit(true);
1786         testAbortBeforeFinishCommit(false);
1787     }
1788
1789     public void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
1790         new ShardTestKit(getSystem()) {{
1791             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1792                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1793                     "testAbortBeforeFinishCommit-" + readWrite);
1794
1795             waitUntilLeader(shard);
1796
1797             final FiniteDuration duration = duration("5 seconds");
1798             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1799
1800             final String transactionID = "tx1";
1801             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1802                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1803                 @Override
1804                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1805                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1806
1807                     // Simulate an AbortTransaction message occurring during replication, after
1808                     // persisting and before finishing the commit to the in-memory store.
1809                     // We have no followers so due to optimizations in the RaftActor, it does not
1810                     // attempt replication and thus we can't send an AbortTransaction message b/c
1811                     // it would be processed too late after CommitTransaction completes. So we'll
1812                     // simulate an AbortTransaction message occurring during replication by calling
1813                     // the shard directly.
1814                     //
1815                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1816
1817                     return preCommitFuture;
1818                 }
1819             };
1820
1821             final MutableCompositeModification modification = new MutableCompositeModification();
1822             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1823                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1824                     modification, preCommit);
1825
1826             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1827             expectMsgClass(duration, ReadyTransactionReply.class);
1828
1829             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1830             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1831                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1832             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1833
1834             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1835             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1836
1837             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1838
1839             // Since we're simulating an abort occurring during replication and before finish commit,
1840             // the data should still get written to the in-memory store since we've gotten past
1841             // canCommit and preCommit and persisted the data.
1842             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1843
1844             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1845         }};
1846     }
1847
1848     @Test
1849     public void testTransactionCommitTimeout() throws Throwable {
1850         testTransactionCommitTimeout(true);
1851         testTransactionCommitTimeout(false);
1852     }
1853
1854     public void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
1855         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1856
1857         new ShardTestKit(getSystem()) {{
1858             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1859                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1860                     "testTransactionCommitTimeout-" + readWrite);
1861
1862             waitUntilLeader(shard);
1863
1864             final FiniteDuration duration = duration("5 seconds");
1865
1866             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1867
1868             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1869             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1870                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1871
1872             // Create 1st Tx - will timeout
1873
1874             final String transactionID1 = "tx1";
1875             final MutableCompositeModification modification1 = new MutableCompositeModification();
1876             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1877                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1878                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1879                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1880                     modification1);
1881
1882             // Create 2nd Tx
1883
1884             final String transactionID2 = "tx3";
1885             final MutableCompositeModification modification2 = new MutableCompositeModification();
1886             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1887                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1888             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1889                     listNodePath,
1890                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1891                     modification2);
1892
1893             // Ready the Tx's
1894
1895             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1896             expectMsgClass(duration, ReadyTransactionReply.class);
1897
1898             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1899             expectMsgClass(duration, ReadyTransactionReply.class);
1900
1901             // canCommit 1st Tx. We don't send the commit so it should timeout.
1902
1903             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1904             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1905
1906             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1907
1908             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1909             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1910
1911             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1912
1913             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1914             expectMsgClass(duration, akka.actor.Status.Failure.class);
1915
1916             // Commit the 2nd Tx.
1917
1918             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1919             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1920
1921             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1922             assertNotNull(listNodePath + " not found", node);
1923
1924             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1925         }};
1926     }
1927
1928     @Test
1929     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1930         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1931
1932         new ShardTestKit(getSystem()) {{
1933             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1934                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1935                     "testTransactionCommitQueueCapacityExceeded");
1936
1937             waitUntilLeader(shard);
1938
1939             final FiniteDuration duration = duration("5 seconds");
1940
1941             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1942
1943             final String transactionID1 = "tx1";
1944             final MutableCompositeModification modification1 = new MutableCompositeModification();
1945             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1946                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1947
1948             final String transactionID2 = "tx2";
1949             final MutableCompositeModification modification2 = new MutableCompositeModification();
1950             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1951                     TestModel.OUTER_LIST_PATH,
1952                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1953                     modification2);
1954
1955             final String transactionID3 = "tx3";
1956             final MutableCompositeModification modification3 = new MutableCompositeModification();
1957             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1958                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1959
1960             // Ready the Tx's
1961
1962             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1963             expectMsgClass(duration, ReadyTransactionReply.class);
1964
1965             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1966             expectMsgClass(duration, ReadyTransactionReply.class);
1967
1968             // The 3rd Tx should exceed queue capacity and fail.
1969
1970             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1971             expectMsgClass(duration, akka.actor.Status.Failure.class);
1972
1973             // canCommit 1st Tx.
1974
1975             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1976             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1977
1978             // canCommit the 2nd Tx - it should get queued.
1979
1980             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1981
1982             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1983
1984             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1985             expectMsgClass(duration, akka.actor.Status.Failure.class);
1986
1987             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1988         }};
1989     }
1990
1991     @Test
1992     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1993         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1994
1995         new ShardTestKit(getSystem()) {{
1996             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1997                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1998                     "testTransactionCommitWithPriorExpiredCohortEntries");
1999
2000             waitUntilLeader(shard);
2001
2002             final FiniteDuration duration = duration("5 seconds");
2003
2004             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2005
2006             final String transactionID1 = "tx1";
2007             final MutableCompositeModification modification1 = new MutableCompositeModification();
2008             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2009                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2010
2011             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
2012             expectMsgClass(duration, ReadyTransactionReply.class);
2013
2014             final String transactionID2 = "tx2";
2015             final MutableCompositeModification modification2 = new MutableCompositeModification();
2016             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2017                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2018
2019             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
2020             expectMsgClass(duration, ReadyTransactionReply.class);
2021
2022             final String transactionID3 = "tx3";
2023             final MutableCompositeModification modification3 = new MutableCompositeModification();
2024             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2025                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2026
2027             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
2028             expectMsgClass(duration, ReadyTransactionReply.class);
2029
2030             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2031             // should expire from the queue and the last one should be processed.
2032
2033             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2034             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2035
2036             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2037         }};
2038     }
2039
2040     @Test
2041     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2042         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2043
2044         new ShardTestKit(getSystem()) {{
2045             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2046                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2047                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2048
2049             waitUntilLeader(shard);
2050
2051             final FiniteDuration duration = duration("5 seconds");
2052
2053             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2054
2055             final String transactionID1 = "tx1";
2056             final MutableCompositeModification modification1 = new MutableCompositeModification();
2057             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2058                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2059
2060             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
2061             expectMsgClass(duration, ReadyTransactionReply.class);
2062
2063             // CanCommit the first one so it's the current in-progress CohortEntry.
2064
2065             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2066             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2067
2068             // Ready the second Tx.
2069
2070             final String transactionID2 = "tx2";
2071             final MutableCompositeModification modification2 = new MutableCompositeModification();
2072             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2073                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2074
2075             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
2076             expectMsgClass(duration, ReadyTransactionReply.class);
2077
2078             // Ready the third Tx.
2079
2080             final String transactionID3 = "tx3";
2081             final DataTreeModification modification3 = dataStore.newModification();
2082             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2083                     .apply(modification3);
2084                 modification3.ready();
2085             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2086
2087             shard.tell(readyMessage, getRef());
2088
2089             // Commit the first Tx. After completing, the second should expire from the queue and the third
2090             // Tx committed.
2091
2092             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2093             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2094
2095             // Expect commit reply from the third Tx.
2096
2097             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2098
2099             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2100             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2101
2102             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2103         }};
2104     }
2105
2106     @Test
2107     public void testCanCommitBeforeReadyFailure() throws Throwable {
2108         new ShardTestKit(getSystem()) {{
2109             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2110                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2111                     "testCanCommitBeforeReadyFailure");
2112
2113             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2114             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2115
2116             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2117         }};
2118     }
2119
2120     @Test
2121     public void testAbortCurrentTransaction() throws Throwable {
2122         testAbortCurrentTransaction(true);
2123         testAbortCurrentTransaction(false);
2124     }
2125
2126     public void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
2127         new ShardTestKit(getSystem()) {{
2128             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2129                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2130                     "testAbortCurrentTransaction-" + readWrite);
2131
2132             waitUntilLeader(shard);
2133
2134             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2135
2136             final String transactionID1 = "tx1";
2137             final MutableCompositeModification modification1 = new MutableCompositeModification();
2138             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2139             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2140             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2141
2142             final String transactionID2 = "tx2";
2143             final MutableCompositeModification modification2 = new MutableCompositeModification();
2144             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2145             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2146
2147             final FiniteDuration duration = duration("5 seconds");
2148             final Timeout timeout = new Timeout(duration);
2149
2150             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
2151             expectMsgClass(duration, ReadyTransactionReply.class);
2152
2153             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
2154             expectMsgClass(duration, ReadyTransactionReply.class);
2155
2156             // Send the CanCommitTransaction message for the first Tx.
2157
2158             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2159             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2160                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2161             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2162
2163             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2164             // processed after the first Tx completes.
2165
2166             final Future<Object> canCommitFuture = Patterns.ask(shard,
2167                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2168
2169             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2170             // Tx to proceed.
2171
2172             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2173             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2174
2175             // Wait for the 2nd Tx to complete the canCommit phase.
2176
2177             Await.ready(canCommitFuture, duration);
2178
2179             final InOrder inOrder = inOrder(cohort1, cohort2);
2180             inOrder.verify(cohort1).canCommit();
2181             inOrder.verify(cohort2).canCommit();
2182
2183             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2184         }};
2185     }
2186
2187     @Test
2188     public void testAbortQueuedTransaction() throws Throwable {
2189         testAbortQueuedTransaction(true);
2190         testAbortQueuedTransaction(false);
2191     }
2192
2193     public void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
2194         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2195         new ShardTestKit(getSystem()) {{
2196             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2197             @SuppressWarnings("serial")
2198             final Creator<Shard> creator = new Creator<Shard>() {
2199                 @Override
2200                 public Shard create() throws Exception {
2201                     return new Shard(newShardBuilder()) {
2202                         @Override
2203                         public void onReceiveCommand(final Object message) throws Exception {
2204                             super.onReceiveCommand(message);
2205                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2206                                 if(cleaupCheckLatch.get() != null) {
2207                                     cleaupCheckLatch.get().countDown();
2208                                 }
2209                             }
2210                         }
2211                     };
2212                 }
2213             };
2214
2215             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2216                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2217                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
2218
2219             waitUntilLeader(shard);
2220
2221             final String transactionID = "tx1";
2222
2223             final MutableCompositeModification modification = new MutableCompositeModification();
2224             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2225             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2226
2227             final FiniteDuration duration = duration("5 seconds");
2228
2229             // Ready the tx.
2230
2231             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
2232             expectMsgClass(duration, ReadyTransactionReply.class);
2233
2234             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2235
2236             // Send the AbortTransaction message.
2237
2238             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2239             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2240
2241             verify(cohort).abort();
2242
2243             // Verify the tx cohort is removed from queue at the cleanup check interval.
2244
2245             cleaupCheckLatch.set(new CountDownLatch(1));
2246             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2247                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2248
2249             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2250
2251             // Now send CanCommitTransaction - should fail.
2252
2253             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2254
2255             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2256             assertTrue("Failure type", failure instanceof IllegalStateException);
2257
2258             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2259         }};
2260     }
2261
2262     @Test
2263     public void testCreateSnapshot() throws Exception {
2264         testCreateSnapshot(true, "testCreateSnapshot");
2265     }
2266
2267     @Test
2268     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2269         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2270     }
2271
2272     @SuppressWarnings("serial")
2273     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2274
2275         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2276
2277         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2278         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2279             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2280                 super(delegate);
2281             }
2282
2283             @Override
2284             public void saveSnapshot(final Object o) {
2285                 savedSnapshot.set(o);
2286                 super.saveSnapshot(o);
2287             }
2288         }
2289
2290         dataStoreContextBuilder.persistent(persistent);
2291
2292         new ShardTestKit(getSystem()) {{
2293             class TestShard extends Shard {
2294
2295                 protected TestShard(AbstractBuilder<?, ?> builder) {
2296                     super(builder);
2297                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2298                 }
2299
2300                 @Override
2301                 public void handleCommand(final Object message) {
2302                     super.handleCommand(message);
2303
2304                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2305                         latch.get().countDown();
2306                     }
2307                 }
2308
2309                 @Override
2310                 public RaftActorContext getRaftActorContext() {
2311                     return super.getRaftActorContext();
2312                 }
2313             }
2314
2315             final Creator<Shard> creator = new Creator<Shard>() {
2316                 @Override
2317                 public Shard create() throws Exception {
2318                     return new TestShard(newShardBuilder());
2319                 }
2320             };
2321
2322             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2323                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2324
2325             waitUntilLeader(shard);
2326             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2327
2328             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2329
2330             // Trigger creation of a snapshot by ensuring
2331             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2332             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2333             awaitAndValidateSnapshot(expectedRoot);
2334
2335             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2336             awaitAndValidateSnapshot(expectedRoot);
2337
2338             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2339         }
2340
2341             private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot
2342                                               ) throws InterruptedException {
2343                 System.out.println("Inside awaitAndValidateSnapshot {}" + savedSnapshot.get());
2344                 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2345
2346                 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2347                         savedSnapshot.get() instanceof Snapshot);
2348
2349                 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2350
2351                 latch.set(new CountDownLatch(1));
2352                 savedSnapshot.set(null);
2353             }
2354
2355             private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2356
2357                 final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2358                 assertEquals("Root node", expectedRoot, actual);
2359
2360            }
2361         };
2362     }
2363
2364     /**
2365      * This test simply verifies that the applySnapShot logic will work
2366      * @throws ReadFailedException
2367      * @throws DataValidationFailedException
2368      */
2369     @Test
2370     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2371         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
2372         store.setSchemaContext(SCHEMA_CONTEXT);
2373
2374         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2375         putTransaction.write(TestModel.TEST_PATH,
2376             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2377         commitTransaction(store, putTransaction);
2378
2379
2380         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2381
2382         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2383
2384         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2385         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2386
2387         commitTransaction(store, writeTransaction);
2388
2389         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2390
2391         assertEquals(expected, actual);
2392     }
2393
2394     @Test
2395     public void testRecoveryApplicable(){
2396
2397         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2398                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2399
2400         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2401                 schemaContext(SCHEMA_CONTEXT).props();
2402
2403         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2404                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2405
2406         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2407                 schemaContext(SCHEMA_CONTEXT).props();
2408
2409         new ShardTestKit(getSystem()) {{
2410             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2411                     persistentProps, "testPersistence1");
2412
2413             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2414
2415             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2416
2417             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2418                     nonPersistentProps, "testPersistence2");
2419
2420             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2421
2422             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2423
2424         }};
2425
2426     }
2427
2428     @Test
2429     public void testOnDatastoreContext() {
2430         new ShardTestKit(getSystem()) {{
2431             dataStoreContextBuilder.persistent(true);
2432