Check total batched messages sent in ShardCommitCoordinator on tx ready
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.dispatch.Dispatchers;
20 import akka.dispatch.OnComplete;
21 import akka.japi.Creator;
22 import akka.pattern.Patterns;
23 import akka.persistence.SaveSnapshotSuccess;
24 import akka.testkit.TestActorRef;
25 import akka.util.Timeout;
26 import com.google.common.base.Function;
27 import com.google.common.base.Optional;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import java.io.IOException;
32 import java.util.Collections;
33 import java.util.HashSet;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.Test;
41 import org.mockito.InOrder;
42 import org.opendaylight.controller.cluster.DataPersistenceProvider;
43 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
44 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
45 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
49 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
64 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
65 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
66 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
67 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
68 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
69 import org.opendaylight.controller.cluster.datastore.modification.Modification;
70 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
71 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
72 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
73 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
74 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
75 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
76 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
77 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
78 import org.opendaylight.controller.cluster.raft.RaftActorContext;
79 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
80 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
81 import org.opendaylight.controller.cluster.raft.Snapshot;
82 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
84 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
85 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
86 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
88 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
89 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
90 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
91 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
92 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
93 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
94 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
95 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
96 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
97 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
98 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
99 import org.opendaylight.yangtools.yang.common.QName;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
101 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
102 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
103 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
104 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
115 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
116 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
117 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
118 import scala.concurrent.Await;
119 import scala.concurrent.Future;
120 import scala.concurrent.duration.FiniteDuration;
121
122 public class ShardTest extends AbstractShardTest {
123     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
124
125     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
126
127     final CountDownLatch recoveryComplete = new CountDownLatch(1);
128
129     protected Props newShardPropsWithRecoveryComplete() {
130
131         Creator<Shard> creator = new Creator<Shard>() {
132             @Override
133             public Shard create() throws Exception {
134                 return new Shard(shardID, Collections.<String,String>emptyMap(),
135                         newDatastoreContext(), SCHEMA_CONTEXT) {
136                     @Override
137                     protected void onRecoveryComplete() {
138                         try {
139                             super.onRecoveryComplete();
140                         } finally {
141                             recoveryComplete.countDown();
142                         }
143                     }
144                 };
145             }
146         };
147         return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
148     }
149
150     @Test
151     public void testRegisterChangeListener() throws Exception {
152         new ShardTestKit(getSystem()) {{
153             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
154                     newShardProps(),  "testRegisterChangeListener");
155
156             waitUntilLeader(shard);
157
158             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
159
160             MockDataChangeListener listener = new MockDataChangeListener(1);
161             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
162                     "testRegisterChangeListener-DataChangeListener");
163
164             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
165                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
166
167             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
168                     RegisterChangeListenerReply.class);
169             String replyPath = reply.getListenerRegistrationPath().toString();
170             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
171                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
172
173             YangInstanceIdentifier path = TestModel.TEST_PATH;
174             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
175
176             listener.waitForChangeEvents(path);
177
178             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
179             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
180         }};
181     }
182
183     @SuppressWarnings("serial")
184     @Test
185     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
186         // This test tests the timing window in which a change listener is registered before the
187         // shard becomes the leader. We verify that the listener is registered and notified of the
188         // existing data when the shard becomes the leader.
189         new ShardTestKit(getSystem()) {{
190             // For this test, we want to send the RegisterChangeListener message after the shard
191             // has recovered from persistence and before it becomes the leader. So we subclass
192             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
193             // we know that the shard has been initialized to a follower and has started the
194             // election process. The following 2 CountDownLatches are used to coordinate the
195             // ElectionTimeout with the sending of the RegisterChangeListener message.
196             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
197             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
198             Creator<Shard> creator = new Creator<Shard>() {
199                 boolean firstElectionTimeout = true;
200
201                 @Override
202                 public Shard create() throws Exception {
203                     // Use a non persistent provider because this test actually invokes persist on the journal
204                     // this will cause all other messages to not be queued properly after that.
205                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
206                     // it does do a persist)
207                     return new Shard(shardID, Collections.<String,String>emptyMap(),
208                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
209                         @Override
210                         public void onReceiveCommand(final Object message) throws Exception {
211                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
212                                 // Got the first ElectionTimeout. We don't forward it to the
213                                 // base Shard yet until we've sent the RegisterChangeListener
214                                 // message. So we signal the onFirstElectionTimeout latch to tell
215                                 // the main thread to send the RegisterChangeListener message and
216                                 // start a thread to wait on the onChangeListenerRegistered latch,
217                                 // which the main thread signals after it has sent the message.
218                                 // After the onChangeListenerRegistered is triggered, we send the
219                                 // original ElectionTimeout message to proceed with the election.
220                                 firstElectionTimeout = false;
221                                 final ActorRef self = getSelf();
222                                 new Thread() {
223                                     @Override
224                                     public void run() {
225                                         Uninterruptibles.awaitUninterruptibly(
226                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
227                                         self.tell(message, self);
228                                     }
229                                 }.start();
230
231                                 onFirstElectionTimeout.countDown();
232                             } else {
233                                 super.onReceiveCommand(message);
234                             }
235                         }
236                     };
237                 }
238             };
239
240             MockDataChangeListener listener = new MockDataChangeListener(1);
241             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
242                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
243
244             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
245                     Props.create(new DelegatingShardCreator(creator)),
246                     "testRegisterChangeListenerWhenNotLeaderInitially");
247
248             // Write initial data into the in-memory store.
249             YangInstanceIdentifier path = TestModel.TEST_PATH;
250             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
251
252             // Wait until the shard receives the first ElectionTimeout message.
253             assertEquals("Got first ElectionTimeout", true,
254                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
255
256             // Now send the RegisterChangeListener and wait for the reply.
257             shard.tell(new RegisterChangeListener(path, dclActor,
258                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
259
260             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
261                     RegisterChangeListenerReply.class);
262             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
263
264             // Sanity check - verify the shard is not the leader yet.
265             shard.tell(new FindLeader(), getRef());
266             FindLeaderReply findLeadeReply =
267                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
268             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
269
270             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
271             // with the election process.
272             onChangeListenerRegistered.countDown();
273
274             // Wait for the shard to become the leader and notify our listener with the existing
275             // data in the store.
276             listener.waitForChangeEvents(path);
277
278             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
279             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
280         }};
281     }
282
283     @Test
284     public void testRegisterDataTreeChangeListener() throws Exception {
285         new ShardTestKit(getSystem()) {{
286             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
287                     newShardProps(), "testRegisterDataTreeChangeListener");
288
289             waitUntilLeader(shard);
290
291             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
292
293             MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
294             ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
295                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
296
297             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
298
299             RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
300                     RegisterDataTreeChangeListenerReply.class);
301             String replyPath = reply.getListenerRegistrationPath().toString();
302             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
303                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
304
305             YangInstanceIdentifier path = TestModel.TEST_PATH;
306             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
307
308             listener.waitForChangeEvents();
309
310             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
311             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
312         }};
313     }
314
315     @SuppressWarnings("serial")
316     @Test
317     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
318         new ShardTestKit(getSystem()) {{
319             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
320             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
321             Creator<Shard> creator = new Creator<Shard>() {
322                 boolean firstElectionTimeout = true;
323
324                 @Override
325                 public Shard create() throws Exception {
326                     return new Shard(shardID, Collections.<String,String>emptyMap(),
327                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
328                         @Override
329                         public void onReceiveCommand(final Object message) throws Exception {
330                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
331                                 firstElectionTimeout = false;
332                                 final ActorRef self = getSelf();
333                                 new Thread() {
334                                     @Override
335                                     public void run() {
336                                         Uninterruptibles.awaitUninterruptibly(
337                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
338                                         self.tell(message, self);
339                                     }
340                                 }.start();
341
342                                 onFirstElectionTimeout.countDown();
343                             } else {
344                                 super.onReceiveCommand(message);
345                             }
346                         }
347                     };
348                 }
349             };
350
351             MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
352             ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
353                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
354
355             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
356                     Props.create(new DelegatingShardCreator(creator)),
357                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
358
359             YangInstanceIdentifier path = TestModel.TEST_PATH;
360             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
361
362             assertEquals("Got first ElectionTimeout", true,
363                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
364
365             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
366             RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
367                     RegisterDataTreeChangeListenerReply.class);
368             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
369
370             shard.tell(new FindLeader(), getRef());
371             FindLeaderReply findLeadeReply =
372                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
373             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
374
375             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
376
377             onChangeListenerRegistered.countDown();
378
379             // TODO: investigate why we do not receive data chage events
380             listener.waitForChangeEvents();
381
382             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
383             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
384         }};
385     }
386
387     @Test
388     public void testCreateTransaction(){
389         new ShardTestKit(getSystem()) {{
390             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
391
392             waitUntilLeader(shard);
393
394             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
395
396             shard.tell(new CreateTransaction("txn-1",
397                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
398
399             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
400                     CreateTransactionReply.class);
401
402             String path = reply.getTransactionActorPath().toString();
403             assertTrue("Unexpected transaction path " + path,
404                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
405
406             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
407         }};
408     }
409
410     @Test
411     public void testCreateTransactionOnChain(){
412         new ShardTestKit(getSystem()) {{
413             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
414
415             waitUntilLeader(shard);
416
417             shard.tell(new CreateTransaction("txn-1",
418                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
419                     getRef());
420
421             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
422                     CreateTransactionReply.class);
423
424             String path = reply.getTransactionActorPath().toString();
425             assertTrue("Unexpected transaction path " + path,
426                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
427
428             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
429         }};
430     }
431
432     @SuppressWarnings("serial")
433     @Test
434     public void testPeerAddressResolved() throws Exception {
435         new ShardTestKit(getSystem()) {{
436             final CountDownLatch recoveryComplete = new CountDownLatch(1);
437             class TestShard extends Shard {
438                 TestShard() {
439                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
440                             newDatastoreContext(), SCHEMA_CONTEXT);
441                 }
442
443                 Map<String, String> getPeerAddresses() {
444                     return getRaftActorContext().getPeerAddresses();
445                 }
446
447                 @Override
448                 protected void onRecoveryComplete() {
449                     try {
450                         super.onRecoveryComplete();
451                     } finally {
452                         recoveryComplete.countDown();
453                     }
454                 }
455             }
456
457             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
458                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
459                         @Override
460                         public TestShard create() throws Exception {
461                             return new TestShard();
462                         }
463                     })), "testPeerAddressResolved");
464
465             //waitUntilLeader(shard);
466             assertEquals("Recovery complete", true,
467                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
468
469             String address = "akka://foobar";
470             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
471
472             assertEquals("getPeerAddresses", address,
473                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
474
475             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
476         }};
477     }
478
479     @Test
480     public void testApplySnapshot() throws Exception {
481         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
482                 "testApplySnapshot");
483
484         DataTree store = InMemoryDataTreeFactory.getInstance().create();
485         store.setSchemaContext(SCHEMA_CONTEXT);
486
487         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
488
489         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
490         NormalizedNode<?,?> expected = readStore(store, root);
491
492         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
493                 SerializationUtils.serializeNormalizedNode(expected),
494                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
495
496         shard.underlyingActor().onReceiveCommand(applySnapshot);
497
498         NormalizedNode<?,?> actual = readStore(shard, root);
499
500         assertEquals("Root node", expected, actual);
501
502         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
503     }
504
505     @Test
506     public void testApplyState() throws Exception {
507
508         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
509
510         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
511
512         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
513                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
514
515         shard.underlyingActor().onReceiveCommand(applyState);
516
517         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
518         assertEquals("Applied state", node, actual);
519
520         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
521     }
522
523     @Test
524     public void testApplyStateWithCandidatePayload() throws Exception {
525
526         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
527
528         recoveryComplete.await(5,  TimeUnit.SECONDS);
529
530         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
531         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
532
533         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
534                 DataTreeCandidatePayload.create(candidate)));
535
536         shard.underlyingActor().onReceiveCommand(applyState);
537
538         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
539         assertEquals("Applied state", node, actual);
540
541         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
542     }
543
544     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
545         DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
546         testStore.setSchemaContext(SCHEMA_CONTEXT);
547
548         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
549
550         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
551
552         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
553                 SerializationUtils.serializeNormalizedNode(root),
554                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
555         return testStore;
556     }
557
558     private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
559         source.validate(mod);
560         final DataTreeCandidate candidate = source.prepare(mod);
561         source.commit(candidate);
562         return DataTreeCandidatePayload.create(candidate);
563     }
564
565     @Test
566     public void testDataTreeCandidateRecovery() throws Exception {
567         // Set up the InMemorySnapshotStore.
568         final DataTree source = setupInMemorySnapshotStore();
569
570         final DataTreeModification writeMod = source.takeSnapshot().newModification();
571         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
572
573         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
574
575         // Set up the InMemoryJournal.
576         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
577
578         int nListEntries = 16;
579         Set<Integer> listEntryKeys = new HashSet<>();
580
581         // Add some ModificationPayload entries
582         for (int i = 1; i <= nListEntries; i++) {
583             listEntryKeys.add(Integer.valueOf(i));
584
585             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
586                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
587
588             final DataTreeModification mod = source.takeSnapshot().newModification();
589             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
590
591             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
592                 payloadForModification(source, mod)));
593         }
594
595         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
596                 new ApplyJournalEntries(nListEntries));
597
598         testRecovery(listEntryKeys);
599     }
600
601     @Test
602     public void testModicationRecovery() throws Exception {
603
604         // Set up the InMemorySnapshotStore.
605         setupInMemorySnapshotStore();
606
607         // Set up the InMemoryJournal.
608
609         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
610
611         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
612                   new WriteModification(TestModel.OUTER_LIST_PATH,
613                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
614
615         int nListEntries = 16;
616         Set<Integer> listEntryKeys = new HashSet<>();
617
618         // Add some ModificationPayload entries
619         for(int i = 1; i <= nListEntries; i++) {
620             listEntryKeys.add(Integer.valueOf(i));
621             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
622                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
623             Modification mod = new MergeModification(path,
624                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
625             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
626                     newModificationPayload(mod)));
627         }
628
629         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
630                 new ApplyJournalEntries(nListEntries));
631
632         testRecovery(listEntryKeys);
633     }
634
635     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
636         MutableCompositeModification compMod = new MutableCompositeModification();
637         for(Modification mod: mods) {
638             compMod.addModification(mod);
639         }
640
641         return new ModificationPayload(compMod);
642     }
643
644     @Test
645     public void testConcurrentThreePhaseCommits() throws Throwable {
646         new ShardTestKit(getSystem()) {{
647             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
648                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
649                     "testConcurrentThreePhaseCommits");
650
651             waitUntilLeader(shard);
652
653          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
654
655             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
656
657             String transactionID1 = "tx1";
658             MutableCompositeModification modification1 = new MutableCompositeModification();
659             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
660                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
661
662             String transactionID2 = "tx2";
663             MutableCompositeModification modification2 = new MutableCompositeModification();
664             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
665                     TestModel.OUTER_LIST_PATH,
666                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
667                     modification2);
668
669             String transactionID3 = "tx3";
670             MutableCompositeModification modification3 = new MutableCompositeModification();
671             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
672                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
673                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
674                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
675                     modification3);
676
677             long timeoutSec = 5;
678             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
679             final Timeout timeout = new Timeout(duration);
680
681             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
682             // by the ShardTransaction.
683
684             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
685                     cohort1, modification1, true, false), getRef());
686             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
687                     expectMsgClass(duration, ReadyTransactionReply.class));
688             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
689
690             // Send the CanCommitTransaction message for the first Tx.
691
692             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
693             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
694                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
695             assertEquals("Can commit", true, canCommitReply.getCanCommit());
696
697             // Send the ForwardedReadyTransaction for the next 2 Tx's.
698
699             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
700                     cohort2, modification2, true, false), getRef());
701             expectMsgClass(duration, ReadyTransactionReply.class);
702
703             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
704                     cohort3, modification3, true, false), getRef());
705             expectMsgClass(duration, ReadyTransactionReply.class);
706
707             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
708             // processed after the first Tx completes.
709
710             Future<Object> canCommitFuture1 = Patterns.ask(shard,
711                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
712
713             Future<Object> canCommitFuture2 = Patterns.ask(shard,
714                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
715
716             // Send the CommitTransaction message for the first Tx. After it completes, it should
717             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
718
719             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
720             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
721
722             // Wait for the next 2 Tx's to complete.
723
724             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
725             final CountDownLatch commitLatch = new CountDownLatch(2);
726
727             class OnFutureComplete extends OnComplete<Object> {
728                 private final Class<?> expRespType;
729
730                 OnFutureComplete(final Class<?> expRespType) {
731                     this.expRespType = expRespType;
732                 }
733
734                 @Override
735                 public void onComplete(final Throwable error, final Object resp) {
736                     if(error != null) {
737                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
738                     } else {
739                         try {
740                             assertEquals("Commit response type", expRespType, resp.getClass());
741                             onSuccess(resp);
742                         } catch (Exception e) {
743                             caughtEx.set(e);
744                         }
745                     }
746                 }
747
748                 void onSuccess(final Object resp) throws Exception {
749                 }
750             }
751
752             class OnCommitFutureComplete extends OnFutureComplete {
753                 OnCommitFutureComplete() {
754                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
755                 }
756
757                 @Override
758                 public void onComplete(final Throwable error, final Object resp) {
759                     super.onComplete(error, resp);
760                     commitLatch.countDown();
761                 }
762             }
763
764             class OnCanCommitFutureComplete extends OnFutureComplete {
765                 private final String transactionID;
766
767                 OnCanCommitFutureComplete(final String transactionID) {
768                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
769                     this.transactionID = transactionID;
770                 }
771
772                 @Override
773                 void onSuccess(final Object resp) throws Exception {
774                     CanCommitTransactionReply canCommitReply =
775                             CanCommitTransactionReply.fromSerializable(resp);
776                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
777
778                     Future<Object> commitFuture = Patterns.ask(shard,
779                             new CommitTransaction(transactionID).toSerializable(), timeout);
780                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
781                 }
782             }
783
784             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
785                     getSystem().dispatcher());
786
787             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
788                     getSystem().dispatcher());
789
790             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
791
792             if(caughtEx.get() != null) {
793                 throw caughtEx.get();
794             }
795
796             assertEquals("Commits complete", true, done);
797
798             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
799             inOrder.verify(cohort1).canCommit();
800             inOrder.verify(cohort1).preCommit();
801             inOrder.verify(cohort1).commit();
802             inOrder.verify(cohort2).canCommit();
803             inOrder.verify(cohort2).preCommit();
804             inOrder.verify(cohort2).commit();
805             inOrder.verify(cohort3).canCommit();
806             inOrder.verify(cohort3).preCommit();
807             inOrder.verify(cohort3).commit();
808
809             // Verify data in the data store.
810
811             verifyOuterListEntry(shard, 1);
812
813             verifyLastApplied(shard, 2);
814
815             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
816         }};
817     }
818
819     private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
820             NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady, int messagesSent) {
821         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
822     }
823
824     private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
825             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady,
826             int messagesSent) {
827         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
828         batched.addModification(new WriteModification(path, data));
829         batched.setReady(ready);
830         batched.setDoCommitOnReady(doCommitOnReady);
831         batched.setTotalMessagesSent(messagesSent);
832         return batched;
833     }
834
835     @Test
836     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
837         new ShardTestKit(getSystem()) {{
838             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
839                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
840                     "testBatchedModificationsWithNoCommitOnReady");
841
842             waitUntilLeader(shard);
843
844             final String transactionID = "tx";
845             FiniteDuration duration = duration("5 seconds");
846
847             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
848             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
849                 @Override
850                 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
851                     if(mockCohort.get() == null) {
852                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
853                     }
854
855                     return mockCohort.get();
856                 }
857             };
858
859             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
860
861             // Send a BatchedModifications to start a transaction.
862
863             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
864                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
865             expectMsgClass(duration, BatchedModificationsReply.class);
866
867             // Send a couple more BatchedModifications.
868
869             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
870                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
871             expectMsgClass(duration, BatchedModificationsReply.class);
872
873             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
874                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
875                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
876             expectMsgClass(duration, ReadyTransactionReply.class);
877
878             // Send the CanCommitTransaction message.
879
880             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
881             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
882                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
883             assertEquals("Can commit", true, canCommitReply.getCanCommit());
884
885             // Send the CanCommitTransaction message.
886
887             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
888             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
889
890             InOrder inOrder = inOrder(mockCohort.get());
891             inOrder.verify(mockCohort.get()).canCommit();
892             inOrder.verify(mockCohort.get()).preCommit();
893             inOrder.verify(mockCohort.get()).commit();
894
895             // Verify data in the data store.
896
897             verifyOuterListEntry(shard, 1);
898
899             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
900         }};
901     }
902
903     @Test
904     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
905         new ShardTestKit(getSystem()) {{
906             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
907                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
908                     "testBatchedModificationsWithCommitOnReady");
909
910             waitUntilLeader(shard);
911
912             final String transactionID = "tx";
913             FiniteDuration duration = duration("5 seconds");
914
915             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
916             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
917                 @Override
918                 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
919                     if(mockCohort.get() == null) {
920                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
921                     }
922
923                     return mockCohort.get();
924                 }
925             };
926
927             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
928
929             // Send a BatchedModifications to start a transaction.
930
931             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
932                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
933             expectMsgClass(duration, BatchedModificationsReply.class);
934
935             // Send a couple more BatchedModifications.
936
937             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
938                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
939             expectMsgClass(duration, BatchedModificationsReply.class);
940
941             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
942                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
943                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
944
945             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
946
947             InOrder inOrder = inOrder(mockCohort.get());
948             inOrder.verify(mockCohort.get()).canCommit();
949             inOrder.verify(mockCohort.get()).preCommit();
950             inOrder.verify(mockCohort.get()).commit();
951
952             // Verify data in the data store.
953
954             verifyOuterListEntry(shard, 1);
955
956             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
957         }};
958     }
959
960     @Test(expected=IllegalStateException.class)
961     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
962         new ShardTestKit(getSystem()) {{
963             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
964                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
965                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
966
967             waitUntilLeader(shard);
968
969             String transactionID = "tx1";
970             BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
971             batched.setReady(true);
972             batched.setTotalMessagesSent(2);
973
974             shard.tell(batched, getRef());
975
976             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
977
978             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
979
980             if(failure != null) {
981                 throw failure.cause();
982             }
983         }};
984     }
985
986     @SuppressWarnings("unchecked")
987     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
988         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
989         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
990         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
991                 outerList.getValue() instanceof Iterable);
992         Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
993         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
994                 entry instanceof MapEntryNode);
995         MapEntryNode mapEntry = (MapEntryNode)entry;
996         Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
997                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
998         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
999         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1000     }
1001
1002     @Test
1003     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1004         new ShardTestKit(getSystem()) {{
1005             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1006                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1007                     "testBatchedModificationsOnTransactionChain");
1008
1009             waitUntilLeader(shard);
1010
1011             String transactionChainID = "txChain";
1012             String transactionID1 = "tx1";
1013             String transactionID2 = "tx2";
1014
1015             FiniteDuration duration = duration("5 seconds");
1016
1017             // Send a BatchedModifications to start a chained write transaction and ready it.
1018
1019             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1020             YangInstanceIdentifier path = TestModel.TEST_PATH;
1021             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1022                     containerNode, true, false, 1), getRef());
1023             expectMsgClass(duration, ReadyTransactionReply.class);
1024
1025             // Create a read Tx on the same chain.
1026
1027             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1028                     transactionChainID).toSerializable(), getRef());
1029
1030             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1031
1032             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1033             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1034             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1035
1036             // Commit the write transaction.
1037
1038             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1039             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1040                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1041             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1042
1043             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1044             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1045
1046             // Verify data in the data store.
1047
1048             NormalizedNode<?, ?> actualNode = readStore(shard, path);
1049             assertEquals("Stored node", containerNode, actualNode);
1050
1051             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1052         }};
1053     }
1054
1055     @Test
1056     public void testOnBatchedModificationsWhenNotLeader() {
1057         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1058         new ShardTestKit(getSystem()) {{
1059             Creator<Shard> creator = new Creator<Shard>() {
1060                 private static final long serialVersionUID = 1L;
1061
1062                 @Override
1063                 public Shard create() throws Exception {
1064                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1065                             newDatastoreContext(), SCHEMA_CONTEXT) {
1066                         @Override
1067                         protected boolean isLeader() {
1068                             return overrideLeaderCalls.get() ? false : super.isLeader();
1069                         }
1070
1071                         @Override
1072                         protected ActorSelection getLeader() {
1073                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1074                                 super.getLeader();
1075                         }
1076                     };
1077                 }
1078             };
1079
1080             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1081                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1082
1083             waitUntilLeader(shard);
1084
1085             overrideLeaderCalls.set(true);
1086
1087             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1088
1089             shard.tell(batched, ActorRef.noSender());
1090
1091             expectMsgEquals(batched);
1092
1093             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1094         }};
1095     }
1096
1097     @Test
1098     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1099         new ShardTestKit(getSystem()) {{
1100             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1101                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1102                     "testForwardedReadyTransactionWithImmediateCommit");
1103
1104             waitUntilLeader(shard);
1105
1106             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1107
1108             String transactionID = "tx1";
1109             MutableCompositeModification modification = new MutableCompositeModification();
1110             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1111             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1112                     TestModel.TEST_PATH, containerNode, modification);
1113
1114             FiniteDuration duration = duration("5 seconds");
1115
1116             // Simulate the ForwardedReadyTransaction messages that would be sent
1117             // by the ShardTransaction.
1118
1119             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1120                     cohort, modification, true, true), getRef());
1121
1122             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1123
1124             InOrder inOrder = inOrder(cohort);
1125             inOrder.verify(cohort).canCommit();
1126             inOrder.verify(cohort).preCommit();
1127             inOrder.verify(cohort).commit();
1128
1129             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1130             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1131
1132             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1133         }};
1134     }
1135
1136     @Test
1137     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1138         new ShardTestKit(getSystem()) {{
1139             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1140                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1141                     "testReadyLocalTransactionWithImmediateCommit");
1142
1143             waitUntilLeader(shard);
1144
1145             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1146
1147             DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1148
1149             ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1150             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1151             MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1152             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1153
1154             String txId = "tx1";
1155             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1156
1157             shard.tell(readyMessage, getRef());
1158
1159             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1160
1161             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1162             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1163
1164             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1165         }};
1166     }
1167
1168     @Test
1169     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
             // Readies a locally built DataTreeModification on the shard, then drives the canCommit and
             // commit messages explicitly and checks that the merged list node can be read back.
1170         new ShardTestKit(getSystem()) {{
1171             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1172                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1173                     "testReadyLocalTransactionWithThreePhaseCommit");
1174
1175             waitUntilLeader(shard);
1176
1177             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1178
                 // Build the modification directly on a snapshot of the shard's data tree.
1179             DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1180
1181             ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1182             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1183             MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1184             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1185
1186             String txId = "tx1";
                 // NOTE(review): the last constructor argument is false; presumably it disables
                 // commit-on-ready so the phases below must be driven by hand - confirm against
                 // ReadyLocalTransaction.
1187             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1188
1189             shard.tell(readyMessage, getRef());
1190
1191             expectMsgClass(ReadyTransactionReply.class);
1192
1193             // Send the CanCommitTransaction message.
1194
1195             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1196             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1197                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1198             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1199
1200             // Send the CommitTransaction message.
1201
1202             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1203             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1204
                 // The merged outer list should now be readable from the shard's store.
1205             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1206             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1207
1208             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1209         }};
1210     }
1211
1212     @Test
1213     public void testCommitWithPersistenceDisabled() throws Throwable {
             // Runs a full three-phase commit against a shard created after persistent(false) was set on
             // the datastore context builder, then checks the cohort call order and the stored data.
1214         dataStoreContextBuilder.persistent(false);
1215         new ShardTestKit(getSystem()) {{
1216             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1217                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1218                     "testCommitWithPersistenceDisabled");
1219
1220             waitUntilLeader(shard);
1221
1222             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1223
1224             // Setup a simulated transaction with a mock cohort.
1225
1226             String transactionID = "tx";
1227             MutableCompositeModification modification = new MutableCompositeModification();
1228             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1229             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1230                     TestModel.TEST_PATH, containerNode, modification);
1231
1232             FiniteDuration duration = duration("5 seconds");
1233
1234             // Simulate the ForwardedReadyTransaction messages that would be sent
1235             // by the ShardTransaction.
1236
1237             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1238                     cohort, modification, true, false), getRef());
1239             expectMsgClass(duration, ReadyTransactionReply.class);
1240
1241             // Send the CanCommitTransaction message.
1242
1243             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1244             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1245                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1246             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1247
1248             // Send the CommitTransaction message.
1249
1250             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1251             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1252
                 // All three cohort phases must have been invoked, in this order.
1253             InOrder inOrder = inOrder(cohort);
1254             inOrder.verify(cohort).canCommit();
1255             inOrder.verify(cohort).preCommit();
1256             inOrder.verify(cohort).commit();
1257
                 // The written container must be readable from the store after the commit.
1258             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1259             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1260
1261             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1262         }};
1263     }
1264
1265     private static DataTreeCandidateTip mockCandidate(final String name) {
1266         DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1267         DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1268         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1269         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1270         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1271         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1272         return mockCandidate;
1273     }
1274
1275     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1276         DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1277         DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1278         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1279         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1280         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1281         return mockCandidate;
1282     }
1283
1284     @Test
1285     public void testCommitWhenTransactionHasNoModifications(){
1286         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1287         // but here that need not happen
             // The mocked cohort's candidate reports an UNMODIFIED root (see mockUnmodifiedCandidate), so the
             // test expects the committed-transaction count to rise while the commit index stays at -1.
1288         new ShardTestKit(getSystem()) {
1289             {
1290                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1291                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1292                         "testCommitWhenTransactionHasNoModifications");
1293
1294                 waitUntilLeader(shard);
1295
1296                 String transactionID = "tx1";
1297                 MutableCompositeModification modification = new MutableCompositeModification();
                     // Mock cohort: every phase succeeds immediately and the candidate is unmodified.
1298                 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1299                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1300                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1301                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1302                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1303
1304                 FiniteDuration duration = duration("5 seconds");
1305
1306                 // Simulate the ForwardedReadyTransaction messages that would be sent
1307                 // by the ShardTransaction.
1308
1309                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1310                         cohort, modification, true, false), getRef());
1311                 expectMsgClass(duration, ReadyTransactionReply.class);
1312
1313                 // Send the CanCommitTransaction message.
1314
1315                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1316                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1317                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1318                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1319
                     // Send the CommitTransaction message.
1320                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1321                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1322
                     // All three cohort phases must still have been invoked, in this order.
1323                 InOrder inOrder = inOrder(cohort);
1324                 inOrder.verify(cohort).canCommit();
1325                 inOrder.verify(cohort).preCommit();
1326                 inOrder.verify(cohort).commit();
1327
1328                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1329                 ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1330
1331                 // Use MBean for verification
1332                 // Committed transaction count should increase as usual
1333                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1334
1335                 // Commit index should not advance because this does not go into the journal
1336                 assertEquals(-1, shardStats.getCommitIndex());
1337
1338                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1339
1340             }
1341         };
1342     }
1343
1344     @Test
1345     public void testCommitWhenTransactionHasModifications(){
             // The mocked cohort's candidate reports a WRITE root (see mockCandidate), so after the commit
             // the test expects both the committed-transaction count and the commit index to advance.
1346         new ShardTestKit(getSystem()) {
1347             {
1348                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1349                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1350                         "testCommitWhenTransactionHasModifications");
1351
1352                 waitUntilLeader(shard);
1353
1354                 String transactionID = "tx1";
1355                 MutableCompositeModification modification = new MutableCompositeModification();
1356                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
                     // Mock cohort: every phase succeeds immediately and the candidate carries a write.
1357                 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1358                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1359                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1360                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1361                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1362
1363                 FiniteDuration duration = duration("5 seconds");
1364
1365                 // Simulate the ForwardedReadyTransaction messages that would be sent
1366                 // by the ShardTransaction.
1367
1368                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1369                         cohort, modification, true, false), getRef());
1370                 expectMsgClass(duration, ReadyTransactionReply.class);
1371
1372                 // Send the CanCommitTransaction message.
1373
1374                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1375                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1376                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1377                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1378
                     // Send the CommitTransaction message.
1379                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1380                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1381
                     // All three cohort phases must have been invoked, in this order.
1382                 InOrder inOrder = inOrder(cohort);
1383                 inOrder.verify(cohort).canCommit();
1384                 inOrder.verify(cohort).preCommit();
1385                 inOrder.verify(cohort).commit();
1386
1387                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1388                 ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1389
1390                 // Use MBean for verification
1391                 // Committed transaction count should increase as usual
1392                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1393
1394                 // Commit index should advance as we do not have an empty modification
1395                 assertEquals(0, shardStats.getCommitIndex());
1396
1397                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1398
1399             }
1400         };
1401     }
1402
1403     @Test
1404     public void testCommitPhaseFailure() throws Throwable {
             // Readies two transactions; the first cohort's commit() returns a failed future. The test expects
             // a Failure reply for the first commit and that the second transaction's canCommit then runs.
1405         new ShardTestKit(getSystem()) {{
1406             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1407                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1408                     "testCommitPhaseFailure");
1409
1410             waitUntilLeader(shard);
1411
1412             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1413             // commit phase.
1414
1415             String transactionID1 = "tx1";
1416             MutableCompositeModification modification1 = new MutableCompositeModification();
1417             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1418             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1419             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1420             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1421             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1422
                 // Only canCommit() is stubbed on the second cohort; the test never commits tx2.
1423             String transactionID2 = "tx2";
1424             MutableCompositeModification modification2 = new MutableCompositeModification();
1425             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1426             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1427
1428             FiniteDuration duration = duration("5 seconds");
1429             final Timeout timeout = new Timeout(duration);
1430
1431             // Simulate the ForwardedReadyTransaction messages that would be sent
1432             // by the ShardTransaction.
1433
1434             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1435                     cohort1, modification1, true, false), getRef());
1436             expectMsgClass(duration, ReadyTransactionReply.class);
1437
1438             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1439                     cohort2, modification2, true, false), getRef());
1440             expectMsgClass(duration, ReadyTransactionReply.class);
1441
1442             // Send the CanCommitTransaction message for the first Tx.
1443
1444             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1445             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1446                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1447             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1448
1449             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1450             // processed after the first Tx completes.
1451
                 // Patterns.ask is used (instead of tell/expect) so the reply can be awaited separately below.
1452             Future<Object> canCommitFuture = Patterns.ask(shard,
1453                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1454
1455             // Send the CommitTransaction message for the first Tx. This should send back an error
1456             // and trigger the 2nd Tx to proceed.
1457
1458             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1459             expectMsgClass(duration, akka.actor.Status.Failure.class);
1460
1461             // Wait for the 2nd Tx to complete the canCommit phase.
1462
                 // The latch is released on any completion of the ask future, successful or not.
1463             final CountDownLatch latch = new CountDownLatch(1);
1464             canCommitFuture.onComplete(new OnComplete<Object>() {
1465                 @Override
1466                 public void onComplete(final Throwable t, final Object resp) {
1467                     latch.countDown();
1468                 }
1469             }, getSystem().dispatcher());
1470
1471             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1472
                 // cohort2.canCommit() must come after cohort1 went through all three phases.
1473             InOrder inOrder = inOrder(cohort1, cohort2);
1474             inOrder.verify(cohort1).canCommit();
1475             inOrder.verify(cohort1).preCommit();
1476             inOrder.verify(cohort1).commit();
1477             inOrder.verify(cohort2).canCommit();
1478
1479             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1480         }};
1481     }
1482
1483     @Test
1484     public void testPreCommitPhaseFailure() throws Throwable {
             // Readies two transactions; the first cohort's preCommit() returns a failed future. The test
             // expects a Failure reply for the first commit and that the second canCommit then runs.
1485         new ShardTestKit(getSystem()) {{
1486             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1487                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1488                     "testPreCommitPhaseFailure");
1489
1490             waitUntilLeader(shard);
1491
                 // First cohort: canCommit succeeds, preCommit fails; commit() is left unstubbed.
1492             String transactionID1 = "tx1";
1493             MutableCompositeModification modification1 = new MutableCompositeModification();
1494             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1495             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1496             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1497
1498             String transactionID2 = "tx2";
1499             MutableCompositeModification modification2 = new MutableCompositeModification();
1500             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1501             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1502
1503             FiniteDuration duration = duration("5 seconds");
1504             final Timeout timeout = new Timeout(duration);
1505
1506             // Simulate the ForwardedReadyTransaction messages that would be sent
1507             // by the ShardTransaction.
1508
1509             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1510                     cohort1, modification1, true, false), getRef());
1511             expectMsgClass(duration, ReadyTransactionReply.class);
1512
1513             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1514                     cohort2, modification2, true, false), getRef());
1515             expectMsgClass(duration, ReadyTransactionReply.class);
1516
1517             // Send the CanCommitTransaction message for the first Tx.
1518
1519             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1520             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1521                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1522             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1523
1524             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1525             // processed after the first Tx completes.
1526
1527             Future<Object> canCommitFuture = Patterns.ask(shard,
1528                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1529
1530             // Send the CommitTransaction message for the first Tx. This should send back an error
1531             // and trigger the 2nd Tx to proceed.
1532
1533             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1534             expectMsgClass(duration, akka.actor.Status.Failure.class);
1535
1536             // Wait for the 2nd Tx to complete the canCommit phase.
1537
                 // The latch is released on any completion of the ask future, successful or not.
1538             final CountDownLatch latch = new CountDownLatch(1);
1539             canCommitFuture.onComplete(new OnComplete<Object>() {
1540                 @Override
1541                 public void onComplete(final Throwable t, final Object resp) {
1542                     latch.countDown();
1543                 }
1544             }, getSystem().dispatcher());
1545
1546             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1547
                 // Ordered check goes from cohort1.preCommit() straight to cohort2.canCommit().
1548             InOrder inOrder = inOrder(cohort1, cohort2);
1549             inOrder.verify(cohort1).canCommit();
1550             inOrder.verify(cohort1).preCommit();
1551             inOrder.verify(cohort2).canCommit();
1552
1553             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1554         }};
1555     }
1556
1557     @Test
1558     public void testCanCommitPhaseFailure() throws Throwable {
             // The cohort's canCommit() returns a failed future, so the CanCommitTransaction message is
             // expected to be answered with a Failure; a second transaction must then still succeed.
1559         new ShardTestKit(getSystem()) {{
1560             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1561                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1562                     "testCanCommitPhaseFailure");
1563
1564             waitUntilLeader(shard);
1565
1566             final FiniteDuration duration = duration("5 seconds");
1567
1568             String transactionID1 = "tx1";
1569             MutableCompositeModification modification = new MutableCompositeModification();
1570             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1571             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1572
1573             // Simulate the ForwardedReadyTransaction messages that would be sent
1574             // by the ShardTransaction.
1575
1576             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1577                     cohort, modification, true, false), getRef());
1578             expectMsgClass(duration, ReadyTransactionReply.class);
1579
1580             // Send the CanCommitTransaction message.
1581
1582             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1583             expectMsgClass(duration, akka.actor.Status.Failure.class);
1584
1585             // Send another can commit to ensure the failed one got cleaned up.
1586
                 // The same mock is reused for tx2: reset() drops the failing stub before canCommit()
                 // is re-stubbed to succeed.
1587             reset(cohort);
1588
1589             String transactionID2 = "tx2";
1590             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1591
1592             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1593                     cohort, modification, true, false), getRef());
1594             expectMsgClass(duration, ReadyTransactionReply.class);
1595
1596             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1597             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1598                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1599             assertEquals("getCanCommit", true, reply.getCanCommit());
1600
1601             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1602         }};
1603     }
1604
1605     @Test
1606     public void testCanCommitPhaseFalseResponse() throws Throwable {
             // The cohort's canCommit() completes with FALSE, so the reply is expected to be a
             // CanCommitTransactionReply carrying false; a second transaction must then still succeed.
1607         new ShardTestKit(getSystem()) {{
1608             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1609                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1610                     "testCanCommitPhaseFalseResponse");
1611
1612             waitUntilLeader(shard);
1613
1614             final FiniteDuration duration = duration("5 seconds");
1615
1616             String transactionID1 = "tx1";
1617             MutableCompositeModification modification = new MutableCompositeModification();
1618             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1619             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1620
1621             // Simulate the ForwardedReadyTransaction messages that would be sent
1622             // by the ShardTransaction.
1623
1624             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1625                     cohort, modification, true, false), getRef());
1626             expectMsgClass(duration, ReadyTransactionReply.class);
1627
1628             // Send the CanCommitTransaction message.
1629
1630             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1631             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1632                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1633             assertEquals("getCanCommit", false, reply.getCanCommit());
1634
1635             // Send another can commit to ensure the failed one got cleaned up.
1636
                 // The same mock is reused for tx2: reset() drops the FALSE stub before canCommit()
                 // is re-stubbed to return TRUE.
1637             reset(cohort);
1638
1639             String transactionID2 = "tx2";
1640             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1641
1642             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1643                     cohort, modification, true, false), getRef());
1644             expectMsgClass(duration, ReadyTransactionReply.class);
1645
1646             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1647             reply = CanCommitTransactionReply.fromSerializable(
1648                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1649             assertEquals("getCanCommit", true, reply.getCanCommit());
1650
1651             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1652         }};
1653     }
1654
1655     @Test
1656     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
             // Readies a transaction with the last ForwardedReadyTransaction argument set to true and a
             // cohort whose canCommit() fails; the ready message itself is expected to get a Failure reply.
1657         new ShardTestKit(getSystem()) {{
1658             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1659                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1660                     "testImmediateCommitWithCanCommitPhaseFailure");
1661
1662             waitUntilLeader(shard);
1663
1664             final FiniteDuration duration = duration("5 seconds");
1665
1666             String transactionID1 = "tx1";
1667             MutableCompositeModification modification = new MutableCompositeModification();
1668             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1669             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1670
1671             // Simulate the ForwardedReadyTransaction messages that would be sent
1672             // by the ShardTransaction.
1673
                 // NOTE(review): the final argument is true here; presumably it requests commit-on-ready,
                 // which is why no CanCommitTransaction/CommitTransaction messages follow - confirm.
1674             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1675                     cohort, modification, true, true), getRef());
1676
1677             expectMsgClass(duration, akka.actor.Status.Failure.class);
1678
1679             // Ready another transaction to ensure the failed one got cleaned up.
1680
1681             reset(cohort);
1682
                 // Re-stub the same mock so every phase succeeds, with a candidate whose root is UNMODIFIED.
1683             String transactionID2 = "tx2";
1684             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1685             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1686             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1687             DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1688             DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1689             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1690             doReturn(candidateRoot).when(candidate).getRootNode();
1691             doReturn(candidate).when(cohort).getCandidate();
1692
1693             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1694                     cohort, modification, true, true), getRef());
1695
                 // This time the ready message is answered directly with a commit reply.
1696             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1697
1698             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1699         }};
1700     }
1701
1702     @Test
1703     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
             // Readies a transaction with the last ForwardedReadyTransaction argument set to true and a
             // cohort whose canCommit() completes with FALSE; the test expects a Failure reply to the
             // ready message.
1704         new ShardTestKit(getSystem()) {{
1705             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1706                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1707                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1708
1709             waitUntilLeader(shard);
1710
1711             final FiniteDuration duration = duration("5 seconds");
1712
1713             String transactionID = "tx1";
1714             MutableCompositeModification modification = new MutableCompositeModification();
1715             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1716             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1717
1718             // Simulate the ForwardedReadyTransaction messages that would be sent
1719             // by the ShardTransaction.
1720
                 // NOTE(review): the final argument is true here; presumably it requests commit-on-ready,
                 // which is why no CanCommitTransaction/CommitTransaction messages follow - confirm.
1721             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1722                     cohort, modification, true, true), getRef());
1723
                 // A false canCommit result is expected to come back as a Failure, not as a reply message.
1724             expectMsgClass(duration, akka.actor.Status.Failure.class);
1725
1726             // Ready another transaction to ensure the failed one got cleaned up.
1727
1728             reset(cohort);
1729
                 // Re-stub the same mock so every phase succeeds, with a candidate whose root is UNMODIFIED.
1730             String transactionID2 = "tx2";
1731             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1732             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1733             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1734             DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1735             DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1736             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1737             doReturn(candidateRoot).when(candidate).getRootNode();
1738             doReturn(candidate).when(cohort).getCandidate();
1739
1740             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1741                     cohort, modification, true, true), getRef());
1742
                 // This time the ready message is answered directly with a commit reply.
1743             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1744
1745             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1746         }};
1747     }
1748
1749     @Test
1750     public void testAbortBeforeFinishCommit() throws Throwable {
             // Calls doAbortTransaction on the shard from inside the cohort's preCommit hook and checks that
             // the commit still completes and the written node is present in the store afterwards.
1751         new ShardTestKit(getSystem()) {{
1752             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1753                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1754                     "testAbortBeforeFinishCommit");
1755
1756             waitUntilLeader(shard);
1757
1758             final FiniteDuration duration = duration("5 seconds");
1759             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1760
1761             final String transactionID = "tx1";
                 // Hook handed to setupMockWriteTransaction below; it wraps the cohort's preCommit() call.
1762             Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1763                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1764                 @Override
1765                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1766                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1767
1768                     // Simulate an AbortTransaction message occurring during replication, after
1769                     // persisting and before finishing the commit to the in-memory store.
1770                     // We have no followers so due to optimizations in the RaftActor, it does not
1771                     // attempt replication and thus we can't send an AbortTransaction message b/c
1772                     // it would be processed too late after CommitTransaction completes. So we'll
1773                     // simulate an AbortTransaction message occurring during replication by calling
1774                     // the shard directly.
1775                     //
1776                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1777
1778                     return preCommitFuture;
1779                 }
1780             };
1781
1782             MutableCompositeModification modification = new MutableCompositeModification();
1783             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1784                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1785                     modification, preCommit);
1786
                 // Ready the transaction, then drive canCommit and commit explicitly.
1787             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1788                     cohort, modification, true, false), getRef());
1789             expectMsgClass(duration, ReadyTransactionReply.class);
1790
1791             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1792             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1793                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1794             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1795
1796             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1797             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1798
1799             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1800
1801             // Since we're simulating an abort occurring during replication and before finish commit,
1802             // the data should still get written to the in-memory store since we've gotten past
1803             // canCommit and preCommit and persisted the data.
1804             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1805
1806             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1807         }};
1808     }
1809
1810     @Test
1811     public void testTransactionCommitTimeout() throws Throwable {
1812         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1813
1814         new ShardTestKit(getSystem()) {{
1815             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1816                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1817                     "testTransactionCommitTimeout");
1818
1819             waitUntilLeader(shard);
1820
1821             final FiniteDuration duration = duration("5 seconds");
1822
1823             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1824
1825             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1826             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1827                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1828
1829             // Create 1st Tx - will timeout
1830
1831             String transactionID1 = "tx1";
1832             MutableCompositeModification modification1 = new MutableCompositeModification();
1833             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1834                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1835                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1836                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1837                     modification1);
1838
1839             // Create 2nd Tx
1840
1841             String transactionID2 = "tx3";
1842             MutableCompositeModification modification2 = new MutableCompositeModification();
1843             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1844                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1845             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1846                     listNodePath,
1847                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1848                     modification2);
1849
1850             // Ready the Tx's
1851
1852             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1853                     cohort1, modification1, true, false), getRef());
1854             expectMsgClass(duration, ReadyTransactionReply.class);
1855
1856             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1857                     cohort2, modification2, true, false), getRef());
1858             expectMsgClass(duration, ReadyTransactionReply.class);
1859
1860             // canCommit 1st Tx. We don't send the commit so it should timeout.
1861
1862             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1863             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1864
1865             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1866
1867             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1868             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1869
1870             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1871
1872             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1873             expectMsgClass(duration, akka.actor.Status.Failure.class);
1874
1875             // Commit the 2nd Tx.
1876
1877             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1878             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1879
1880             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1881             assertNotNull(listNodePath + " not found", node);
1882
1883             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1884         }};
1885     }
1886
1887     @Test
1888     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1889         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1890
1891         new ShardTestKit(getSystem()) {{
1892             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1893                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1894                     "testTransactionCommitQueueCapacityExceeded");
1895
1896             waitUntilLeader(shard);
1897
1898             final FiniteDuration duration = duration("5 seconds");
1899
1900             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1901
1902             String transactionID1 = "tx1";
1903             MutableCompositeModification modification1 = new MutableCompositeModification();
1904             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1905                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1906
1907             String transactionID2 = "tx2";
1908             MutableCompositeModification modification2 = new MutableCompositeModification();
1909             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1910                     TestModel.OUTER_LIST_PATH,
1911                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1912                     modification2);
1913
1914             String transactionID3 = "tx3";
1915             MutableCompositeModification modification3 = new MutableCompositeModification();
1916             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1917                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1918
1919             // Ready the Tx's
1920
1921             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1922                     cohort1, modification1, true, false), getRef());
1923             expectMsgClass(duration, ReadyTransactionReply.class);
1924
1925             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1926                     cohort2, modification2, true, false), getRef());
1927             expectMsgClass(duration, ReadyTransactionReply.class);
1928
1929             // The 3rd Tx should exceed queue capacity and fail.
1930
1931             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1932                     cohort3, modification3, true, false), getRef());
1933             expectMsgClass(duration, akka.actor.Status.Failure.class);
1934
1935             // canCommit 1st Tx.
1936
1937             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1938             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1939
1940             // canCommit the 2nd Tx - it should get queued.
1941
1942             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1943
1944             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1945
1946             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1947             expectMsgClass(duration, akka.actor.Status.Failure.class);
1948
1949             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1950         }};
1951     }
1952
1953     @Test
1954     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1955         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1956
1957         new ShardTestKit(getSystem()) {{
1958             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1959                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1960                     "testTransactionCommitWithPriorExpiredCohortEntries");
1961
1962             waitUntilLeader(shard);
1963
1964             final FiniteDuration duration = duration("5 seconds");
1965
1966             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1967
1968             String transactionID1 = "tx1";
1969             MutableCompositeModification modification1 = new MutableCompositeModification();
1970             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1971                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1972
1973             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1974                     cohort1, modification1, true, false), getRef());
1975             expectMsgClass(duration, ReadyTransactionReply.class);
1976
1977             String transactionID2 = "tx2";
1978             MutableCompositeModification modification2 = new MutableCompositeModification();
1979             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1980                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1981
1982             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1983                     cohort2, modification2, true, false), getRef());
1984             expectMsgClass(duration, ReadyTransactionReply.class);
1985
1986             String transactionID3 = "tx3";
1987             MutableCompositeModification modification3 = new MutableCompositeModification();
1988             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1989                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1990
1991             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1992                     cohort3, modification3, true, false), getRef());
1993             expectMsgClass(duration, ReadyTransactionReply.class);
1994
1995             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1996             // should expire from the queue and the last one should be processed.
1997
1998             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1999             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2000
2001             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2002         }};
2003     }
2004
2005     @Test
2006     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2007         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2008
2009         new ShardTestKit(getSystem()) {{
2010             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2011                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2012                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2013
2014             waitUntilLeader(shard);
2015
2016             final FiniteDuration duration = duration("5 seconds");
2017
2018             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2019
2020             String transactionID1 = "tx1";
2021             MutableCompositeModification modification1 = new MutableCompositeModification();
2022             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2023                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2024
2025             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2026                     cohort1, modification1, true, false), getRef());
2027             expectMsgClass(duration, ReadyTransactionReply.class);
2028
2029             // CanCommit the first one so it's the current in-progress CohortEntry.
2030
2031             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2032             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2033
2034             // Ready the second Tx.
2035
2036             String transactionID2 = "tx2";
2037             MutableCompositeModification modification2 = new MutableCompositeModification();
2038             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2039                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2040
2041             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2042                     cohort2, modification2, true, false), getRef());
2043             expectMsgClass(duration, ReadyTransactionReply.class);
2044
2045             // Ready the third Tx.
2046
2047             String transactionID3 = "tx3";
2048             DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2049             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2050                     .apply(modification3);
2051             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2052
2053             shard.tell(readyMessage, getRef());
2054
2055             // Commit the first Tx. After completing, the second should expire from the queue and the third
2056             // Tx committed.
2057
2058             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2059             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2060
2061             // Expect commit reply from the third Tx.
2062
2063             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2064
2065             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2066             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2067
2068             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2069         }};
2070     }
2071
2072     @Test
2073     public void testCanCommitBeforeReadyFailure() throws Throwable {
2074         new ShardTestKit(getSystem()) {{
2075             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2076                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2077                     "testCanCommitBeforeReadyFailure");
2078
2079             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2080             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2081
2082             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2083         }};
2084     }
2085
2086     @Test
2087     public void testAbortTransaction() throws Throwable {
2088         new ShardTestKit(getSystem()) {{
2089             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2090                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2091                     "testAbortTransaction");
2092
2093             waitUntilLeader(shard);
2094
2095             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2096
2097             String transactionID1 = "tx1";
2098             MutableCompositeModification modification1 = new MutableCompositeModification();
2099             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2100             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2101             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2102
2103             String transactionID2 = "tx2";
2104             MutableCompositeModification modification2 = new MutableCompositeModification();
2105             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2106             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2107
2108             FiniteDuration duration = duration("5 seconds");
2109             final Timeout timeout = new Timeout(duration);
2110
2111             // Simulate the ForwardedReadyTransaction messages that would be sent
2112             // by the ShardTransaction.
2113
2114             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2115                     cohort1, modification1, true, false), getRef());
2116             expectMsgClass(duration, ReadyTransactionReply.class);
2117
2118             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2119                     cohort2, modification2, true, false), getRef());
2120             expectMsgClass(duration, ReadyTransactionReply.class);
2121
2122             // Send the CanCommitTransaction message for the first Tx.
2123
2124             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2125             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2126                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2127             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2128
2129             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2130             // processed after the first Tx completes.
2131
2132             Future<Object> canCommitFuture = Patterns.ask(shard,
2133                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2134
2135             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2136             // Tx to proceed.
2137
2138             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2139             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2140
2141             // Wait for the 2nd Tx to complete the canCommit phase.
2142
2143             Await.ready(canCommitFuture, duration);
2144
2145             InOrder inOrder = inOrder(cohort1, cohort2);
2146             inOrder.verify(cohort1).canCommit();
2147             inOrder.verify(cohort2).canCommit();
2148
2149             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2150         }};
2151     }
2152
2153     @Test
2154     public void testCreateSnapshot() throws Exception {
2155         testCreateSnapshot(true, "testCreateSnapshot");
2156     }
2157
2158     @Test
2159     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2160         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2161     }
2162
2163     @SuppressWarnings("serial")
2164     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2165
2166         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2167
2168         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2169         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2170             TestPersistentDataProvider(DataPersistenceProvider delegate) {
2171                 super(delegate);
2172             }
2173
2174             @Override
2175             public void saveSnapshot(Object o) {
2176                 savedSnapshot.set(o);
2177                 super.saveSnapshot(o);
2178             }
2179         }
2180
2181         dataStoreContextBuilder.persistent(persistent);
2182
2183         new ShardTestKit(getSystem()) {{
2184             class TestShard extends Shard {
2185
2186                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
2187                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
2188                     super(name, peerAddresses, datastoreContext, schemaContext);
2189                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2190                 }
2191
2192                 @Override
2193                 public void handleCommand(Object message) {
2194                     super.handleCommand(message);
2195
2196                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2197                         latch.get().countDown();
2198                     }
2199                 }
2200
2201                 @Override
2202                 public RaftActorContext getRaftActorContext() {
2203                     return super.getRaftActorContext();
2204                 }
2205             }
2206
2207             Creator<Shard> creator = new Creator<Shard>() {
2208                 @Override
2209                 public Shard create() throws Exception {
2210                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
2211                             newDatastoreContext(), SCHEMA_CONTEXT);
2212                 }
2213             };
2214
2215             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2216                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2217
2218             waitUntilLeader(shard);
2219
2220             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2221
2222             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2223
2224             // Trigger creation of a snapshot by ensuring
2225             RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2226             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2227
2228             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2229
2230             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2231                     savedSnapshot.get() instanceof Snapshot);
2232
2233             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2234
2235             latch.set(new CountDownLatch(1));
2236             savedSnapshot.set(null);
2237
2238             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2239
2240             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2241
2242             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2243                     savedSnapshot.get() instanceof Snapshot);
2244
2245             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2246
2247             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2248         }
2249
2250         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
2251
2252             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2253             assertEquals("Root node", expectedRoot, actual);
2254
2255         }};
2256     }
2257
2258     /**
2259      * This test simply verifies that the applySnapShot logic will work
2260      * @throws ReadFailedException
2261      * @throws DataValidationFailedException
2262      */
2263     @Test
2264     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2265         DataTree store = InMemoryDataTreeFactory.getInstance().create();
2266         store.setSchemaContext(SCHEMA_CONTEXT);
2267
2268         DataTreeModification putTransaction = store.takeSnapshot().newModification();
2269         putTransaction.write(TestModel.TEST_PATH,
2270             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2271         commitTransaction(store, putTransaction);
2272
2273
2274         NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2275
2276         DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2277
2278         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2279         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2280
2281         commitTransaction(store, writeTransaction);
2282
2283         NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2284
2285         assertEquals(expected, actual);
2286     }
2287
2288     @Test
2289     public void testRecoveryApplicable(){
2290
2291         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2292                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2293
2294         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2295                 persistentContext, SCHEMA_CONTEXT);
2296
2297         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2298                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2299
2300         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2301                 nonPersistentContext, SCHEMA_CONTEXT);
2302
2303         new ShardTestKit(getSystem()) {{
2304             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2305                     persistentProps, "testPersistence1");
2306
2307             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2308
2309             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2310
2311             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2312                     nonPersistentProps, "testPersistence2");
2313
2314             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2315
2316             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2317
2318         }};
2319
2320     }
2321
2322     @Test
2323     public void testOnDatastoreContext() {
2324         new ShardTestKit(getSystem()) {{
2325             dataStoreContextBuilder.persistent(true);
2326
2327             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2328
2329             assertEquals("isRecoveryApplicable", true,
2330                     shard.underlyingActor().persistence().isRecoveryApplicable());
2331
2332             waitUntilLeader(shard);
2333
2334             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2335
2336             assertEquals("isRecoveryApplicable", false,
2337                     shard.underlyingActor().persistence().isRecoveryApplicable());
2338
2339             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2340
2341             assertEquals("isRecoveryApplicable", true,
2342                     shard.underlyingActor().persistence().isRecoveryApplicable());
2343
2344             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2345         }};
2346     }
2347
2348     @Test
2349     public void testRegisterRoleChangeListener() throws Exception {
2350         new ShardTestKit(getSystem()) {
2351             {
2352                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2353                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2354                         "testRegisterRoleChangeListener");
2355
2356                 waitUntilLeader(shard);
2357
2358                 TestActorRef<MessageCollectorActor> listener =
2359                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2360
2361                 shard.tell(new RegisterRoleChangeListener(), listener);
2362
2363                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2364
2365                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2366                         ShardLeaderStateChanged.class);
2367                 assertEquals("getLocalShardDataTree present", true,
2368                         leaderStateChanged.getLocalShardDataTree().isPresent());
2369                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2370                         leaderStateChanged.getLocalShardDataTree().get());
2371
2372                 MessageCollectorActor.clearMessages(listener);
2373
2374                 // Force a leader change
2375
2376                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2377
2378                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2379                         ShardLeaderStateChanged.class);
2380                 assertEquals("getLocalShardDataTree present", false,
2381                         leaderStateChanged.getLocalShardDataTree().isPresent());
2382
2383                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2384             }
2385         };
2386     }
2387
2388     @Test
2389     public void testFollowerInitialSyncStatus() throws Exception {
2390         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2391                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2392                 "testFollowerInitialSyncStatus");
2393
2394         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2395
2396         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2397
2398         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2399
2400         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2401
2402         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2403     }
2404
2405     private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2406         modification.ready();
2407         store.validate(modification);
2408         store.commit(store.prepare(modification));
2409     }
2410 }