Bug 3195: Cleanup on error paths and error handling
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.dispatch.Dispatchers;
20 import akka.dispatch.OnComplete;
21 import akka.japi.Creator;
22 import akka.pattern.Patterns;
23 import akka.persistence.SaveSnapshotSuccess;
24 import akka.testkit.TestActorRef;
25 import akka.util.Timeout;
26 import com.google.common.base.Function;
27 import com.google.common.base.Optional;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import java.io.IOException;
32 import java.util.Collections;
33 import java.util.HashSet;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.Test;
41 import org.mockito.InOrder;
42 import org.opendaylight.controller.cluster.DataPersistenceProvider;
43 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
44 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
45 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
49 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
64 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
65 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
66 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
67 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
68 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
69 import org.opendaylight.controller.cluster.datastore.modification.Modification;
70 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
71 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
72 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
73 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
74 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
75 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
76 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
77 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
78 import org.opendaylight.controller.cluster.raft.RaftActorContext;
79 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
80 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
81 import org.opendaylight.controller.cluster.raft.Snapshot;
82 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
84 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
85 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
86 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
88 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
89 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
90 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
91 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
92 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
93 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
94 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
95 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
96 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
97 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
98 import org.opendaylight.yangtools.yang.common.QName;
99 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
101 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
102 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
103 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
116 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
117 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
118 import scala.concurrent.Await;
119 import scala.concurrent.Future;
120 import scala.concurrent.duration.FiniteDuration;
121
122 public class ShardTest extends AbstractShardTest {
123     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
124
125     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
126
127     final CountDownLatch recoveryComplete = new CountDownLatch(1);
128
129     protected Props newShardPropsWithRecoveryComplete() {
130
131         final Creator<Shard> creator = new Creator<Shard>() {
132             @Override
133             public Shard create() throws Exception {
134                 return new Shard(shardID, Collections.<String,String>emptyMap(),
135                         newDatastoreContext(), SCHEMA_CONTEXT) {
136                     @Override
137                     protected void onRecoveryComplete() {
138                         try {
139                             super.onRecoveryComplete();
140                         } finally {
141                             recoveryComplete.countDown();
142                         }
143                     }
144                 };
145             }
146         };
147         return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
148     }
149
150     @Test
151     public void testRegisterChangeListener() throws Exception {
152         new ShardTestKit(getSystem()) {{
153             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
154                     newShardProps(),  "testRegisterChangeListener");
155
156             waitUntilLeader(shard);
157
158             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
159
160             final MockDataChangeListener listener = new MockDataChangeListener(1);
161             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
162                     "testRegisterChangeListener-DataChangeListener");
163
164             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
165                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
166
167             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
168                     RegisterChangeListenerReply.class);
169             final String replyPath = reply.getListenerRegistrationPath().toString();
170             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
171                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
172
173             final YangInstanceIdentifier path = TestModel.TEST_PATH;
174             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
175
176             listener.waitForChangeEvents(path);
177
178             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
179             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
180         }};
181     }
182
183     @SuppressWarnings("serial")
184     @Test
185     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
186         // This test tests the timing window in which a change listener is registered before the
187         // shard becomes the leader. We verify that the listener is registered and notified of the
188         // existing data when the shard becomes the leader.
189         new ShardTestKit(getSystem()) {{
190             // For this test, we want to send the RegisterChangeListener message after the shard
191             // has recovered from persistence and before it becomes the leader. So we subclass
192             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
193             // we know that the shard has been initialized to a follower and has started the
194             // election process. The following 2 CountDownLatches are used to coordinate the
195             // ElectionTimeout with the sending of the RegisterChangeListener message.
196             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
197             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
198             final Creator<Shard> creator = new Creator<Shard>() {
199                 boolean firstElectionTimeout = true;
200
201                 @Override
202                 public Shard create() throws Exception {
203                     // Use a non persistent provider because this test actually invokes persist on the journal
204                     // this will cause all other messages to not be queued properly after that.
205                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
206                     // it does do a persist)
207                     return new Shard(shardID, Collections.<String,String>emptyMap(),
208                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
209                         @Override
210                         public void onReceiveCommand(final Object message) throws Exception {
211                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
212                                 // Got the first ElectionTimeout. We don't forward it to the
213                                 // base Shard yet until we've sent the RegisterChangeListener
214                                 // message. So we signal the onFirstElectionTimeout latch to tell
215                                 // the main thread to send the RegisterChangeListener message and
216                                 // start a thread to wait on the onChangeListenerRegistered latch,
217                                 // which the main thread signals after it has sent the message.
218                                 // After the onChangeListenerRegistered is triggered, we send the
219                                 // original ElectionTimeout message to proceed with the election.
220                                 firstElectionTimeout = false;
221                                 final ActorRef self = getSelf();
222                                 new Thread() {
223                                     @Override
224                                     public void run() {
225                                         Uninterruptibles.awaitUninterruptibly(
226                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
227                                         self.tell(message, self);
228                                     }
229                                 }.start();
230
231                                 onFirstElectionTimeout.countDown();
232                             } else {
233                                 super.onReceiveCommand(message);
234                             }
235                         }
236                     };
237                 }
238             };
239
240             final MockDataChangeListener listener = new MockDataChangeListener(1);
241             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
242                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
243
244             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
245                     Props.create(new DelegatingShardCreator(creator)),
246                     "testRegisterChangeListenerWhenNotLeaderInitially");
247
248             // Write initial data into the in-memory store.
249             final YangInstanceIdentifier path = TestModel.TEST_PATH;
250             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
251
252             // Wait until the shard receives the first ElectionTimeout message.
253             assertEquals("Got first ElectionTimeout", true,
254                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
255
256             // Now send the RegisterChangeListener and wait for the reply.
257             shard.tell(new RegisterChangeListener(path, dclActor,
258                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
259
260             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
261                     RegisterChangeListenerReply.class);
262             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
263
264             // Sanity check - verify the shard is not the leader yet.
265             shard.tell(new FindLeader(), getRef());
266             final FindLeaderReply findLeadeReply =
267                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
268             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
269
270             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
271             // with the election process.
272             onChangeListenerRegistered.countDown();
273
274             // Wait for the shard to become the leader and notify our listener with the existing
275             // data in the store.
276             listener.waitForChangeEvents(path);
277
278             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
279             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
280         }};
281     }
282
283     @Test
284     public void testRegisterDataTreeChangeListener() throws Exception {
285         new ShardTestKit(getSystem()) {{
286             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
287                     newShardProps(), "testRegisterDataTreeChangeListener");
288
289             waitUntilLeader(shard);
290
291             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
292
293             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
294             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
295                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
296
297             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
298
299             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
300                     RegisterDataTreeChangeListenerReply.class);
301             final String replyPath = reply.getListenerRegistrationPath().toString();
302             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
303                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
304
305             final YangInstanceIdentifier path = TestModel.TEST_PATH;
306             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
307
308             listener.waitForChangeEvents();
309
310             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
311             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
312         }};
313     }
314
315     @SuppressWarnings("serial")
316     @Test
317     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
318         new ShardTestKit(getSystem()) {{
319             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
320             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
321             final Creator<Shard> creator = new Creator<Shard>() {
322                 boolean firstElectionTimeout = true;
323
324                 @Override
325                 public Shard create() throws Exception {
326                     return new Shard(shardID, Collections.<String,String>emptyMap(),
327                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
328                         @Override
329                         public void onReceiveCommand(final Object message) throws Exception {
330                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
331                                 firstElectionTimeout = false;
332                                 final ActorRef self = getSelf();
333                                 new Thread() {
334                                     @Override
335                                     public void run() {
336                                         Uninterruptibles.awaitUninterruptibly(
337                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
338                                         self.tell(message, self);
339                                     }
340                                 }.start();
341
342                                 onFirstElectionTimeout.countDown();
343                             } else {
344                                 super.onReceiveCommand(message);
345                             }
346                         }
347                     };
348                 }
349             };
350
351             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
352             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
353                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
354
355             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
356                     Props.create(new DelegatingShardCreator(creator)),
357                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
358
359             final YangInstanceIdentifier path = TestModel.TEST_PATH;
360             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
361
362             assertEquals("Got first ElectionTimeout", true,
363                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
364
365             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
366             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
367                     RegisterDataTreeChangeListenerReply.class);
368             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
369
370             shard.tell(new FindLeader(), getRef());
371             final FindLeaderReply findLeadeReply =
372                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
373             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
374
375             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
376
377             onChangeListenerRegistered.countDown();
378
379             // TODO: investigate why we do not receive data chage events
380             listener.waitForChangeEvents();
381
382             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
383             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
384         }};
385     }
386
387     @Test
388     public void testCreateTransaction(){
389         new ShardTestKit(getSystem()) {{
390             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
391
392             waitUntilLeader(shard);
393
394             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
395
396             shard.tell(new CreateTransaction("txn-1",
397                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
398
399             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
400                     CreateTransactionReply.class);
401
402             final String path = reply.getTransactionActorPath().toString();
403             assertTrue("Unexpected transaction path " + path,
404                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
405
406             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
407         }};
408     }
409
410     @Test
411     public void testCreateTransactionOnChain(){
412         new ShardTestKit(getSystem()) {{
413             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
414
415             waitUntilLeader(shard);
416
417             shard.tell(new CreateTransaction("txn-1",
418                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
419                     getRef());
420
421             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
422                     CreateTransactionReply.class);
423
424             final String path = reply.getTransactionActorPath().toString();
425             assertTrue("Unexpected transaction path " + path,
426                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
427
428             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
429         }};
430     }
431
432     @SuppressWarnings("serial")
433     @Test
434     public void testPeerAddressResolved() throws Exception {
435         new ShardTestKit(getSystem()) {{
436             final CountDownLatch recoveryComplete = new CountDownLatch(1);
437             class TestShard extends Shard {
438                 TestShard() {
439                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
440                             newDatastoreContext(), SCHEMA_CONTEXT);
441                 }
442
443                 Map<String, String> getPeerAddresses() {
444                     return getRaftActorContext().getPeerAddresses();
445                 }
446
447                 @Override
448                 protected void onRecoveryComplete() {
449                     try {
450                         super.onRecoveryComplete();
451                     } finally {
452                         recoveryComplete.countDown();
453                     }
454                 }
455             }
456
457             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
458                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
459                         @Override
460                         public TestShard create() throws Exception {
461                             return new TestShard();
462                         }
463                     })), "testPeerAddressResolved");
464
465             //waitUntilLeader(shard);
466             assertEquals("Recovery complete", true,
467                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
468
469             final String address = "akka://foobar";
470             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
471
472             assertEquals("getPeerAddresses", address,
473                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
474
475             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
476         }};
477     }
478
479     @Test
480     public void testApplySnapshot() throws Exception {
481         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
482                 "testApplySnapshot");
483
484         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
485         store.setSchemaContext(SCHEMA_CONTEXT);
486
487         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
488                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
489                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
490                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
491
492         writeToStore(store, TestModel.TEST_PATH, container);
493
494         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
495         final NormalizedNode<?,?> expected = readStore(store, root);
496
497         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
498                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
499
500         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
501
502         final NormalizedNode<?,?> actual = readStore(shard, root);
503
504         assertEquals("Root node", expected, actual);
505
506         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
507     }
508
509     @Test
510     public void testApplyState() throws Exception {
511
512         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
513
514         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
515
516         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
517                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
518
519         shard.underlyingActor().onReceiveCommand(applyState);
520
521         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
522         assertEquals("Applied state", node, actual);
523
524         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
525     }
526
527     @Test
528     public void testApplyStateWithCandidatePayload() throws Exception {
529
530         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
531
532         recoveryComplete.await(5,  TimeUnit.SECONDS);
533
534         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
535         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
536
537         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
538                 DataTreeCandidatePayload.create(candidate)));
539
540         shard.underlyingActor().onReceiveCommand(applyState);
541
542         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
543         assertEquals("Applied state", node, actual);
544
545         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
546     }
547
548     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
549         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
550         testStore.setSchemaContext(SCHEMA_CONTEXT);
551
552         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
553
554         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
555
556         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
557                 SerializationUtils.serializeNormalizedNode(root),
558                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
559         return testStore;
560     }
561
562     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
563         source.validate(mod);
564         final DataTreeCandidate candidate = source.prepare(mod);
565         source.commit(candidate);
566         return DataTreeCandidatePayload.create(candidate);
567     }
568
569     @Test
570     public void testDataTreeCandidateRecovery() throws Exception {
571         // Set up the InMemorySnapshotStore.
572         final DataTree source = setupInMemorySnapshotStore();
573
574         final DataTreeModification writeMod = source.takeSnapshot().newModification();
575         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
576         writeMod.ready();
577         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
578
579         // Set up the InMemoryJournal.
580         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
581
582         final int nListEntries = 16;
583         final Set<Integer> listEntryKeys = new HashSet<>();
584
585         // Add some ModificationPayload entries
586         for (int i = 1; i <= nListEntries; i++) {
587             listEntryKeys.add(Integer.valueOf(i));
588
589             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
590                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
591
592             final DataTreeModification mod = source.takeSnapshot().newModification();
593             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
594             mod.ready();
595             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
596                 payloadForModification(source, mod)));
597         }
598
599         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
600                 new ApplyJournalEntries(nListEntries));
601
602         testRecovery(listEntryKeys);
603     }
604
605     @Test
606     public void testModicationRecovery() throws Exception {
607
608         // Set up the InMemorySnapshotStore.
609         setupInMemorySnapshotStore();
610
611         // Set up the InMemoryJournal.
612
613         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
614
615         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
616                   new WriteModification(TestModel.OUTER_LIST_PATH,
617                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
618
619         final int nListEntries = 16;
620         final Set<Integer> listEntryKeys = new HashSet<>();
621
622         // Add some ModificationPayload entries
623         for(int i = 1; i <= nListEntries; i++) {
624             listEntryKeys.add(Integer.valueOf(i));
625             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
626                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
627             final Modification mod = new MergeModification(path,
628                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
629             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
630                     newModificationPayload(mod)));
631         }
632
633         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
634                 new ApplyJournalEntries(nListEntries));
635
636         testRecovery(listEntryKeys);
637     }
638
639     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
640         final MutableCompositeModification compMod = new MutableCompositeModification();
641         for(final Modification mod: mods) {
642             compMod.addModification(mod);
643         }
644
645         return new ModificationPayload(compMod);
646     }
647
648     @Test
649     public void testConcurrentThreePhaseCommits() throws Throwable {
650         new ShardTestKit(getSystem()) {{
651             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
652                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
653                     "testConcurrentThreePhaseCommits");
654
655             waitUntilLeader(shard);
656
657          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
658
659             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
660
661             final String transactionID1 = "tx1";
662             final MutableCompositeModification modification1 = new MutableCompositeModification();
663             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
664                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
665
666             final String transactionID2 = "tx2";
667             final MutableCompositeModification modification2 = new MutableCompositeModification();
668             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
669                     TestModel.OUTER_LIST_PATH,
670                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
671                     modification2);
672
673             final String transactionID3 = "tx3";
674             final MutableCompositeModification modification3 = new MutableCompositeModification();
675             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
676                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
677                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
678                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
679                     modification3);
680
681             final long timeoutSec = 5;
682             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
683             final Timeout timeout = new Timeout(duration);
684
685             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
686             // by the ShardTransaction.
687
688             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
689                     cohort1, modification1, true, false), getRef());
690             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
691                     expectMsgClass(duration, ReadyTransactionReply.class));
692             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
693
694             // Send the CanCommitTransaction message for the first Tx.
695
696             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
697             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
698                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
699             assertEquals("Can commit", true, canCommitReply.getCanCommit());
700
701             // Send the ForwardedReadyTransaction for the next 2 Tx's.
702
703             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
704                     cohort2, modification2, true, false), getRef());
705             expectMsgClass(duration, ReadyTransactionReply.class);
706
707             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
708                     cohort3, modification3, true, false), getRef());
709             expectMsgClass(duration, ReadyTransactionReply.class);
710
711             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
712             // processed after the first Tx completes.
713
714             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
715                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
716
717             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
718                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
719
720             // Send the CommitTransaction message for the first Tx. After it completes, it should
721             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
722
723             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
724             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
725
726             // Wait for the next 2 Tx's to complete.
727
728             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
729             final CountDownLatch commitLatch = new CountDownLatch(2);
730
731             class OnFutureComplete extends OnComplete<Object> {
732                 private final Class<?> expRespType;
733
734                 OnFutureComplete(final Class<?> expRespType) {
735                     this.expRespType = expRespType;
736                 }
737
738                 @Override
739                 public void onComplete(final Throwable error, final Object resp) {
740                     if(error != null) {
741                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
742                     } else {
743                         try {
744                             assertEquals("Commit response type", expRespType, resp.getClass());
745                             onSuccess(resp);
746                         } catch (final Exception e) {
747                             caughtEx.set(e);
748                         }
749                     }
750                 }
751
752                 void onSuccess(final Object resp) throws Exception {
753                 }
754             }
755
756             class OnCommitFutureComplete extends OnFutureComplete {
757                 OnCommitFutureComplete() {
758                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
759                 }
760
761                 @Override
762                 public void onComplete(final Throwable error, final Object resp) {
763                     super.onComplete(error, resp);
764                     commitLatch.countDown();
765                 }
766             }
767
768             class OnCanCommitFutureComplete extends OnFutureComplete {
769                 private final String transactionID;
770
771                 OnCanCommitFutureComplete(final String transactionID) {
772                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
773                     this.transactionID = transactionID;
774                 }
775
776                 @Override
777                 void onSuccess(final Object resp) throws Exception {
778                     final CanCommitTransactionReply canCommitReply =
779                             CanCommitTransactionReply.fromSerializable(resp);
780                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
781
782                     final Future<Object> commitFuture = Patterns.ask(shard,
783                             new CommitTransaction(transactionID).toSerializable(), timeout);
784                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
785                 }
786             }
787
788             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
789                     getSystem().dispatcher());
790
791             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
792                     getSystem().dispatcher());
793
794             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
795
796             if(caughtEx.get() != null) {
797                 throw caughtEx.get();
798             }
799
800             assertEquals("Commits complete", true, done);
801
802             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
803             inOrder.verify(cohort1).canCommit();
804             inOrder.verify(cohort1).preCommit();
805             inOrder.verify(cohort1).commit();
806             inOrder.verify(cohort2).canCommit();
807             inOrder.verify(cohort2).preCommit();
808             inOrder.verify(cohort2).commit();
809             inOrder.verify(cohort3).canCommit();
810             inOrder.verify(cohort3).preCommit();
811             inOrder.verify(cohort3).commit();
812
813             // Verify data in the data store.
814
815             verifyOuterListEntry(shard, 1);
816
817             verifyLastApplied(shard, 2);
818
819             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
820         }};
821     }
822
823     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
824             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
825         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
826     }
827
828     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
829             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
830             final int messagesSent) {
831         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
832         batched.addModification(new WriteModification(path, data));
833         batched.setReady(ready);
834         batched.setDoCommitOnReady(doCommitOnReady);
835         batched.setTotalMessagesSent(messagesSent);
836         return batched;
837     }
838
839     @Test
840     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
841         new ShardTestKit(getSystem()) {{
842             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
843                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
844                     "testBatchedModificationsWithNoCommitOnReady");
845
846             waitUntilLeader(shard);
847
848             final String transactionID = "tx";
849             final FiniteDuration duration = duration("5 seconds");
850
851             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
852             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
853                 @Override
854                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
855                     if(mockCohort.get() == null) {
856                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
857                     }
858
859                     return mockCohort.get();
860                 }
861             };
862
863             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
864
865             // Send a BatchedModifications to start a transaction.
866
867             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
868                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
869             expectMsgClass(duration, BatchedModificationsReply.class);
870
871             // Send a couple more BatchedModifications.
872
873             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
874                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
875             expectMsgClass(duration, BatchedModificationsReply.class);
876
877             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
878                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
879                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
880             expectMsgClass(duration, ReadyTransactionReply.class);
881
882             // Send the CanCommitTransaction message.
883
884             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
885             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
886                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
887             assertEquals("Can commit", true, canCommitReply.getCanCommit());
888
889             // Send the CanCommitTransaction message.
890
891             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
892             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
893
894             final InOrder inOrder = inOrder(mockCohort.get());
895             inOrder.verify(mockCohort.get()).canCommit();
896             inOrder.verify(mockCohort.get()).preCommit();
897             inOrder.verify(mockCohort.get()).commit();
898
899             // Verify data in the data store.
900
901             verifyOuterListEntry(shard, 1);
902
903             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
904         }};
905     }
906
907     @Test
908     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
909         new ShardTestKit(getSystem()) {{
910             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
911                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
912                     "testBatchedModificationsWithCommitOnReady");
913
914             waitUntilLeader(shard);
915
916             final String transactionID = "tx";
917             final FiniteDuration duration = duration("5 seconds");
918
919             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
920             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
921                 @Override
922                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
923                     if(mockCohort.get() == null) {
924                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
925                     }
926
927                     return mockCohort.get();
928                 }
929             };
930
931             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
932
933             // Send a BatchedModifications to start a transaction.
934
935             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
936                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
937             expectMsgClass(duration, BatchedModificationsReply.class);
938
939             // Send a couple more BatchedModifications.
940
941             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
942                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
943             expectMsgClass(duration, BatchedModificationsReply.class);
944
945             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
946                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
947                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
948
949             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
950
951             final InOrder inOrder = inOrder(mockCohort.get());
952             inOrder.verify(mockCohort.get()).canCommit();
953             inOrder.verify(mockCohort.get()).preCommit();
954             inOrder.verify(mockCohort.get()).commit();
955
956             // Verify data in the data store.
957
958             verifyOuterListEntry(shard, 1);
959
960             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
961         }};
962     }
963
964     @Test(expected=IllegalStateException.class)
965     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
966         new ShardTestKit(getSystem()) {{
967             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
968                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
969                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
970
971             waitUntilLeader(shard);
972
973             final String transactionID = "tx1";
974             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
975             batched.setReady(true);
976             batched.setTotalMessagesSent(2);
977
978             shard.tell(batched, getRef());
979
980             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
981
982             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
983
984             if(failure != null) {
985                 throw failure.cause();
986             }
987         }};
988     }
989
990     @Test
991     public void testBatchedModificationsWithOperationFailure() throws Throwable {
992         new ShardTestKit(getSystem()) {{
993             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
994                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
995                     "testBatchedModificationsWithOperationFailure");
996
997             waitUntilLeader(shard);
998
999             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
1000             // write will not validate the children for performance reasons.
1001
1002             String transactionID = "tx1";
1003
1004             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1005                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
1006                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1007
1008             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
1009             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
1010             shard.tell(batched, getRef());
1011             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1012
1013             Throwable cause = failure.cause();
1014
1015             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1016             batched.setReady(true);
1017             batched.setTotalMessagesSent(2);
1018
1019             shard.tell(batched, getRef());
1020
1021             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1022             assertEquals("Failure cause", cause, failure.cause());
1023
1024             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1025         }};
1026     }
1027
1028     @SuppressWarnings("unchecked")
1029     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1030         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1031         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1032         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1033                 outerList.getValue() instanceof Iterable);
1034         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1035         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1036                 entry instanceof MapEntryNode);
1037         final MapEntryNode mapEntry = (MapEntryNode)entry;
1038         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1039                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1040         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1041         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1042     }
1043
1044     @Test
1045     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1046         new ShardTestKit(getSystem()) {{
1047             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1048                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1049                     "testBatchedModificationsOnTransactionChain");
1050
1051             waitUntilLeader(shard);
1052
1053             final String transactionChainID = "txChain";
1054             final String transactionID1 = "tx1";
1055             final String transactionID2 = "tx2";
1056
1057             final FiniteDuration duration = duration("5 seconds");
1058
1059             // Send a BatchedModifications to start a chained write transaction and ready it.
1060
1061             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1062             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1063             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1064                     containerNode, true, false, 1), getRef());
1065             expectMsgClass(duration, ReadyTransactionReply.class);
1066
1067             // Create a read Tx on the same chain.
1068
1069             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1070                     transactionChainID).toSerializable(), getRef());
1071
1072             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1073
1074             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1075             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1076             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1077
1078             // Commit the write transaction.
1079
1080             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1081             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1082                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1083             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1084
1085             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1086             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1087
1088             // Verify data in the data store.
1089
1090             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1091             assertEquals("Stored node", containerNode, actualNode);
1092
1093             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1094         }};
1095     }
1096
1097     @Test
1098     public void testOnBatchedModificationsWhenNotLeader() {
1099         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1100         new ShardTestKit(getSystem()) {{
1101             final Creator<Shard> creator = new Creator<Shard>() {
1102                 private static final long serialVersionUID = 1L;
1103
1104                 @Override
1105                 public Shard create() throws Exception {
1106                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1107                             newDatastoreContext(), SCHEMA_CONTEXT) {
1108                         @Override
1109                         protected boolean isLeader() {
1110                             return overrideLeaderCalls.get() ? false : super.isLeader();
1111                         }
1112
1113                         @Override
1114                         protected ActorSelection getLeader() {
1115                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1116                                 super.getLeader();
1117                         }
1118                     };
1119                 }
1120             };
1121
1122             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1123                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1124
1125             waitUntilLeader(shard);
1126
1127             overrideLeaderCalls.set(true);
1128
1129             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1130
1131             shard.tell(batched, ActorRef.noSender());
1132
1133             expectMsgEquals(batched);
1134
1135             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1136         }};
1137     }
1138
1139     @Test
1140     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1141         new ShardTestKit(getSystem()) {{
1142             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1143                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1144                     "testForwardedReadyTransactionWithImmediateCommit");
1145
1146             waitUntilLeader(shard);
1147
1148             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1149
1150             final String transactionID = "tx1";
1151             final MutableCompositeModification modification = new MutableCompositeModification();
1152             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1153             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1154                     TestModel.TEST_PATH, containerNode, modification);
1155
1156             final FiniteDuration duration = duration("5 seconds");
1157
1158             // Simulate the ForwardedReadyTransaction messages that would be sent
1159             // by the ShardTransaction.
1160
1161             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1162                     cohort, modification, true, true), getRef());
1163
1164             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1165
1166             final InOrder inOrder = inOrder(cohort);
1167             inOrder.verify(cohort).canCommit();
1168             inOrder.verify(cohort).preCommit();
1169             inOrder.verify(cohort).commit();
1170
1171             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1172             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1173
1174             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1175         }};
1176     }
1177
1178     @Test
1179     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1180         new ShardTestKit(getSystem()) {{
1181             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1182                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1183                     "testReadyLocalTransactionWithImmediateCommit");
1184
1185             waitUntilLeader(shard);
1186
1187             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1188
1189             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1190
1191             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1192             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1193             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1194             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1195
1196             final String txId = "tx1";
1197             modification.ready();
1198             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1199
1200             shard.tell(readyMessage, getRef());
1201
1202             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1203
1204             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1205             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1206
1207             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1208         }};
1209     }
1210
1211     @Test
1212     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1213         new ShardTestKit(getSystem()) {{
1214             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1215                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1216                     "testReadyLocalTransactionWithThreePhaseCommit");
1217
1218             waitUntilLeader(shard);
1219
1220             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1221
1222             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1223
1224             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1225             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1226             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1227             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1228
1229             final String txId = "tx1";
1230                 modification.ready();
1231             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1232
1233             shard.tell(readyMessage, getRef());
1234
1235             expectMsgClass(ReadyTransactionReply.class);
1236
1237             // Send the CanCommitTransaction message.
1238
1239             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1240             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1241                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1242             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1243
1244             // Send the CanCommitTransaction message.
1245
1246             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1247             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1248
1249             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1250             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1251
1252             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1253         }};
1254     }
1255
1256     @Test
1257     public void testCommitWithPersistenceDisabled() throws Throwable {
1258         dataStoreContextBuilder.persistent(false);
1259         new ShardTestKit(getSystem()) {{
1260             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1261                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1262                     "testCommitWithPersistenceDisabled");
1263
1264             waitUntilLeader(shard);
1265
1266             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1267
1268             // Setup a simulated transactions with a mock cohort.
1269
1270             final String transactionID = "tx";
1271             final MutableCompositeModification modification = new MutableCompositeModification();
1272             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1273             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1274                     TestModel.TEST_PATH, containerNode, modification);
1275
1276             final FiniteDuration duration = duration("5 seconds");
1277
1278             // Simulate the ForwardedReadyTransaction messages that would be sent
1279             // by the ShardTransaction.
1280
1281             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1282                     cohort, modification, true, false), getRef());
1283             expectMsgClass(duration, ReadyTransactionReply.class);
1284
1285             // Send the CanCommitTransaction message.
1286
1287             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1288             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1289                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1290             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1291
1292             // Send the CanCommitTransaction message.
1293
1294             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1295             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1296
1297             final InOrder inOrder = inOrder(cohort);
1298             inOrder.verify(cohort).canCommit();
1299             inOrder.verify(cohort).preCommit();
1300             inOrder.verify(cohort).commit();
1301
1302             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1303             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1304
1305             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1306         }};
1307     }
1308
1309     private static DataTreeCandidateTip mockCandidate(final String name) {
1310         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1311         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1312         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1313         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1314         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1315         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1316         return mockCandidate;
1317     }
1318
1319     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1320         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1321         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1322         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1323         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1324         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1325         return mockCandidate;
1326     }
1327
1328     @Test
1329     public void testCommitWhenTransactionHasNoModifications(){
1330         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1331         // but here that need not happen
1332         new ShardTestKit(getSystem()) {
1333             {
1334                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1335                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1336                         "testCommitWhenTransactionHasNoModifications");
1337
1338                 waitUntilLeader(shard);
1339
1340                 final String transactionID = "tx1";
1341                 final MutableCompositeModification modification = new MutableCompositeModification();
1342                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1343                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1344                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1345                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1346                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1347
1348                 final FiniteDuration duration = duration("5 seconds");
1349
1350                 // Simulate the ForwardedReadyTransaction messages that would be sent
1351                 // by the ShardTransaction.
1352
1353                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1354                         cohort, modification, true, false), getRef());
1355                 expectMsgClass(duration, ReadyTransactionReply.class);
1356
1357                 // Send the CanCommitTransaction message.
1358
1359                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1360                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1361                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1362                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1363
1364                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1365                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1366
1367                 final InOrder inOrder = inOrder(cohort);
1368                 inOrder.verify(cohort).canCommit();
1369                 inOrder.verify(cohort).preCommit();
1370                 inOrder.verify(cohort).commit();
1371
1372                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1373                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1374
1375                 // Use MBean for verification
1376                 // Committed transaction count should increase as usual
1377                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1378
1379                 // Commit index should not advance because this does not go into the journal
1380                 assertEquals(-1, shardStats.getCommitIndex());
1381
1382                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1383
1384             }
1385         };
1386     }
1387
1388     @Test
1389     public void testCommitWhenTransactionHasModifications(){
1390         new ShardTestKit(getSystem()) {
1391             {
1392                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1393                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1394                         "testCommitWhenTransactionHasModifications");
1395
1396                 waitUntilLeader(shard);
1397
1398                 final String transactionID = "tx1";
1399                 final MutableCompositeModification modification = new MutableCompositeModification();
1400                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1401                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1402                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1403                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1404                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1405                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1406
1407                 final FiniteDuration duration = duration("5 seconds");
1408
1409                 // Simulate the ForwardedReadyTransaction messages that would be sent
1410                 // by the ShardTransaction.
1411
1412                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1413                         cohort, modification, true, false), getRef());
1414                 expectMsgClass(duration, ReadyTransactionReply.class);
1415
1416                 // Send the CanCommitTransaction message.
1417
1418                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1419                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1420                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1421                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1422
1423                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1424                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1425
1426                 final InOrder inOrder = inOrder(cohort);
1427                 inOrder.verify(cohort).canCommit();
1428                 inOrder.verify(cohort).preCommit();
1429                 inOrder.verify(cohort).commit();
1430
1431                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1432                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1433
1434                 // Use MBean for verification
1435                 // Committed transaction count should increase as usual
1436                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1437
1438                 // Commit index should advance as we do not have an empty modification
1439                 assertEquals(0, shardStats.getCommitIndex());
1440
1441                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1442
1443             }
1444         };
1445     }
1446
1447     @Test
1448     public void testCommitPhaseFailure() throws Throwable {
1449         new ShardTestKit(getSystem()) {{
1450             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1451                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1452                     "testCommitPhaseFailure");
1453
1454             waitUntilLeader(shard);
1455
1456             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1457             // commit phase.
1458
1459             final String transactionID1 = "tx1";
1460             final MutableCompositeModification modification1 = new MutableCompositeModification();
1461             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1462             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1463             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1464             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1465             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1466
1467             final String transactionID2 = "tx2";
1468             final MutableCompositeModification modification2 = new MutableCompositeModification();
1469             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1470             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1471
1472             final FiniteDuration duration = duration("5 seconds");
1473             final Timeout timeout = new Timeout(duration);
1474
1475             // Simulate the ForwardedReadyTransaction messages that would be sent
1476             // by the ShardTransaction.
1477
1478             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1479                     cohort1, modification1, true, false), getRef());
1480             expectMsgClass(duration, ReadyTransactionReply.class);
1481
1482             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1483                     cohort2, modification2, true, false), getRef());
1484             expectMsgClass(duration, ReadyTransactionReply.class);
1485
1486             // Send the CanCommitTransaction message for the first Tx.
1487
1488             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1489             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1490                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1491             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1492
1493             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1494             // processed after the first Tx completes.
1495
1496             final Future<Object> canCommitFuture = Patterns.ask(shard,
1497                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1498
1499             // Send the CommitTransaction message for the first Tx. This should send back an error
1500             // and trigger the 2nd Tx to proceed.
1501
1502             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1503             expectMsgClass(duration, akka.actor.Status.Failure.class);
1504
1505             // Wait for the 2nd Tx to complete the canCommit phase.
1506
1507             final CountDownLatch latch = new CountDownLatch(1);
1508             canCommitFuture.onComplete(new OnComplete<Object>() {
1509                 @Override
1510                 public void onComplete(final Throwable t, final Object resp) {
1511                     latch.countDown();
1512                 }
1513             }, getSystem().dispatcher());
1514
1515             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1516
1517             final InOrder inOrder = inOrder(cohort1, cohort2);
1518             inOrder.verify(cohort1).canCommit();
1519             inOrder.verify(cohort1).preCommit();
1520             inOrder.verify(cohort1).commit();
1521             inOrder.verify(cohort2).canCommit();
1522
1523             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1524         }};
1525     }
1526
1527     @Test
1528     public void testPreCommitPhaseFailure() throws Throwable {
1529         new ShardTestKit(getSystem()) {{
1530             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1531                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1532                     "testPreCommitPhaseFailure");
1533
1534             waitUntilLeader(shard);
1535
1536             final String transactionID1 = "tx1";
1537             final MutableCompositeModification modification1 = new MutableCompositeModification();
1538             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1539             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1540             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1541
1542             final String transactionID2 = "tx2";
1543             final MutableCompositeModification modification2 = new MutableCompositeModification();
1544             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1545             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1546
1547             final FiniteDuration duration = duration("5 seconds");
1548             final Timeout timeout = new Timeout(duration);
1549
1550             // Simulate the ForwardedReadyTransaction messages that would be sent
1551             // by the ShardTransaction.
1552
1553             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1554                     cohort1, modification1, true, false), getRef());
1555             expectMsgClass(duration, ReadyTransactionReply.class);
1556
1557             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1558                     cohort2, modification2, true, false), getRef());
1559             expectMsgClass(duration, ReadyTransactionReply.class);
1560
1561             // Send the CanCommitTransaction message for the first Tx.
1562
1563             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1564             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1565                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1566             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1567
1568             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1569             // processed after the first Tx completes.
1570
1571             final Future<Object> canCommitFuture = Patterns.ask(shard,
1572                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1573
1574             // Send the CommitTransaction message for the first Tx. This should send back an error
1575             // and trigger the 2nd Tx to proceed.
1576
1577             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1578             expectMsgClass(duration, akka.actor.Status.Failure.class);
1579
1580             // Wait for the 2nd Tx to complete the canCommit phase.
1581
1582             final CountDownLatch latch = new CountDownLatch(1);
1583             canCommitFuture.onComplete(new OnComplete<Object>() {
1584                 @Override
1585                 public void onComplete(final Throwable t, final Object resp) {
1586                     latch.countDown();
1587                 }
1588             }, getSystem().dispatcher());
1589
1590             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1591
1592             final InOrder inOrder = inOrder(cohort1, cohort2);
1593             inOrder.verify(cohort1).canCommit();
1594             inOrder.verify(cohort1).preCommit();
1595             inOrder.verify(cohort2).canCommit();
1596
1597             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1598         }};
1599     }
1600
1601     @Test
1602     public void testCanCommitPhaseFailure() throws Throwable {
1603         new ShardTestKit(getSystem()) {{
1604             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1605                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1606                     "testCanCommitPhaseFailure");
1607
1608             waitUntilLeader(shard);
1609
1610             final FiniteDuration duration = duration("5 seconds");
1611
1612             final String transactionID1 = "tx1";
1613             final MutableCompositeModification modification = new MutableCompositeModification();
1614             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1615             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1616
1617             // Simulate the ForwardedReadyTransaction messages that would be sent
1618             // by the ShardTransaction.
1619
1620             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1621                     cohort, modification, true, false), getRef());
1622             expectMsgClass(duration, ReadyTransactionReply.class);
1623
1624             // Send the CanCommitTransaction message.
1625
1626             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1627             expectMsgClass(duration, akka.actor.Status.Failure.class);
1628
1629             // Send another can commit to ensure the failed one got cleaned up.
1630
1631             reset(cohort);
1632
1633             final String transactionID2 = "tx2";
1634             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1635
1636             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1637                     cohort, modification, true, false), getRef());
1638             expectMsgClass(duration, ReadyTransactionReply.class);
1639
1640             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1641             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1642                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1643             assertEquals("getCanCommit", true, reply.getCanCommit());
1644
1645             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1646         }};
1647     }
1648
1649     @Test
1650     public void testCanCommitPhaseFalseResponse() throws Throwable {
1651         new ShardTestKit(getSystem()) {{
1652             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1653                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1654                     "testCanCommitPhaseFalseResponse");
1655
1656             waitUntilLeader(shard);
1657
1658             final FiniteDuration duration = duration("5 seconds");
1659
1660             final String transactionID1 = "tx1";
1661             final MutableCompositeModification modification = new MutableCompositeModification();
1662             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1663             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1664
1665             // Simulate the ForwardedReadyTransaction messages that would be sent
1666             // by the ShardTransaction.
1667
1668             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1669                     cohort, modification, true, false), getRef());
1670             expectMsgClass(duration, ReadyTransactionReply.class);
1671
1672             // Send the CanCommitTransaction message.
1673
1674             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1675             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1676                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1677             assertEquals("getCanCommit", false, reply.getCanCommit());
1678
1679             // Send another can commit to ensure the failed one got cleaned up.
1680
1681             reset(cohort);
1682
1683             final String transactionID2 = "tx2";
1684             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1685
1686             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1687                     cohort, modification, true, false), getRef());
1688             expectMsgClass(duration, ReadyTransactionReply.class);
1689
1690             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1691             reply = CanCommitTransactionReply.fromSerializable(
1692                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1693             assertEquals("getCanCommit", true, reply.getCanCommit());
1694
1695             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1696         }};
1697     }
1698
1699     @Test
1700     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1701         new ShardTestKit(getSystem()) {{
1702             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1703                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1704                     "testImmediateCommitWithCanCommitPhaseFailure");
1705
1706             waitUntilLeader(shard);
1707
1708             final FiniteDuration duration = duration("5 seconds");
1709
1710             final String transactionID1 = "tx1";
1711             final MutableCompositeModification modification = new MutableCompositeModification();
1712             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1713             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1714
1715             // Simulate the ForwardedReadyTransaction messages that would be sent
1716             // by the ShardTransaction.
1717
1718             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1719                     cohort, modification, true, true), getRef());
1720
1721             expectMsgClass(duration, akka.actor.Status.Failure.class);
1722
1723             // Send another can commit to ensure the failed one got cleaned up.
1724
1725             reset(cohort);
1726
1727             final String transactionID2 = "tx2";
1728             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1729             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1730             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1731             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1732             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1733             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1734             doReturn(candidateRoot).when(candidate).getRootNode();
1735             doReturn(candidate).when(cohort).getCandidate();
1736
1737             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1738                     cohort, modification, true, true), getRef());
1739
1740             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1741
1742             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1743         }};
1744     }
1745
1746     @Test
1747     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1748         new ShardTestKit(getSystem()) {{
1749             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1750                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1751                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1752
1753             waitUntilLeader(shard);
1754
1755             final FiniteDuration duration = duration("5 seconds");
1756
1757             final String transactionID = "tx1";
1758             final MutableCompositeModification modification = new MutableCompositeModification();
1759             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1760             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1761
1762             // Simulate the ForwardedReadyTransaction messages that would be sent
1763             // by the ShardTransaction.
1764
1765             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1766                     cohort, modification, true, true), getRef());
1767
1768             expectMsgClass(duration, akka.actor.Status.Failure.class);
1769
1770             // Send another can commit to ensure the failed one got cleaned up.
1771
1772             reset(cohort);
1773
1774             final String transactionID2 = "tx2";
1775             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1776             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1777             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1778             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1779             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1780             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1781             doReturn(candidateRoot).when(candidate).getRootNode();
1782             doReturn(candidate).when(cohort).getCandidate();
1783
1784             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1785                     cohort, modification, true, true), getRef());
1786
1787             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1788
1789             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1790         }};
1791     }
1792
1793     @Test
1794     public void testAbortBeforeFinishCommit() throws Throwable {
1795         new ShardTestKit(getSystem()) {{
1796             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1797                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1798                     "testAbortBeforeFinishCommit");
1799
1800             waitUntilLeader(shard);
1801
1802             final FiniteDuration duration = duration("5 seconds");
1803             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1804
1805             final String transactionID = "tx1";
1806             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1807                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1808                 @Override
1809                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1810                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1811
1812                     // Simulate an AbortTransaction message occurring during replication, after
1813                     // persisting and before finishing the commit to the in-memory store.
1814                     // We have no followers so due to optimizations in the RaftActor, it does not
1815                     // attempt replication and thus we can't send an AbortTransaction message b/c
1816                     // it would be processed too late after CommitTransaction completes. So we'll
1817                     // simulate an AbortTransaction message occurring during replication by calling
1818                     // the shard directly.
1819                     //
1820                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1821
1822                     return preCommitFuture;
1823                 }
1824             };
1825
1826             final MutableCompositeModification modification = new MutableCompositeModification();
1827             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1828                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1829                     modification, preCommit);
1830
1831             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1832                     cohort, modification, true, false), getRef());
1833             expectMsgClass(duration, ReadyTransactionReply.class);
1834
1835             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1836             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1837                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1838             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1839
1840             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1841             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1842
1843             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1844
1845             // Since we're simulating an abort occurring during replication and before finish commit,
1846             // the data should still get written to the in-memory store since we've gotten past
1847             // canCommit and preCommit and persisted the data.
1848             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1849
1850             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1851         }};
1852     }
1853
1854     @Test
1855     public void testTransactionCommitTimeout() throws Throwable {
1856         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1857
1858         new ShardTestKit(getSystem()) {{
1859             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1860                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1861                     "testTransactionCommitTimeout");
1862
1863             waitUntilLeader(shard);
1864
1865             final FiniteDuration duration = duration("5 seconds");
1866
1867             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1868
1869             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1870             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1871                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1872
1873             // Create 1st Tx - will timeout
1874
1875             final String transactionID1 = "tx1";
1876             final MutableCompositeModification modification1 = new MutableCompositeModification();
1877             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1878                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1879                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1880                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1881                     modification1);
1882
1883             // Create 2nd Tx
1884
1885             final String transactionID2 = "tx3";
1886             final MutableCompositeModification modification2 = new MutableCompositeModification();
1887             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1888                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1889             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1890                     listNodePath,
1891                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1892                     modification2);
1893
1894             // Ready the Tx's
1895
1896             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1897                     cohort1, modification1, true, false), getRef());
1898             expectMsgClass(duration, ReadyTransactionReply.class);
1899
1900             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1901                     cohort2, modification2, true, false), getRef());
1902             expectMsgClass(duration, ReadyTransactionReply.class);
1903
1904             // canCommit 1st Tx. We don't send the commit so it should timeout.
1905
1906             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1907             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1908
1909             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1910
1911             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1912             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1913
1914             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1915
1916             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1917             expectMsgClass(duration, akka.actor.Status.Failure.class);
1918
1919             // Commit the 2nd Tx.
1920
1921             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1922             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1923
1924             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1925             assertNotNull(listNodePath + " not found", node);
1926
1927             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1928         }};
1929     }
1930
1931     @Test
1932     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1933         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1934
1935         new ShardTestKit(getSystem()) {{
1936             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1937                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1938                     "testTransactionCommitQueueCapacityExceeded");
1939
1940             waitUntilLeader(shard);
1941
1942             final FiniteDuration duration = duration("5 seconds");
1943
1944             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1945
1946             final String transactionID1 = "tx1";
1947             final MutableCompositeModification modification1 = new MutableCompositeModification();
1948             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1949                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1950
1951             final String transactionID2 = "tx2";
1952             final MutableCompositeModification modification2 = new MutableCompositeModification();
1953             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1954                     TestModel.OUTER_LIST_PATH,
1955                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1956                     modification2);
1957
1958             final String transactionID3 = "tx3";
1959             final MutableCompositeModification modification3 = new MutableCompositeModification();
1960             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1961                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1962
1963             // Ready the Tx's
1964
1965             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1966                     cohort1, modification1, true, false), getRef());
1967             expectMsgClass(duration, ReadyTransactionReply.class);
1968
1969             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1970                     cohort2, modification2, true, false), getRef());
1971             expectMsgClass(duration, ReadyTransactionReply.class);
1972
1973             // The 3rd Tx should exceed queue capacity and fail.
1974
1975             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1976                     cohort3, modification3, true, false), getRef());
1977             expectMsgClass(duration, akka.actor.Status.Failure.class);
1978
1979             // canCommit 1st Tx.
1980
1981             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1982             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1983
1984             // canCommit the 2nd Tx - it should get queued.
1985
1986             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1987
1988             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1989
1990             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1991             expectMsgClass(duration, akka.actor.Status.Failure.class);
1992
1993             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1994         }};
1995     }
1996
1997     @Test
1998     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1999         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2000
2001         new ShardTestKit(getSystem()) {{
2002             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2003                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2004                     "testTransactionCommitWithPriorExpiredCohortEntries");
2005
2006             waitUntilLeader(shard);
2007
2008             final FiniteDuration duration = duration("5 seconds");
2009
2010             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2011
2012             final String transactionID1 = "tx1";
2013             final MutableCompositeModification modification1 = new MutableCompositeModification();
2014             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2015                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2016
2017             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2018                     cohort1, modification1, true, false), getRef());
2019             expectMsgClass(duration, ReadyTransactionReply.class);
2020
2021             final String transactionID2 = "tx2";
2022             final MutableCompositeModification modification2 = new MutableCompositeModification();
2023             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2024                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2025
2026             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2027                     cohort2, modification2, true, false), getRef());
2028             expectMsgClass(duration, ReadyTransactionReply.class);
2029
2030             final String transactionID3 = "tx3";
2031             final MutableCompositeModification modification3 = new MutableCompositeModification();
2032             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2033                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2034
2035             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2036                     cohort3, modification3, true, false), getRef());
2037             expectMsgClass(duration, ReadyTransactionReply.class);
2038
2039             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2040             // should expire from the queue and the last one should be processed.
2041
2042             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2043             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2044
2045             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2046         }};
2047     }
2048
2049     @Test
2050     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2051         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2052
2053         new ShardTestKit(getSystem()) {{
2054             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2055                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2056                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2057
2058             waitUntilLeader(shard);
2059
2060             final FiniteDuration duration = duration("5 seconds");
2061
2062             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2063
2064             final String transactionID1 = "tx1";
2065             final MutableCompositeModification modification1 = new MutableCompositeModification();
2066             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2067                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2068
2069             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2070                     cohort1, modification1, true, false), getRef());
2071             expectMsgClass(duration, ReadyTransactionReply.class);
2072
2073             // CanCommit the first one so it's the current in-progress CohortEntry.
2074
2075             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2076             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2077
2078             // Ready the second Tx.
2079
2080             final String transactionID2 = "tx2";
2081             final MutableCompositeModification modification2 = new MutableCompositeModification();
2082             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2083                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2084
2085             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2086                     cohort2, modification2, true, false), getRef());
2087             expectMsgClass(duration, ReadyTransactionReply.class);
2088
2089             // Ready the third Tx.
2090
2091             final String transactionID3 = "tx3";
2092             final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2093             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2094                     .apply(modification3);
2095                 modification3.ready();
2096             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2097
2098             shard.tell(readyMessage, getRef());
2099
2100             // Commit the first Tx. After completing, the second should expire from the queue and the third
2101             // Tx committed.
2102
2103             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2104             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2105
2106             // Expect commit reply from the third Tx.
2107
2108             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2109
2110             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2111             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2112
2113             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2114         }};
2115     }
2116
2117     @Test
2118     public void testCanCommitBeforeReadyFailure() throws Throwable {
2119         new ShardTestKit(getSystem()) {{
2120             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2121                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2122                     "testCanCommitBeforeReadyFailure");
2123
2124             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2125             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2126
2127             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2128         }};
2129     }
2130
2131     @Test
2132     public void testAbortTransaction() throws Throwable {
2133         new ShardTestKit(getSystem()) {{
2134             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2135                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2136                     "testAbortTransaction");
2137
2138             waitUntilLeader(shard);
2139
2140             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2141
2142             final String transactionID1 = "tx1";
2143             final MutableCompositeModification modification1 = new MutableCompositeModification();
2144             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2145             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2146             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2147
2148             final String transactionID2 = "tx2";
2149             final MutableCompositeModification modification2 = new MutableCompositeModification();
2150             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2151             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2152
2153             final FiniteDuration duration = duration("5 seconds");
2154             final Timeout timeout = new Timeout(duration);
2155
2156             // Simulate the ForwardedReadyTransaction messages that would be sent
2157             // by the ShardTransaction.
2158
2159             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2160                     cohort1, modification1, true, false), getRef());
2161             expectMsgClass(duration, ReadyTransactionReply.class);
2162
2163             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2164                     cohort2, modification2, true, false), getRef());
2165             expectMsgClass(duration, ReadyTransactionReply.class);
2166
2167             // Send the CanCommitTransaction message for the first Tx.
2168
2169             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2170             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2171                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2172             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2173
2174             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2175             // processed after the first Tx completes.
2176
2177             final Future<Object> canCommitFuture = Patterns.ask(shard,
2178                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2179
2180             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2181             // Tx to proceed.
2182
2183             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2184             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2185
2186             // Wait for the 2nd Tx to complete the canCommit phase.
2187
2188             Await.ready(canCommitFuture, duration);
2189
2190             final InOrder inOrder = inOrder(cohort1, cohort2);
2191             inOrder.verify(cohort1).canCommit();
2192             inOrder.verify(cohort2).canCommit();
2193
2194             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2195         }};
2196     }
2197
2198     @Test
2199     public void testCreateSnapshot() throws Exception {
2200         testCreateSnapshot(true, "testCreateSnapshot");
2201     }
2202
2203     @Test
2204     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2205         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2206     }
2207
2208     @SuppressWarnings("serial")
2209     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2210
2211         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2212
2213         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2214         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2215             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2216                 super(delegate);
2217             }
2218
2219             @Override
2220             public void saveSnapshot(final Object o) {
2221                 savedSnapshot.set(o);
2222                 super.saveSnapshot(o);
2223             }
2224         }
2225
2226         dataStoreContextBuilder.persistent(persistent);
2227
2228         new ShardTestKit(getSystem()) {{
2229             class TestShard extends Shard {
2230
2231                 protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
2232                                     final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
2233                     super(name, peerAddresses, datastoreContext, schemaContext);
2234                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2235                 }
2236
2237                 @Override
2238                 public void handleCommand(final Object message) {
2239                     super.handleCommand(message);
2240
2241                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2242                         latch.get().countDown();
2243                     }
2244                 }
2245
2246                 @Override
2247                 public RaftActorContext getRaftActorContext() {
2248                     return super.getRaftActorContext();
2249                 }
2250             }
2251
2252             final Creator<Shard> creator = new Creator<Shard>() {
2253                 @Override
2254                 public Shard create() throws Exception {
2255                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
2256                             newDatastoreContext(), SCHEMA_CONTEXT);
2257                 }
2258             };
2259
2260             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2261                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2262
2263             waitUntilLeader(shard);
2264
2265             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2266
2267             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2268
2269             // Trigger creation of a snapshot by ensuring
2270             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2271             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2272
2273             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2274
2275             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2276                     savedSnapshot.get() instanceof Snapshot);
2277
2278             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2279
2280             latch.set(new CountDownLatch(1));
2281             savedSnapshot.set(null);
2282
2283             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2284
2285             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2286
2287             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2288                     savedSnapshot.get() instanceof Snapshot);
2289
2290             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2291
2292             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2293         }
2294
2295         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2296
2297             final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2298             assertEquals("Root node", expectedRoot, actual);
2299
2300         }};
2301     }
2302
2303     /**
2304      * This test simply verifies that the applySnapShot logic will work
2305      * @throws ReadFailedException
2306      * @throws DataValidationFailedException
2307      */
2308     @Test
2309     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2310         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2311         store.setSchemaContext(SCHEMA_CONTEXT);
2312
2313         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2314         putTransaction.write(TestModel.TEST_PATH,
2315             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2316         commitTransaction(store, putTransaction);
2317
2318
2319         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2320
2321         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2322
2323         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2324         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2325
2326         commitTransaction(store, writeTransaction);
2327
2328         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2329
2330         assertEquals(expected, actual);
2331     }
2332
2333     @Test
2334     public void testRecoveryApplicable(){
2335
2336         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2337                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2338
2339         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2340                 persistentContext, SCHEMA_CONTEXT);
2341
2342         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2343                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2344
2345         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2346                 nonPersistentContext, SCHEMA_CONTEXT);
2347
2348         new ShardTestKit(getSystem()) {{
2349             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2350                     persistentProps, "testPersistence1");
2351
2352             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2353
2354             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2355
2356             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2357                     nonPersistentProps, "testPersistence2");
2358
2359             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2360
2361             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2362
2363         }};
2364
2365     }
2366
2367     @Test
2368     public void testOnDatastoreContext() {
2369         new ShardTestKit(getSystem()) {{
2370             dataStoreContextBuilder.persistent(true);
2371
2372             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2373
2374             assertEquals("isRecoveryApplicable", true,
2375                     shard.underlyingActor().persistence().isRecoveryApplicable());
2376
2377             waitUntilLeader(shard);
2378
2379             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2380
2381             assertEquals("isRecoveryApplicable", false,
2382                     shard.underlyingActor().persistence().isRecoveryApplicable());
2383
2384             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2385
2386             assertEquals("isRecoveryApplicable", true,
2387                     shard.underlyingActor().persistence().isRecoveryApplicable());
2388
2389             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2390         }};
2391     }
2392
2393     @Test
2394     public void testRegisterRoleChangeListener() throws Exception {
2395         new ShardTestKit(getSystem()) {
2396             {
2397                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2398                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2399                         "testRegisterRoleChangeListener");
2400
2401                 waitUntilLeader(shard);
2402
2403                 final TestActorRef<MessageCollectorActor> listener =
2404                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2405
2406                 shard.tell(new RegisterRoleChangeListener(), listener);
2407
2408                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2409
2410                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2411                         ShardLeaderStateChanged.class);
2412                 assertEquals("getLocalShardDataTree present", true,
2413                         leaderStateChanged.getLocalShardDataTree().isPresent());
2414                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2415                         leaderStateChanged.getLocalShardDataTree().get());
2416
2417                 MessageCollectorActor.clearMessages(listener);
2418
2419                 // Force a leader change
2420
2421                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2422
2423                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2424                         ShardLeaderStateChanged.class);
2425                 assertEquals("getLocalShardDataTree present", false,
2426                         leaderStateChanged.getLocalShardDataTree().isPresent());
2427
2428                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2429             }
2430         };
2431     }
2432
2433     @Test
2434     public void testFollowerInitialSyncStatus() throws Exception {
2435         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2436                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2437                 "testFollowerInitialSyncStatus");
2438
2439         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2440
2441         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2442
2443         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2444
2445         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2446
2447         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2448     }
2449
2450     private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2451         modification.ready();
2452         store.validate(modification);
2453         store.commit(store.prepare(modification));
2454     }
2455 }