CDS: Change operationTimeout units to millis
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14
15 import akka.actor.ActorRef;
16 import akka.actor.ActorSelection;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.Status.Failure;
20 import akka.dispatch.Dispatchers;
21 import akka.dispatch.OnComplete;
22 import akka.japi.Creator;
23 import akka.pattern.Patterns;
24 import akka.persistence.SaveSnapshotSuccess;
25 import akka.testkit.TestActorRef;
26 import akka.util.Timeout;
27 import com.google.common.base.Function;
28 import com.google.common.base.Optional;
29 import com.google.common.util.concurrent.Futures;
30 import com.google.common.util.concurrent.ListenableFuture;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.HashSet;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.Test;
42 import org.mockito.InOrder;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
45 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
46 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
50 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
65 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
66 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
67 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
68 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
69 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
70 import org.opendaylight.controller.cluster.datastore.modification.Modification;
71 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
72 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
73 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
74 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
75 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
76 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
77 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
78 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
79 import org.opendaylight.controller.cluster.raft.RaftActorContext;
80 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
81 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
82 import org.opendaylight.controller.cluster.raft.Snapshot;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
84 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
85 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
86 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
88 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
89 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
90 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
91 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
92 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
93 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
94 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
95 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
96 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
97 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
98 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
99 import org.opendaylight.yangtools.yang.common.QName;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
101 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
102 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
103 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
104 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
115 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
116 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
117 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
118 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
119 import scala.concurrent.Await;
120 import scala.concurrent.Future;
121 import scala.concurrent.duration.FiniteDuration;
122
123 public class ShardTest extends AbstractShardTest {
124     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
125
126     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
127
128     final CountDownLatch recoveryComplete = new CountDownLatch(1);
129
130     protected Props newShardPropsWithRecoveryComplete() {
131
132         final Creator<Shard> creator = new Creator<Shard>() {
133             @Override
134             public Shard create() throws Exception {
135                 return new Shard(shardID, Collections.<String,String>emptyMap(),
136                         newDatastoreContext(), SCHEMA_CONTEXT) {
137                     @Override
138                     protected void onRecoveryComplete() {
139                         try {
140                             super.onRecoveryComplete();
141                         } finally {
142                             recoveryComplete.countDown();
143                         }
144                     }
145                 };
146             }
147         };
148         return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
149     }
150
151     @Test
152     public void testRegisterChangeListener() throws Exception {
153         new ShardTestKit(getSystem()) {{
154             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
155                     newShardProps(),  "testRegisterChangeListener");
156
157             waitUntilLeader(shard);
158
159             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
160
161             final MockDataChangeListener listener = new MockDataChangeListener(1);
162             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
163                     "testRegisterChangeListener-DataChangeListener");
164
165             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
166                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
167
168             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
169                     RegisterChangeListenerReply.class);
170             final String replyPath = reply.getListenerRegistrationPath().toString();
171             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
172                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
173
174             final YangInstanceIdentifier path = TestModel.TEST_PATH;
175             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
176
177             listener.waitForChangeEvents(path);
178
179             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
180             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
181         }};
182     }
183
184     @SuppressWarnings("serial")
185     @Test
186     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
187         // This test tests the timing window in which a change listener is registered before the
188         // shard becomes the leader. We verify that the listener is registered and notified of the
189         // existing data when the shard becomes the leader.
190         new ShardTestKit(getSystem()) {{
191             // For this test, we want to send the RegisterChangeListener message after the shard
192             // has recovered from persistence and before it becomes the leader. So we subclass
193             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
194             // we know that the shard has been initialized to a follower and has started the
195             // election process. The following 2 CountDownLatches are used to coordinate the
196             // ElectionTimeout with the sending of the RegisterChangeListener message.
197             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
198             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
199             final Creator<Shard> creator = new Creator<Shard>() {
200                 boolean firstElectionTimeout = true;
201
202                 @Override
203                 public Shard create() throws Exception {
204                     // Use a non persistent provider because this test actually invokes persist on the journal
205                     // this will cause all other messages to not be queued properly after that.
206                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
207                     // it does do a persist)
208                     return new Shard(shardID, Collections.<String,String>emptyMap(),
209                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
210                         @Override
211                         public void onReceiveCommand(final Object message) throws Exception {
212                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
213                                 // Got the first ElectionTimeout. We don't forward it to the
214                                 // base Shard yet until we've sent the RegisterChangeListener
215                                 // message. So we signal the onFirstElectionTimeout latch to tell
216                                 // the main thread to send the RegisterChangeListener message and
217                                 // start a thread to wait on the onChangeListenerRegistered latch,
218                                 // which the main thread signals after it has sent the message.
219                                 // After the onChangeListenerRegistered is triggered, we send the
220                                 // original ElectionTimeout message to proceed with the election.
221                                 firstElectionTimeout = false;
222                                 final ActorRef self = getSelf();
223                                 new Thread() {
224                                     @Override
225                                     public void run() {
226                                         Uninterruptibles.awaitUninterruptibly(
227                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
228                                         self.tell(message, self);
229                                     }
230                                 }.start();
231
232                                 onFirstElectionTimeout.countDown();
233                             } else {
234                                 super.onReceiveCommand(message);
235                             }
236                         }
237                     };
238                 }
239             };
240
241             final MockDataChangeListener listener = new MockDataChangeListener(1);
242             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
243                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
244
245             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
246                     Props.create(new DelegatingShardCreator(creator)),
247                     "testRegisterChangeListenerWhenNotLeaderInitially");
248
249             // Write initial data into the in-memory store.
250             final YangInstanceIdentifier path = TestModel.TEST_PATH;
251             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
252
253             // Wait until the shard receives the first ElectionTimeout message.
254             assertEquals("Got first ElectionTimeout", true,
255                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
256
257             // Now send the RegisterChangeListener and wait for the reply.
258             shard.tell(new RegisterChangeListener(path, dclActor,
259                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
260
261             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
262                     RegisterChangeListenerReply.class);
263             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
264
265             // Sanity check - verify the shard is not the leader yet.
266             shard.tell(new FindLeader(), getRef());
267             final FindLeaderReply findLeadeReply =
268                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
269             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
270
271             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
272             // with the election process.
273             onChangeListenerRegistered.countDown();
274
275             // Wait for the shard to become the leader and notify our listener with the existing
276             // data in the store.
277             listener.waitForChangeEvents(path);
278
279             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
280             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
281         }};
282     }
283
284     @Test
285     public void testRegisterDataTreeChangeListener() throws Exception {
286         new ShardTestKit(getSystem()) {{
287             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
288                     newShardProps(), "testRegisterDataTreeChangeListener");
289
290             waitUntilLeader(shard);
291
292             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
293
294             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
295             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
296                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
297
298             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
299
300             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
301                     RegisterDataTreeChangeListenerReply.class);
302             final String replyPath = reply.getListenerRegistrationPath().toString();
303             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
304                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
305
306             final YangInstanceIdentifier path = TestModel.TEST_PATH;
307             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
308
309             listener.waitForChangeEvents();
310
311             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
312             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
313         }};
314     }
315
316     @SuppressWarnings("serial")
317     @Test
318     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
319         new ShardTestKit(getSystem()) {{
320             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
321             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
322             final Creator<Shard> creator = new Creator<Shard>() {
323                 boolean firstElectionTimeout = true;
324
325                 @Override
326                 public Shard create() throws Exception {
327                     return new Shard(shardID, Collections.<String,String>emptyMap(),
328                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
329                         @Override
330                         public void onReceiveCommand(final Object message) throws Exception {
331                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
332                                 firstElectionTimeout = false;
333                                 final ActorRef self = getSelf();
334                                 new Thread() {
335                                     @Override
336                                     public void run() {
337                                         Uninterruptibles.awaitUninterruptibly(
338                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
339                                         self.tell(message, self);
340                                     }
341                                 }.start();
342
343                                 onFirstElectionTimeout.countDown();
344                             } else {
345                                 super.onReceiveCommand(message);
346                             }
347                         }
348                     };
349                 }
350             };
351
352             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
353             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
354                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
355
356             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
357                     Props.create(new DelegatingShardCreator(creator)),
358                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
359
360             final YangInstanceIdentifier path = TestModel.TEST_PATH;
361             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
362
363             assertEquals("Got first ElectionTimeout", true,
364                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
365
366             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
367             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
368                     RegisterDataTreeChangeListenerReply.class);
369             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
370
371             shard.tell(new FindLeader(), getRef());
372             final FindLeaderReply findLeadeReply =
373                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
374             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
375
376             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
377
378             onChangeListenerRegistered.countDown();
379
380             // TODO: investigate why we do not receive data chage events
381             listener.waitForChangeEvents();
382
383             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
384             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
385         }};
386     }
387
388     @Test
389     public void testCreateTransaction(){
390         new ShardTestKit(getSystem()) {{
391             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
392
393             waitUntilLeader(shard);
394
395             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
396
397             shard.tell(new CreateTransaction("txn-1",
398                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
399
400             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
401                     CreateTransactionReply.class);
402
403             final String path = reply.getTransactionActorPath().toString();
404             assertTrue("Unexpected transaction path " + path,
405                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
406
407             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
408         }};
409     }
410
411     @Test
412     public void testCreateTransactionOnChain(){
413         new ShardTestKit(getSystem()) {{
414             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
415
416             waitUntilLeader(shard);
417
418             shard.tell(new CreateTransaction("txn-1",
419                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
420                     getRef());
421
422             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
423                     CreateTransactionReply.class);
424
425             final String path = reply.getTransactionActorPath().toString();
426             assertTrue("Unexpected transaction path " + path,
427                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
428
429             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
430         }};
431     }
432
433     @SuppressWarnings("serial")
434     @Test
435     public void testPeerAddressResolved() throws Exception {
436         new ShardTestKit(getSystem()) {{
437             final CountDownLatch recoveryComplete = new CountDownLatch(1);
438             class TestShard extends Shard {
439                 TestShard() {
440                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
441                             newDatastoreContext(), SCHEMA_CONTEXT);
442                 }
443
444                 Map<String, String> getPeerAddresses() {
445                     return getRaftActorContext().getPeerAddresses();
446                 }
447
448                 @Override
449                 protected void onRecoveryComplete() {
450                     try {
451                         super.onRecoveryComplete();
452                     } finally {
453                         recoveryComplete.countDown();
454                     }
455                 }
456             }
457
458             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
459                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
460                         @Override
461                         public TestShard create() throws Exception {
462                             return new TestShard();
463                         }
464                     })), "testPeerAddressResolved");
465
466             //waitUntilLeader(shard);
467             assertEquals("Recovery complete", true,
468                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
469
470             final String address = "akka://foobar";
471             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
472
473             assertEquals("getPeerAddresses", address,
474                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
475
476             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
477         }};
478     }
479
480     @Test
481     public void testApplySnapshot() throws Exception {
482         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
483                 "testApplySnapshot");
484
485         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
486         store.setSchemaContext(SCHEMA_CONTEXT);
487
488         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
489                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
490                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
491                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
492
493         writeToStore(store, TestModel.TEST_PATH, container);
494
495         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
496         final NormalizedNode<?,?> expected = readStore(store, root);
497
498         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
499                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
500
501         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
502
503         final NormalizedNode<?,?> actual = readStore(shard, root);
504
505         assertEquals("Root node", expected, actual);
506
507         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
508     }
509
510     @Test
511     public void testApplyState() throws Exception {
512
513         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
514
515         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
516
517         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
518                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
519
520         shard.underlyingActor().onReceiveCommand(applyState);
521
522         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
523         assertEquals("Applied state", node, actual);
524
525         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
526     }
527
528     @Test
529     public void testApplyStateWithCandidatePayload() throws Exception {
530
531         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
532
533         recoveryComplete.await(5,  TimeUnit.SECONDS);
534
535         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
536         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
537
538         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
539                 DataTreeCandidatePayload.create(candidate)));
540
541         shard.underlyingActor().onReceiveCommand(applyState);
542
543         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
544         assertEquals("Applied state", node, actual);
545
546         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
547     }
548
549     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
550         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
551         testStore.setSchemaContext(SCHEMA_CONTEXT);
552
553         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
554
555         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
556
557         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
558                 SerializationUtils.serializeNormalizedNode(root),
559                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
560         return testStore;
561     }
562
563     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
564         source.validate(mod);
565         final DataTreeCandidate candidate = source.prepare(mod);
566         source.commit(candidate);
567         return DataTreeCandidatePayload.create(candidate);
568     }
569
570     @Test
571     public void testDataTreeCandidateRecovery() throws Exception {
572         // Set up the InMemorySnapshotStore.
573         final DataTree source = setupInMemorySnapshotStore();
574
575         final DataTreeModification writeMod = source.takeSnapshot().newModification();
576         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
577         writeMod.ready();
578         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
579
580         // Set up the InMemoryJournal.
581         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
582
583         final int nListEntries = 16;
584         final Set<Integer> listEntryKeys = new HashSet<>();
585
586         // Add some ModificationPayload entries
587         for (int i = 1; i <= nListEntries; i++) {
588             listEntryKeys.add(Integer.valueOf(i));
589
590             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
591                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
592
593             final DataTreeModification mod = source.takeSnapshot().newModification();
594             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
595             mod.ready();
596             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
597                 payloadForModification(source, mod)));
598         }
599
600         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
601                 new ApplyJournalEntries(nListEntries));
602
603         testRecovery(listEntryKeys);
604     }
605
606     @Test
607     public void testModicationRecovery() throws Exception {
608
609         // Set up the InMemorySnapshotStore.
610         setupInMemorySnapshotStore();
611
612         // Set up the InMemoryJournal.
613
614         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
615
616         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
617                   new WriteModification(TestModel.OUTER_LIST_PATH,
618                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
619
620         final int nListEntries = 16;
621         final Set<Integer> listEntryKeys = new HashSet<>();
622
623         // Add some ModificationPayload entries
624         for(int i = 1; i <= nListEntries; i++) {
625             listEntryKeys.add(Integer.valueOf(i));
626             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
627                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
628             final Modification mod = new MergeModification(path,
629                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
630             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
631                     newModificationPayload(mod)));
632         }
633
634         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
635                 new ApplyJournalEntries(nListEntries));
636
637         testRecovery(listEntryKeys);
638     }
639
640     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
641         final MutableCompositeModification compMod = new MutableCompositeModification();
642         for(final Modification mod: mods) {
643             compMod.addModification(mod);
644         }
645
646         return new ModificationPayload(compMod);
647     }
648
649     @Test
650     public void testConcurrentThreePhaseCommits() throws Throwable {
651         new ShardTestKit(getSystem()) {{
652             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
653                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
654                     "testConcurrentThreePhaseCommits");
655
656             waitUntilLeader(shard);
657
658          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
659
660             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
661
662             final String transactionID1 = "tx1";
663             final MutableCompositeModification modification1 = new MutableCompositeModification();
664             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
665                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
666
667             final String transactionID2 = "tx2";
668             final MutableCompositeModification modification2 = new MutableCompositeModification();
669             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
670                     TestModel.OUTER_LIST_PATH,
671                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
672                     modification2);
673
674             final String transactionID3 = "tx3";
675             final MutableCompositeModification modification3 = new MutableCompositeModification();
676             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
677                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
678                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
679                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
680                     modification3);
681
682             final long timeoutSec = 5;
683             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
684             final Timeout timeout = new Timeout(duration);
685
686             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
687             // by the ShardTransaction.
688
689             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
690                     cohort1, modification1, true, false), getRef());
691             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
692                     expectMsgClass(duration, ReadyTransactionReply.class));
693             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
694
695             // Send the CanCommitTransaction message for the first Tx.
696
697             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
698             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
699                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
700             assertEquals("Can commit", true, canCommitReply.getCanCommit());
701
702             // Send the ForwardedReadyTransaction for the next 2 Tx's.
703
704             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
705                     cohort2, modification2, true, false), getRef());
706             expectMsgClass(duration, ReadyTransactionReply.class);
707
708             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
709                     cohort3, modification3, true, false), getRef());
710             expectMsgClass(duration, ReadyTransactionReply.class);
711
712             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
713             // processed after the first Tx completes.
714
715             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
716                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
717
718             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
719                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
720
721             // Send the CommitTransaction message for the first Tx. After it completes, it should
722             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
723
724             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
725             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
726
727             // Wait for the next 2 Tx's to complete.
728
729             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
730             final CountDownLatch commitLatch = new CountDownLatch(2);
731
732             class OnFutureComplete extends OnComplete<Object> {
733                 private final Class<?> expRespType;
734
735                 OnFutureComplete(final Class<?> expRespType) {
736                     this.expRespType = expRespType;
737                 }
738
739                 @Override
740                 public void onComplete(final Throwable error, final Object resp) {
741                     if(error != null) {
742                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
743                     } else {
744                         try {
745                             assertEquals("Commit response type", expRespType, resp.getClass());
746                             onSuccess(resp);
747                         } catch (final Exception e) {
748                             caughtEx.set(e);
749                         }
750                     }
751                 }
752
753                 void onSuccess(final Object resp) throws Exception {
754                 }
755             }
756
757             class OnCommitFutureComplete extends OnFutureComplete {
758                 OnCommitFutureComplete() {
759                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
760                 }
761
762                 @Override
763                 public void onComplete(final Throwable error, final Object resp) {
764                     super.onComplete(error, resp);
765                     commitLatch.countDown();
766                 }
767             }
768
769             class OnCanCommitFutureComplete extends OnFutureComplete {
770                 private final String transactionID;
771
772                 OnCanCommitFutureComplete(final String transactionID) {
773                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
774                     this.transactionID = transactionID;
775                 }
776
777                 @Override
778                 void onSuccess(final Object resp) throws Exception {
779                     final CanCommitTransactionReply canCommitReply =
780                             CanCommitTransactionReply.fromSerializable(resp);
781                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
782
783                     final Future<Object> commitFuture = Patterns.ask(shard,
784                             new CommitTransaction(transactionID).toSerializable(), timeout);
785                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
786                 }
787             }
788
789             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
790                     getSystem().dispatcher());
791
792             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
793                     getSystem().dispatcher());
794
795             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
796
797             if(caughtEx.get() != null) {
798                 throw caughtEx.get();
799             }
800
801             assertEquals("Commits complete", true, done);
802
803             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
804             inOrder.verify(cohort1).canCommit();
805             inOrder.verify(cohort1).preCommit();
806             inOrder.verify(cohort1).commit();
807             inOrder.verify(cohort2).canCommit();
808             inOrder.verify(cohort2).preCommit();
809             inOrder.verify(cohort2).commit();
810             inOrder.verify(cohort3).canCommit();
811             inOrder.verify(cohort3).preCommit();
812             inOrder.verify(cohort3).commit();
813
814             // Verify data in the data store.
815
816             verifyOuterListEntry(shard, 1);
817
818             verifyLastApplied(shard, 2);
819
820             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
821         }};
822     }
823
824     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
825             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
826         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
827     }
828
829     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
830             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
831             final int messagesSent) {
832         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
833         batched.addModification(new WriteModification(path, data));
834         batched.setReady(ready);
835         batched.setDoCommitOnReady(doCommitOnReady);
836         batched.setTotalMessagesSent(messagesSent);
837         return batched;
838     }
839
840     @Test
841     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
842         new ShardTestKit(getSystem()) {{
843             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
844                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
845                     "testBatchedModificationsWithNoCommitOnReady");
846
847             waitUntilLeader(shard);
848
849             final String transactionID = "tx";
850             final FiniteDuration duration = duration("5 seconds");
851
852             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
853             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
854                 @Override
855                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
856                     if(mockCohort.get() == null) {
857                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
858                     }
859
860                     return mockCohort.get();
861                 }
862             };
863
864             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
865
866             // Send a BatchedModifications to start a transaction.
867
868             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
869                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
870             expectMsgClass(duration, BatchedModificationsReply.class);
871
872             // Send a couple more BatchedModifications.
873
874             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
875                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
876             expectMsgClass(duration, BatchedModificationsReply.class);
877
878             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
879                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
880                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
881             expectMsgClass(duration, ReadyTransactionReply.class);
882
883             // Send the CanCommitTransaction message.
884
885             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
886             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
887                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
888             assertEquals("Can commit", true, canCommitReply.getCanCommit());
889
890             // Send the CanCommitTransaction message.
891
892             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
893             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
894
895             final InOrder inOrder = inOrder(mockCohort.get());
896             inOrder.verify(mockCohort.get()).canCommit();
897             inOrder.verify(mockCohort.get()).preCommit();
898             inOrder.verify(mockCohort.get()).commit();
899
900             // Verify data in the data store.
901
902             verifyOuterListEntry(shard, 1);
903
904             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
905         }};
906     }
907
908     @Test
909     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
910         new ShardTestKit(getSystem()) {{
911             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
912                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
913                     "testBatchedModificationsWithCommitOnReady");
914
915             waitUntilLeader(shard);
916
917             final String transactionID = "tx";
918             final FiniteDuration duration = duration("5 seconds");
919
920             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
921             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
922                 @Override
923                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
924                     if(mockCohort.get() == null) {
925                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
926                     }
927
928                     return mockCohort.get();
929                 }
930             };
931
932             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
933
934             // Send a BatchedModifications to start a transaction.
935
936             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
937                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
938             expectMsgClass(duration, BatchedModificationsReply.class);
939
940             // Send a couple more BatchedModifications.
941
942             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
943                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
944             expectMsgClass(duration, BatchedModificationsReply.class);
945
946             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
947                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
948                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
949
950             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
951
952             final InOrder inOrder = inOrder(mockCohort.get());
953             inOrder.verify(mockCohort.get()).canCommit();
954             inOrder.verify(mockCohort.get()).preCommit();
955             inOrder.verify(mockCohort.get()).commit();
956
957             // Verify data in the data store.
958
959             verifyOuterListEntry(shard, 1);
960
961             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
962         }};
963     }
964
965     @Test(expected=IllegalStateException.class)
966     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
967         new ShardTestKit(getSystem()) {{
968             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
969                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
970                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
971
972             waitUntilLeader(shard);
973
974             final String transactionID = "tx1";
975             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
976             batched.setReady(true);
977             batched.setTotalMessagesSent(2);
978
979             shard.tell(batched, getRef());
980
981             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
982
983             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
984
985             if(failure != null) {
986                 throw failure.cause();
987             }
988         }};
989     }
990
991     @SuppressWarnings("unchecked")
992     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
993         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
994         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
995         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
996                 outerList.getValue() instanceof Iterable);
997         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
998         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
999                 entry instanceof MapEntryNode);
1000         final MapEntryNode mapEntry = (MapEntryNode)entry;
1001         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1002                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1003         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1004         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1005     }
1006
1007     @Test
1008     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1009         new ShardTestKit(getSystem()) {{
1010             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1011                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1012                     "testBatchedModificationsOnTransactionChain");
1013
1014             waitUntilLeader(shard);
1015
1016             final String transactionChainID = "txChain";
1017             final String transactionID1 = "tx1";
1018             final String transactionID2 = "tx2";
1019
1020             final FiniteDuration duration = duration("5 seconds");
1021
1022             // Send a BatchedModifications to start a chained write transaction and ready it.
1023
1024             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1025             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1026             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1027                     containerNode, true, false, 1), getRef());
1028             expectMsgClass(duration, ReadyTransactionReply.class);
1029
1030             // Create a read Tx on the same chain.
1031
1032             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1033                     transactionChainID).toSerializable(), getRef());
1034
1035             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1036
1037             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1038             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1039             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1040
1041             // Commit the write transaction.
1042
1043             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1044             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1045                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1046             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1047
1048             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1049             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1050
1051             // Verify data in the data store.
1052
1053             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1054             assertEquals("Stored node", containerNode, actualNode);
1055
1056             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1057         }};
1058     }
1059
1060     @Test
1061     public void testOnBatchedModificationsWhenNotLeader() {
1062         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1063         new ShardTestKit(getSystem()) {{
1064             final Creator<Shard> creator = new Creator<Shard>() {
1065                 private static final long serialVersionUID = 1L;
1066
1067                 @Override
1068                 public Shard create() throws Exception {
1069                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1070                             newDatastoreContext(), SCHEMA_CONTEXT) {
1071                         @Override
1072                         protected boolean isLeader() {
1073                             return overrideLeaderCalls.get() ? false : super.isLeader();
1074                         }
1075
1076                         @Override
1077                         protected ActorSelection getLeader() {
1078                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1079                                 super.getLeader();
1080                         }
1081                     };
1082                 }
1083             };
1084
1085             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1086                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1087
1088             waitUntilLeader(shard);
1089
1090             overrideLeaderCalls.set(true);
1091
1092             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1093
1094             shard.tell(batched, ActorRef.noSender());
1095
1096             expectMsgEquals(batched);
1097
1098             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1099         }};
1100     }
1101
1102     @Test
1103     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1104         new ShardTestKit(getSystem()) {{
1105             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1106                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1107                     "testForwardedReadyTransactionWithImmediateCommit");
1108
1109             waitUntilLeader(shard);
1110
1111             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1112
1113             final String transactionID = "tx1";
1114             final MutableCompositeModification modification = new MutableCompositeModification();
1115             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1116             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1117                     TestModel.TEST_PATH, containerNode, modification);
1118
1119             final FiniteDuration duration = duration("5 seconds");
1120
1121             // Simulate the ForwardedReadyTransaction messages that would be sent
1122             // by the ShardTransaction.
1123
1124             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1125                     cohort, modification, true, true), getRef());
1126
1127             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1128
1129             final InOrder inOrder = inOrder(cohort);
1130             inOrder.verify(cohort).canCommit();
1131             inOrder.verify(cohort).preCommit();
1132             inOrder.verify(cohort).commit();
1133
1134             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1135             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1136
1137             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1138         }};
1139     }
1140
1141     @Test
1142     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1143         new ShardTestKit(getSystem()) {{
1144             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1145                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1146                     "testReadyLocalTransactionWithImmediateCommit");
1147
1148             waitUntilLeader(shard);
1149
1150             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1151
1152             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1153
1154             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1155             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1156             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1157             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1158
1159             final String txId = "tx1";
1160             modification.ready();
1161             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1162
1163             shard.tell(readyMessage, getRef());
1164
1165             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1166
1167             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1168             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1169
1170             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1171         }};
1172     }
1173
1174     @Test
1175     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1176         new ShardTestKit(getSystem()) {{
1177             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1178                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1179                     "testReadyLocalTransactionWithThreePhaseCommit");
1180
1181             waitUntilLeader(shard);
1182
1183             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1184
1185             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1186
1187             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1188             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1189             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1190             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1191
1192             final String txId = "tx1";
1193                 modification.ready();
1194             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1195
1196             shard.tell(readyMessage, getRef());
1197
1198             expectMsgClass(ReadyTransactionReply.class);
1199
1200             // Send the CanCommitTransaction message.
1201
1202             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1203             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1204                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1205             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1206
1207             // Send the CanCommitTransaction message.
1208
1209             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1210             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1211
1212             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1213             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1214
1215             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1216         }};
1217     }
1218
1219     @Test
1220     public void testCommitWithPersistenceDisabled() throws Throwable {
1221         dataStoreContextBuilder.persistent(false);
1222         new ShardTestKit(getSystem()) {{
1223             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1224                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1225                     "testCommitWithPersistenceDisabled");
1226
1227             waitUntilLeader(shard);
1228
1229             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1230
1231             // Setup a simulated transactions with a mock cohort.
1232
1233             final String transactionID = "tx";
1234             final MutableCompositeModification modification = new MutableCompositeModification();
1235             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1236             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1237                     TestModel.TEST_PATH, containerNode, modification);
1238
1239             final FiniteDuration duration = duration("5 seconds");
1240
1241             // Simulate the ForwardedReadyTransaction messages that would be sent
1242             // by the ShardTransaction.
1243
1244             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1245                     cohort, modification, true, false), getRef());
1246             expectMsgClass(duration, ReadyTransactionReply.class);
1247
1248             // Send the CanCommitTransaction message.
1249
1250             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1251             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1252                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1253             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1254
1255             // Send the CanCommitTransaction message.
1256
1257             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1258             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1259
1260             final InOrder inOrder = inOrder(cohort);
1261             inOrder.verify(cohort).canCommit();
1262             inOrder.verify(cohort).preCommit();
1263             inOrder.verify(cohort).commit();
1264
1265             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1266             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1267
1268             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1269         }};
1270     }
1271
1272     private static DataTreeCandidateTip mockCandidate(final String name) {
1273         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1274         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1275         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1276         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1277         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1278         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1279         return mockCandidate;
1280     }
1281
1282     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1283         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1284         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1285         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1286         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1287         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1288         return mockCandidate;
1289     }
1290
1291     @Test
1292     public void testCommitWhenTransactionHasNoModifications(){
1293         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1294         // but here that need not happen
1295         new ShardTestKit(getSystem()) {
1296             {
1297                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1298                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1299                         "testCommitWhenTransactionHasNoModifications");
1300
1301                 waitUntilLeader(shard);
1302
1303                 final String transactionID = "tx1";
1304                 final MutableCompositeModification modification = new MutableCompositeModification();
1305                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1306                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1307                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1308                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1309                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1310
1311                 final FiniteDuration duration = duration("5 seconds");
1312
1313                 // Simulate the ForwardedReadyTransaction messages that would be sent
1314                 // by the ShardTransaction.
1315
1316                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1317                         cohort, modification, true, false), getRef());
1318                 expectMsgClass(duration, ReadyTransactionReply.class);
1319
1320                 // Send the CanCommitTransaction message.
1321
1322                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1323                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1324                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1325                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1326
1327                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1328                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1329
1330                 final InOrder inOrder = inOrder(cohort);
1331                 inOrder.verify(cohort).canCommit();
1332                 inOrder.verify(cohort).preCommit();
1333                 inOrder.verify(cohort).commit();
1334
1335                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1336                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1337
1338                 // Use MBean for verification
1339                 // Committed transaction count should increase as usual
1340                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1341
1342                 // Commit index should not advance because this does not go into the journal
1343                 assertEquals(-1, shardStats.getCommitIndex());
1344
1345                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1346
1347             }
1348         };
1349     }
1350
1351     @Test
1352     public void testCommitWhenTransactionHasModifications(){
1353         new ShardTestKit(getSystem()) {
1354             {
1355                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1356                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1357                         "testCommitWhenTransactionHasModifications");
1358
1359                 waitUntilLeader(shard);
1360
1361                 final String transactionID = "tx1";
1362                 final MutableCompositeModification modification = new MutableCompositeModification();
1363                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1364                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1365                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1366                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1367                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1368                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1369
1370                 final FiniteDuration duration = duration("5 seconds");
1371
1372                 // Simulate the ForwardedReadyTransaction messages that would be sent
1373                 // by the ShardTransaction.
1374
1375                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1376                         cohort, modification, true, false), getRef());
1377                 expectMsgClass(duration, ReadyTransactionReply.class);
1378
1379                 // Send the CanCommitTransaction message.
1380
1381                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1382                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1383                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1384                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1385
1386                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1387                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1388
1389                 final InOrder inOrder = inOrder(cohort);
1390                 inOrder.verify(cohort).canCommit();
1391                 inOrder.verify(cohort).preCommit();
1392                 inOrder.verify(cohort).commit();
1393
1394                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1395                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1396
1397                 // Use MBean for verification
1398                 // Committed transaction count should increase as usual
1399                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1400
1401                 // Commit index should advance as we do not have an empty modification
1402                 assertEquals(0, shardStats.getCommitIndex());
1403
1404                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1405
1406             }
1407         };
1408     }
1409
1410     @Test
1411     public void testCommitPhaseFailure() throws Throwable {
1412         new ShardTestKit(getSystem()) {{
1413             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1414                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1415                     "testCommitPhaseFailure");
1416
1417             waitUntilLeader(shard);
1418
1419             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1420             // commit phase.
1421
1422             final String transactionID1 = "tx1";
1423             final MutableCompositeModification modification1 = new MutableCompositeModification();
1424             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1425             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1426             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1427             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1428             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1429
1430             final String transactionID2 = "tx2";
1431             final MutableCompositeModification modification2 = new MutableCompositeModification();
1432             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1433             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1434
1435             final FiniteDuration duration = duration("5 seconds");
1436             final Timeout timeout = new Timeout(duration);
1437
1438             // Simulate the ForwardedReadyTransaction messages that would be sent
1439             // by the ShardTransaction.
1440
1441             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1442                     cohort1, modification1, true, false), getRef());
1443             expectMsgClass(duration, ReadyTransactionReply.class);
1444
1445             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1446                     cohort2, modification2, true, false), getRef());
1447             expectMsgClass(duration, ReadyTransactionReply.class);
1448
1449             // Send the CanCommitTransaction message for the first Tx.
1450
1451             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1452             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1453                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1454             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1455
1456             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1457             // processed after the first Tx completes.
1458
1459             final Future<Object> canCommitFuture = Patterns.ask(shard,
1460                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1461
1462             // Send the CommitTransaction message for the first Tx. This should send back an error
1463             // and trigger the 2nd Tx to proceed.
1464
1465             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1466             expectMsgClass(duration, akka.actor.Status.Failure.class);
1467
1468             // Wait for the 2nd Tx to complete the canCommit phase.
1469
1470             final CountDownLatch latch = new CountDownLatch(1);
1471             canCommitFuture.onComplete(new OnComplete<Object>() {
1472                 @Override
1473                 public void onComplete(final Throwable t, final Object resp) {
1474                     latch.countDown();
1475                 }
1476             }, getSystem().dispatcher());
1477
1478             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1479
1480             final InOrder inOrder = inOrder(cohort1, cohort2);
1481             inOrder.verify(cohort1).canCommit();
1482             inOrder.verify(cohort1).preCommit();
1483             inOrder.verify(cohort1).commit();
1484             inOrder.verify(cohort2).canCommit();
1485
1486             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1487         }};
1488     }
1489
1490     @Test
1491     public void testPreCommitPhaseFailure() throws Throwable {
1492         new ShardTestKit(getSystem()) {{
1493             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1494                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1495                     "testPreCommitPhaseFailure");
1496
1497             waitUntilLeader(shard);
1498
1499             final String transactionID1 = "tx1";
1500             final MutableCompositeModification modification1 = new MutableCompositeModification();
1501             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1502             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1503             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1504
1505             final String transactionID2 = "tx2";
1506             final MutableCompositeModification modification2 = new MutableCompositeModification();
1507             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1508             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1509
1510             final FiniteDuration duration = duration("5 seconds");
1511             final Timeout timeout = new Timeout(duration);
1512
1513             // Simulate the ForwardedReadyTransaction messages that would be sent
1514             // by the ShardTransaction.
1515
1516             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1517                     cohort1, modification1, true, false), getRef());
1518             expectMsgClass(duration, ReadyTransactionReply.class);
1519
1520             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1521                     cohort2, modification2, true, false), getRef());
1522             expectMsgClass(duration, ReadyTransactionReply.class);
1523
1524             // Send the CanCommitTransaction message for the first Tx.
1525
1526             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1527             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1528                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1529             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1530
1531             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1532             // processed after the first Tx completes.
1533
1534             final Future<Object> canCommitFuture = Patterns.ask(shard,
1535                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1536
1537             // Send the CommitTransaction message for the first Tx. This should send back an error
1538             // and trigger the 2nd Tx to proceed.
1539
1540             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1541             expectMsgClass(duration, akka.actor.Status.Failure.class);
1542
1543             // Wait for the 2nd Tx to complete the canCommit phase.
1544
1545             final CountDownLatch latch = new CountDownLatch(1);
1546             canCommitFuture.onComplete(new OnComplete<Object>() {
1547                 @Override
1548                 public void onComplete(final Throwable t, final Object resp) {
1549                     latch.countDown();
1550                 }
1551             }, getSystem().dispatcher());
1552
1553             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1554
1555             final InOrder inOrder = inOrder(cohort1, cohort2);
1556             inOrder.verify(cohort1).canCommit();
1557             inOrder.verify(cohort1).preCommit();
1558             inOrder.verify(cohort2).canCommit();
1559
1560             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1561         }};
1562     }
1563
1564     @Test
1565     public void testCanCommitPhaseFailure() throws Throwable {
1566         new ShardTestKit(getSystem()) {{
1567             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1568                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1569                     "testCanCommitPhaseFailure");
1570
1571             waitUntilLeader(shard);
1572
1573             final FiniteDuration duration = duration("5 seconds");
1574
1575             final String transactionID1 = "tx1";
1576             final MutableCompositeModification modification = new MutableCompositeModification();
1577             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1578             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1579
1580             // Simulate the ForwardedReadyTransaction messages that would be sent
1581             // by the ShardTransaction.
1582
1583             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1584                     cohort, modification, true, false), getRef());
1585             expectMsgClass(duration, ReadyTransactionReply.class);
1586
1587             // Send the CanCommitTransaction message.
1588
1589             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1590             expectMsgClass(duration, akka.actor.Status.Failure.class);
1591
1592             // Send another can commit to ensure the failed one got cleaned up.
1593
1594             reset(cohort);
1595
1596             final String transactionID2 = "tx2";
1597             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1598
1599             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1600                     cohort, modification, true, false), getRef());
1601             expectMsgClass(duration, ReadyTransactionReply.class);
1602
1603             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1604             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1605                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1606             assertEquals("getCanCommit", true, reply.getCanCommit());
1607
1608             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1609         }};
1610     }
1611
1612     @Test
1613     public void testCanCommitPhaseFalseResponse() throws Throwable {
1614         new ShardTestKit(getSystem()) {{
1615             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1616                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1617                     "testCanCommitPhaseFalseResponse");
1618
1619             waitUntilLeader(shard);
1620
1621             final FiniteDuration duration = duration("5 seconds");
1622
1623             final String transactionID1 = "tx1";
1624             final MutableCompositeModification modification = new MutableCompositeModification();
1625             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1626             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1627
1628             // Simulate the ForwardedReadyTransaction messages that would be sent
1629             // by the ShardTransaction.
1630
1631             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1632                     cohort, modification, true, false), getRef());
1633             expectMsgClass(duration, ReadyTransactionReply.class);
1634
1635             // Send the CanCommitTransaction message.
1636
1637             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1638             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1639                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1640             assertEquals("getCanCommit", false, reply.getCanCommit());
1641
1642             // Send another can commit to ensure the failed one got cleaned up.
1643
1644             reset(cohort);
1645
1646             final String transactionID2 = "tx2";
1647             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1648
1649             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1650                     cohort, modification, true, false), getRef());
1651             expectMsgClass(duration, ReadyTransactionReply.class);
1652
1653             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1654             reply = CanCommitTransactionReply.fromSerializable(
1655                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1656             assertEquals("getCanCommit", true, reply.getCanCommit());
1657
1658             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1659         }};
1660     }
1661
1662     @Test
1663     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1664         new ShardTestKit(getSystem()) {{
1665             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1666                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1667                     "testImmediateCommitWithCanCommitPhaseFailure");
1668
1669             waitUntilLeader(shard);
1670
1671             final FiniteDuration duration = duration("5 seconds");
1672
1673             final String transactionID1 = "tx1";
1674             final MutableCompositeModification modification = new MutableCompositeModification();
1675             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1676             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1677
1678             // Simulate the ForwardedReadyTransaction messages that would be sent
1679             // by the ShardTransaction.
1680
1681             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1682                     cohort, modification, true, true), getRef());
1683
1684             expectMsgClass(duration, akka.actor.Status.Failure.class);
1685
1686             // Send another can commit to ensure the failed one got cleaned up.
1687
1688             reset(cohort);
1689
1690             final String transactionID2 = "tx2";
1691             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1692             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1693             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1694             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1695             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1696             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1697             doReturn(candidateRoot).when(candidate).getRootNode();
1698             doReturn(candidate).when(cohort).getCandidate();
1699
1700             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1701                     cohort, modification, true, true), getRef());
1702
1703             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1704
1705             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1706         }};
1707     }
1708
1709     @Test
1710     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1711         new ShardTestKit(getSystem()) {{
1712             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1713                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1714                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1715
1716             waitUntilLeader(shard);
1717
1718             final FiniteDuration duration = duration("5 seconds");
1719
1720             final String transactionID = "tx1";
1721             final MutableCompositeModification modification = new MutableCompositeModification();
1722             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1723             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1724
1725             // Simulate the ForwardedReadyTransaction messages that would be sent
1726             // by the ShardTransaction.
1727
1728             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1729                     cohort, modification, true, true), getRef());
1730
1731             expectMsgClass(duration, akka.actor.Status.Failure.class);
1732
1733             // Send another can commit to ensure the failed one got cleaned up.
1734
1735             reset(cohort);
1736
1737             final String transactionID2 = "tx2";
1738             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1739             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1740             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1741             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1742             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1743             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1744             doReturn(candidateRoot).when(candidate).getRootNode();
1745             doReturn(candidate).when(cohort).getCandidate();
1746
1747             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1748                     cohort, modification, true, true), getRef());
1749
1750             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1751
1752             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1753         }};
1754     }
1755
1756     @Test
1757     public void testAbortBeforeFinishCommit() throws Throwable {
1758         new ShardTestKit(getSystem()) {{
1759             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1760                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1761                     "testAbortBeforeFinishCommit");
1762
1763             waitUntilLeader(shard);
1764
1765             final FiniteDuration duration = duration("5 seconds");
1766             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1767
1768             final String transactionID = "tx1";
1769             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1770                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1771                 @Override
1772                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1773                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1774
1775                     // Simulate an AbortTransaction message occurring during replication, after
1776                     // persisting and before finishing the commit to the in-memory store.
1777                     // We have no followers so due to optimizations in the RaftActor, it does not
1778                     // attempt replication and thus we can't send an AbortTransaction message b/c
1779                     // it would be processed too late after CommitTransaction completes. So we'll
1780                     // simulate an AbortTransaction message occurring during replication by calling
1781                     // the shard directly.
1782                     //
1783                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1784
1785                     return preCommitFuture;
1786                 }
1787             };
1788
1789             final MutableCompositeModification modification = new MutableCompositeModification();
1790             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1791                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1792                     modification, preCommit);
1793
1794             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1795                     cohort, modification, true, false), getRef());
1796             expectMsgClass(duration, ReadyTransactionReply.class);
1797
1798             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1799             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1800                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1801             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1802
1803             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1804             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1805
1806             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1807
1808             // Since we're simulating an abort occurring during replication and before finish commit,
1809             // the data should still get written to the in-memory store since we've gotten past
1810             // canCommit and preCommit and persisted the data.
1811             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1812
1813             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1814         }};
1815     }
1816
1817     @Test
1818     public void testTransactionCommitTimeout() throws Throwable {
1819         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1820
1821         new ShardTestKit(getSystem()) {{
1822             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1823                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1824                     "testTransactionCommitTimeout");
1825
1826             waitUntilLeader(shard);
1827
1828             final FiniteDuration duration = duration("5 seconds");
1829
1830             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1831
1832             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1833             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1834                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1835
1836             // Create 1st Tx - will timeout
1837
1838             final String transactionID1 = "tx1";
1839             final MutableCompositeModification modification1 = new MutableCompositeModification();
1840             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1841                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1842                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1843                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1844                     modification1);
1845
1846             // Create 2nd Tx
1847
1848             final String transactionID2 = "tx3";
1849             final MutableCompositeModification modification2 = new MutableCompositeModification();
1850             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1851                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1852             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1853                     listNodePath,
1854                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1855                     modification2);
1856
1857             // Ready the Tx's
1858
1859             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1860                     cohort1, modification1, true, false), getRef());
1861             expectMsgClass(duration, ReadyTransactionReply.class);
1862
1863             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1864                     cohort2, modification2, true, false), getRef());
1865             expectMsgClass(duration, ReadyTransactionReply.class);
1866
1867             // canCommit 1st Tx. We don't send the commit so it should timeout.
1868
1869             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1870             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1871
1872             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1873
1874             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1875             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1876
1877             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1878
1879             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1880             expectMsgClass(duration, akka.actor.Status.Failure.class);
1881
1882             // Commit the 2nd Tx.
1883
1884             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1885             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1886
1887             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1888             assertNotNull(listNodePath + " not found", node);
1889
1890             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1891         }};
1892     }
1893
1894     @Test
1895     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1896         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1897
1898         new ShardTestKit(getSystem()) {{
1899             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1900                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1901                     "testTransactionCommitQueueCapacityExceeded");
1902
1903             waitUntilLeader(shard);
1904
1905             final FiniteDuration duration = duration("5 seconds");
1906
1907             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1908
1909             final String transactionID1 = "tx1";
1910             final MutableCompositeModification modification1 = new MutableCompositeModification();
1911             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1912                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1913
1914             final String transactionID2 = "tx2";
1915             final MutableCompositeModification modification2 = new MutableCompositeModification();
1916             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1917                     TestModel.OUTER_LIST_PATH,
1918                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1919                     modification2);
1920
1921             final String transactionID3 = "tx3";
1922             final MutableCompositeModification modification3 = new MutableCompositeModification();
1923             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1924                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1925
1926             // Ready the Tx's
1927
1928             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1929                     cohort1, modification1, true, false), getRef());
1930             expectMsgClass(duration, ReadyTransactionReply.class);
1931
1932             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1933                     cohort2, modification2, true, false), getRef());
1934             expectMsgClass(duration, ReadyTransactionReply.class);
1935
1936             // The 3rd Tx should exceed queue capacity and fail.
1937
1938             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1939                     cohort3, modification3, true, false), getRef());
1940             expectMsgClass(duration, akka.actor.Status.Failure.class);
1941
1942             // canCommit 1st Tx.
1943
1944             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1945             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1946
1947             // canCommit the 2nd Tx - it should get queued.
1948
1949             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1950
1951             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1952
1953             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1954             expectMsgClass(duration, akka.actor.Status.Failure.class);
1955
1956             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1957         }};
1958     }
1959
1960     @Test
1961     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1962         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1963
1964         new ShardTestKit(getSystem()) {{
1965             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1966                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1967                     "testTransactionCommitWithPriorExpiredCohortEntries");
1968
1969             waitUntilLeader(shard);
1970
1971             final FiniteDuration duration = duration("5 seconds");
1972
1973             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1974
1975             final String transactionID1 = "tx1";
1976             final MutableCompositeModification modification1 = new MutableCompositeModification();
1977             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1978                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1979
1980             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1981                     cohort1, modification1, true, false), getRef());
1982             expectMsgClass(duration, ReadyTransactionReply.class);
1983
1984             final String transactionID2 = "tx2";
1985             final MutableCompositeModification modification2 = new MutableCompositeModification();
1986             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1987                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1988
1989             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1990                     cohort2, modification2, true, false), getRef());
1991             expectMsgClass(duration, ReadyTransactionReply.class);
1992
1993             final String transactionID3 = "tx3";
1994             final MutableCompositeModification modification3 = new MutableCompositeModification();
1995             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1996                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1997
1998             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1999                     cohort3, modification3, true, false), getRef());
2000             expectMsgClass(duration, ReadyTransactionReply.class);
2001
2002             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2003             // should expire from the queue and the last one should be processed.
2004
2005             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2006             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2007
2008             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2009         }};
2010     }
2011
2012     @Test
2013     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2014         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2015
2016         new ShardTestKit(getSystem()) {{
2017             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2018                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2019                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2020
2021             waitUntilLeader(shard);
2022
2023             final FiniteDuration duration = duration("5 seconds");
2024
2025             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2026
2027             final String transactionID1 = "tx1";
2028             final MutableCompositeModification modification1 = new MutableCompositeModification();
2029             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2030                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2031
2032             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2033                     cohort1, modification1, true, false), getRef());
2034             expectMsgClass(duration, ReadyTransactionReply.class);
2035
2036             // CanCommit the first one so it's the current in-progress CohortEntry.
2037
2038             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2039             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2040
2041             // Ready the second Tx.
2042
2043             final String transactionID2 = "tx2";
2044             final MutableCompositeModification modification2 = new MutableCompositeModification();
2045             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2046                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2047
2048             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2049                     cohort2, modification2, true, false), getRef());
2050             expectMsgClass(duration, ReadyTransactionReply.class);
2051
2052             // Ready the third Tx.
2053
2054             final String transactionID3 = "tx3";
2055             final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2056             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2057                     .apply(modification3);
2058                 modification3.ready();
2059             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2060
2061             shard.tell(readyMessage, getRef());
2062
2063             // Commit the first Tx. After completing, the second should expire from the queue and the third
2064             // Tx committed.
2065
2066             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2067             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2068
2069             // Expect commit reply from the third Tx.
2070
2071             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2072
2073             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2074             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2075
2076             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2077         }};
2078     }
2079
2080     @Test
2081     public void testCanCommitBeforeReadyFailure() throws Throwable {
2082         new ShardTestKit(getSystem()) {{
2083             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2084                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2085                     "testCanCommitBeforeReadyFailure");
2086
2087             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2088             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2089
2090             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2091         }};
2092     }
2093
2094     @Test
2095     public void testAbortTransaction() throws Throwable {
2096         new ShardTestKit(getSystem()) {{
2097             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2098                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2099                     "testAbortTransaction");
2100
2101             waitUntilLeader(shard);
2102
2103             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2104
2105             final String transactionID1 = "tx1";
2106             final MutableCompositeModification modification1 = new MutableCompositeModification();
2107             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2108             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2109             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2110
2111             final String transactionID2 = "tx2";
2112             final MutableCompositeModification modification2 = new MutableCompositeModification();
2113             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2114             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2115
2116             final FiniteDuration duration = duration("5 seconds");
2117             final Timeout timeout = new Timeout(duration);
2118
2119             // Simulate the ForwardedReadyTransaction messages that would be sent
2120             // by the ShardTransaction.
2121
2122             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2123                     cohort1, modification1, true, false), getRef());
2124             expectMsgClass(duration, ReadyTransactionReply.class);
2125
2126             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2127                     cohort2, modification2, true, false), getRef());
2128             expectMsgClass(duration, ReadyTransactionReply.class);
2129
2130             // Send the CanCommitTransaction message for the first Tx.
2131
2132             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2133             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2134                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2135             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2136
2137             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2138             // processed after the first Tx completes.
2139
2140             final Future<Object> canCommitFuture = Patterns.ask(shard,
2141                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2142
2143             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2144             // Tx to proceed.
2145
2146             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2147             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2148
2149             // Wait for the 2nd Tx to complete the canCommit phase.
2150
2151             Await.ready(canCommitFuture, duration);
2152
2153             final InOrder inOrder = inOrder(cohort1, cohort2);
2154             inOrder.verify(cohort1).canCommit();
2155             inOrder.verify(cohort2).canCommit();
2156
2157             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2158         }};
2159     }
2160
2161     @Test
2162     public void testCreateSnapshot() throws Exception {
2163         testCreateSnapshot(true, "testCreateSnapshot");
2164     }
2165
2166     @Test
2167     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2168         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2169     }
2170
2171     @SuppressWarnings("serial")
2172     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2173
2174         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2175
2176         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2177         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2178             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2179                 super(delegate);
2180             }
2181
2182             @Override
2183             public void saveSnapshot(final Object o) {
2184                 savedSnapshot.set(o);
2185                 super.saveSnapshot(o);
2186             }
2187         }
2188
2189         dataStoreContextBuilder.persistent(persistent);
2190
2191         new ShardTestKit(getSystem()) {{
2192             class TestShard extends Shard {
2193
2194                 protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
2195                                     final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
2196                     super(name, peerAddresses, datastoreContext, schemaContext);
2197                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2198                 }
2199
2200                 @Override
2201                 public void handleCommand(final Object message) {
2202                     super.handleCommand(message);
2203
2204                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2205                         latch.get().countDown();
2206                     }
2207                 }
2208
2209                 @Override
2210                 public RaftActorContext getRaftActorContext() {
2211                     return super.getRaftActorContext();
2212                 }
2213             }
2214
2215             final Creator<Shard> creator = new Creator<Shard>() {
2216                 @Override
2217                 public Shard create() throws Exception {
2218                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
2219                             newDatastoreContext(), SCHEMA_CONTEXT);
2220                 }
2221             };
2222
2223             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2224                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2225
2226             waitUntilLeader(shard);
2227
2228             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2229
2230             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2231
2232             // Trigger creation of a snapshot by ensuring
2233             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2234             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2235
2236             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2237
2238             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2239                     savedSnapshot.get() instanceof Snapshot);
2240
2241             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2242
2243             latch.set(new CountDownLatch(1));
2244             savedSnapshot.set(null);
2245
2246             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2247
2248             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2249
2250             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2251                     savedSnapshot.get() instanceof Snapshot);
2252
2253             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2254
2255             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2256         }
2257
2258         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2259
2260             final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2261             assertEquals("Root node", expectedRoot, actual);
2262
2263         }};
2264     }
2265
2266     /**
2267      * This test simply verifies that the applySnapShot logic will work
2268      * @throws ReadFailedException
2269      * @throws DataValidationFailedException
2270      */
2271     @Test
2272     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2273         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2274         store.setSchemaContext(SCHEMA_CONTEXT);
2275
2276         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2277         putTransaction.write(TestModel.TEST_PATH,
2278             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2279         commitTransaction(store, putTransaction);
2280
2281
2282         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2283
2284         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2285
2286         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2287         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2288
2289         commitTransaction(store, writeTransaction);
2290
2291         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2292
2293         assertEquals(expected, actual);
2294     }
2295
2296     @Test
2297     public void testRecoveryApplicable(){
2298
2299         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2300                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2301
2302         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2303                 persistentContext, SCHEMA_CONTEXT);
2304
2305         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2306                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2307
2308         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2309                 nonPersistentContext, SCHEMA_CONTEXT);
2310
2311         new ShardTestKit(getSystem()) {{
2312             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2313                     persistentProps, "testPersistence1");
2314
2315             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2316
2317             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2318
2319             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2320                     nonPersistentProps, "testPersistence2");
2321
2322             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2323
2324             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2325
2326         }};
2327
2328     }
2329
2330     @Test
2331     public void testOnDatastoreContext() {
2332         new ShardTestKit(getSystem()) {{
2333             dataStoreContextBuilder.persistent(true);
2334
2335             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2336
2337             assertEquals("isRecoveryApplicable", true,
2338                     shard.underlyingActor().persistence().isRecoveryApplicable());
2339
2340             waitUntilLeader(shard);
2341
2342             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2343
2344             assertEquals("isRecoveryApplicable", false,
2345                     shard.underlyingActor().persistence().isRecoveryApplicable());
2346
2347             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2348
2349             assertEquals("isRecoveryApplicable", true,
2350                     shard.underlyingActor().persistence().isRecoveryApplicable());
2351
2352             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2353         }};
2354     }
2355
2356     @Test
2357     public void testRegisterRoleChangeListener() throws Exception {
2358         new ShardTestKit(getSystem()) {
2359             {
2360                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2361                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2362                         "testRegisterRoleChangeListener");
2363
2364                 waitUntilLeader(shard);
2365
2366                 final TestActorRef<MessageCollectorActor> listener =
2367                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2368
2369                 shard.tell(new RegisterRoleChangeListener(), listener);
2370
2371                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2372
2373                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2374                         ShardLeaderStateChanged.class);
2375                 assertEquals("getLocalShardDataTree present", true,
2376                         leaderStateChanged.getLocalShardDataTree().isPresent());
2377                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2378                         leaderStateChanged.getLocalShardDataTree().get());
2379
2380                 MessageCollectorActor.clearMessages(listener);
2381
2382                 // Force a leader change
2383
2384                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2385
2386                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2387                         ShardLeaderStateChanged.class);
2388                 assertEquals("getLocalShardDataTree present", false,
2389                         leaderStateChanged.getLocalShardDataTree().isPresent());
2390
2391                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2392             }
2393         };
2394     }
2395
2396     @Test
2397     public void testFollowerInitialSyncStatus() throws Exception {
2398         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2399                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2400                 "testFollowerInitialSyncStatus");
2401
2402         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2403
2404         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2405
2406         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2407
2408         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2409
2410         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2411     }
2412
2413     private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2414         modification.ready();
2415         store.validate(modification);
2416         store.commit(store.prepare(modification));
2417     }
2418 }