Bug 3206: CDS - issues with direct commit
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.dispatch.Dispatchers;
19 import akka.dispatch.OnComplete;
20 import akka.japi.Creator;
21 import akka.pattern.Patterns;
22 import akka.persistence.SaveSnapshotSuccess;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.Futures;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.InOrder;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
43 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
44 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
45 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
65 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
66 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
67 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
68 import org.opendaylight.controller.cluster.datastore.modification.Modification;
69 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
70 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
71 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
72 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
73 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
74 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
75 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
76 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
77 import org.opendaylight.controller.cluster.raft.RaftActorContext;
78 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
79 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
80 import org.opendaylight.controller.cluster.raft.Snapshot;
81 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
82 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
84 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
85 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
86 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
88 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
89 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
90 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
91 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
92 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
93 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
94 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
95 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
96 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
97 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
98 import org.opendaylight.yangtools.yang.common.QName;
99 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
101 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
102 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
103 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
116 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
117 import scala.concurrent.Await;
118 import scala.concurrent.Future;
119 import scala.concurrent.duration.FiniteDuration;
120
121 public class ShardTest extends AbstractShardTest {
122     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
123
124     @Test
125     public void testRegisterChangeListener() throws Exception {
126         new ShardTestKit(getSystem()) {{
127             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
128                     newShardProps(),  "testRegisterChangeListener");
129
130             waitUntilLeader(shard);
131
132             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
133
134             MockDataChangeListener listener = new MockDataChangeListener(1);
135             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
136                     "testRegisterChangeListener-DataChangeListener");
137
138             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
139                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
140
141             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
142                     RegisterChangeListenerReply.class);
143             String replyPath = reply.getListenerRegistrationPath().toString();
144             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
145                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
146
147             YangInstanceIdentifier path = TestModel.TEST_PATH;
148             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
149
150             listener.waitForChangeEvents(path);
151
152             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
153             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
154         }};
155     }
156
157     @SuppressWarnings("serial")
158     @Test
159     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
160         // This test tests the timing window in which a change listener is registered before the
161         // shard becomes the leader. We verify that the listener is registered and notified of the
162         // existing data when the shard becomes the leader.
163         new ShardTestKit(getSystem()) {{
164             // For this test, we want to send the RegisterChangeListener message after the shard
165             // has recovered from persistence and before it becomes the leader. So we subclass
166             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
167             // we know that the shard has been initialized to a follower and has started the
168             // election process. The following 2 CountDownLatches are used to coordinate the
169             // ElectionTimeout with the sending of the RegisterChangeListener message.
170             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
171             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
172             Creator<Shard> creator = new Creator<Shard>() {
173                 boolean firstElectionTimeout = true;
174
175                 @Override
176                 public Shard create() throws Exception {
177                     // Use a non persistent provider because this test actually invokes persist on the journal
178                     // this will cause all other messages to not be queued properly after that.
179                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
180                     // it does do a persist)
181                     return new Shard(shardID, Collections.<String,String>emptyMap(),
182                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
183                         @Override
184                         public void onReceiveCommand(final Object message) throws Exception {
185                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
186                                 // Got the first ElectionTimeout. We don't forward it to the
187                                 // base Shard yet until we've sent the RegisterChangeListener
188                                 // message. So we signal the onFirstElectionTimeout latch to tell
189                                 // the main thread to send the RegisterChangeListener message and
190                                 // start a thread to wait on the onChangeListenerRegistered latch,
191                                 // which the main thread signals after it has sent the message.
192                                 // After the onChangeListenerRegistered is triggered, we send the
193                                 // original ElectionTimeout message to proceed with the election.
194                                 firstElectionTimeout = false;
195                                 final ActorRef self = getSelf();
196                                 new Thread() {
197                                     @Override
198                                     public void run() {
199                                         Uninterruptibles.awaitUninterruptibly(
200                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
201                                         self.tell(message, self);
202                                     }
203                                 }.start();
204
205                                 onFirstElectionTimeout.countDown();
206                             } else {
207                                 super.onReceiveCommand(message);
208                             }
209                         }
210                     };
211                 }
212             };
213
214             MockDataChangeListener listener = new MockDataChangeListener(1);
215             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
216                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
217
218             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
219                     Props.create(new DelegatingShardCreator(creator)),
220                     "testRegisterChangeListenerWhenNotLeaderInitially");
221
222             // Write initial data into the in-memory store.
223             YangInstanceIdentifier path = TestModel.TEST_PATH;
224             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
225
226             // Wait until the shard receives the first ElectionTimeout message.
227             assertEquals("Got first ElectionTimeout", true,
228                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
229
230             // Now send the RegisterChangeListener and wait for the reply.
231             shard.tell(new RegisterChangeListener(path, dclActor,
232                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
233
234             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
235                     RegisterChangeListenerReply.class);
236             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
237
238             // Sanity check - verify the shard is not the leader yet.
239             shard.tell(new FindLeader(), getRef());
240             FindLeaderReply findLeadeReply =
241                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
242             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
243
244             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
245             // with the election process.
246             onChangeListenerRegistered.countDown();
247
248             // Wait for the shard to become the leader and notify our listener with the existing
249             // data in the store.
250             listener.waitForChangeEvents(path);
251
252             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
253             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
254         }};
255     }
256
257     @Test
258     public void testRegisterDataTreeChangeListener() throws Exception {
259         new ShardTestKit(getSystem()) {{
260             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
261                     newShardProps(), "testRegisterDataTreeChangeListener");
262
263             waitUntilLeader(shard);
264
265             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
266
267             MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
268             ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
269                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
270
271             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
272
273             RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
274                     RegisterDataTreeChangeListenerReply.class);
275             String replyPath = reply.getListenerRegistrationPath().toString();
276             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
277                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
278
279             YangInstanceIdentifier path = TestModel.TEST_PATH;
280             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
281
282             listener.waitForChangeEvents();
283
284             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
285             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
286         }};
287     }
288
289     @SuppressWarnings("serial")
290     @Test
291     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
292         new ShardTestKit(getSystem()) {{
293             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
294             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
295             Creator<Shard> creator = new Creator<Shard>() {
296                 boolean firstElectionTimeout = true;
297
298                 @Override
299                 public Shard create() throws Exception {
300                     return new Shard(shardID, Collections.<String,String>emptyMap(),
301                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
302                         @Override
303                         public void onReceiveCommand(final Object message) throws Exception {
304                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
305                                 firstElectionTimeout = false;
306                                 final ActorRef self = getSelf();
307                                 new Thread() {
308                                     @Override
309                                     public void run() {
310                                         Uninterruptibles.awaitUninterruptibly(
311                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
312                                         self.tell(message, self);
313                                     }
314                                 }.start();
315
316                                 onFirstElectionTimeout.countDown();
317                             } else {
318                                 super.onReceiveCommand(message);
319                             }
320                         }
321                     };
322                 }
323             };
324
325             MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
326             ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
327                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
328
329             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
330                     Props.create(new DelegatingShardCreator(creator)),
331                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
332
333             YangInstanceIdentifier path = TestModel.TEST_PATH;
334             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
335
336             assertEquals("Got first ElectionTimeout", true,
337                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
338
339             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
340             RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
341                     RegisterDataTreeChangeListenerReply.class);
342             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
343
344             shard.tell(new FindLeader(), getRef());
345             FindLeaderReply findLeadeReply =
346                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
347             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
348
349             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
350
351             onChangeListenerRegistered.countDown();
352
353             // TODO: investigate why we do not receive data chage events
354             listener.waitForChangeEvents();
355
356             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
357             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
358         }};
359     }
360
361     @Test
362     public void testCreateTransaction(){
363         new ShardTestKit(getSystem()) {{
364             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
365
366             waitUntilLeader(shard);
367
368             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
369
370             shard.tell(new CreateTransaction("txn-1",
371                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
372
373             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
374                     CreateTransactionReply.class);
375
376             String path = reply.getTransactionActorPath().toString();
377             assertTrue("Unexpected transaction path " + path,
378                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
379
380             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
381         }};
382     }
383
384     @Test
385     public void testCreateTransactionOnChain(){
386         new ShardTestKit(getSystem()) {{
387             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
388
389             waitUntilLeader(shard);
390
391             shard.tell(new CreateTransaction("txn-1",
392                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
393                     getRef());
394
395             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
396                     CreateTransactionReply.class);
397
398             String path = reply.getTransactionActorPath().toString();
399             assertTrue("Unexpected transaction path " + path,
400                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
401
402             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
403         }};
404     }
405
406     @SuppressWarnings("serial")
407     @Test
408     public void testPeerAddressResolved() throws Exception {
409         new ShardTestKit(getSystem()) {{
410             final CountDownLatch recoveryComplete = new CountDownLatch(1);
411             class TestShard extends Shard {
412                 TestShard() {
413                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
414                             newDatastoreContext(), SCHEMA_CONTEXT);
415                 }
416
417                 Map<String, String> getPeerAddresses() {
418                     return getRaftActorContext().getPeerAddresses();
419                 }
420
421                 @Override
422                 protected void onRecoveryComplete() {
423                     try {
424                         super.onRecoveryComplete();
425                     } finally {
426                         recoveryComplete.countDown();
427                     }
428                 }
429             }
430
431             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
432                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
433                         @Override
434                         public TestShard create() throws Exception {
435                             return new TestShard();
436                         }
437                     })), "testPeerAddressResolved");
438
439             //waitUntilLeader(shard);
440             assertEquals("Recovery complete", true,
441                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
442
443             String address = "akka://foobar";
444             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
445
446             assertEquals("getPeerAddresses", address,
447                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
448
449             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
450         }};
451     }
452
453     @Test
454     public void testApplySnapshot() throws Exception {
455         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
456                 "testApplySnapshot");
457
458         DataTree store = InMemoryDataTreeFactory.getInstance().create();
459         store.setSchemaContext(SCHEMA_CONTEXT);
460
461         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
462
463         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
464         NormalizedNode<?,?> expected = readStore(store, root);
465
466         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
467                 SerializationUtils.serializeNormalizedNode(expected),
468                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
469
470         shard.underlyingActor().onReceiveCommand(applySnapshot);
471
472         NormalizedNode<?,?> actual = readStore(shard, root);
473
474         assertEquals("Root node", expected, actual);
475
476         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
477     }
478
479     @Test
480     public void testApplyState() throws Exception {
481
482         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
483
484         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
485
486         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
487                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
488
489         shard.underlyingActor().onReceiveCommand(applyState);
490
491         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
492         assertEquals("Applied state", node, actual);
493
494         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
495     }
496
497     @Test
498     public void testApplyStateWithCandidatePayload() throws Exception {
499
500         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
501
502         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
503         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
504
505         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
506                 DataTreeCandidatePayload.create(candidate)));
507
508         shard.underlyingActor().onReceiveCommand(applyState);
509
510         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
511         assertEquals("Applied state", node, actual);
512
513         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
514     }
515
516     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
517         DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
518         testStore.setSchemaContext(SCHEMA_CONTEXT);
519
520         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
521
522         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
523
524         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
525                 SerializationUtils.serializeNormalizedNode(root),
526                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
527         return testStore;
528     }
529
530     private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
531         source.validate(mod);
532         final DataTreeCandidate candidate = source.prepare(mod);
533         source.commit(candidate);
534         return DataTreeCandidatePayload.create(candidate);
535     }
536
537     @Test
538     public void testDataTreeCandidateRecovery() throws Exception {
539         // Set up the InMemorySnapshotStore.
540         final DataTree source = setupInMemorySnapshotStore();
541
542         final DataTreeModification writeMod = source.takeSnapshot().newModification();
543         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
544
545         // Set up the InMemoryJournal.
546         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
547
548         int nListEntries = 16;
549         Set<Integer> listEntryKeys = new HashSet<>();
550
551         // Add some ModificationPayload entries
552         for (int i = 1; i <= nListEntries; i++) {
553             listEntryKeys.add(Integer.valueOf(i));
554
555             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
556                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
557
558             final DataTreeModification mod = source.takeSnapshot().newModification();
559             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
560
561             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
562                 payloadForModification(source, mod)));
563         }
564
565         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
566                 new ApplyJournalEntries(nListEntries));
567
568         testRecovery(listEntryKeys);
569     }
570
571     @Test
572     public void testModicationRecovery() throws Exception {
573
574         // Set up the InMemorySnapshotStore.
575         setupInMemorySnapshotStore();
576
577         // Set up the InMemoryJournal.
578
579         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
580                   new WriteModification(TestModel.OUTER_LIST_PATH,
581                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
582
583         int nListEntries = 16;
584         Set<Integer> listEntryKeys = new HashSet<>();
585
586         // Add some ModificationPayload entries
587         for(int i = 1; i <= nListEntries; i++) {
588             listEntryKeys.add(Integer.valueOf(i));
589             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
590                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
591             Modification mod = new MergeModification(path,
592                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
593             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
594                     newModificationPayload(mod)));
595         }
596
597         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
598                 new ApplyJournalEntries(nListEntries));
599
600         testRecovery(listEntryKeys);
601     }
602
603     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
604         MutableCompositeModification compMod = new MutableCompositeModification();
605         for(Modification mod: mods) {
606             compMod.addModification(mod);
607         }
608
609         return new ModificationPayload(compMod);
610     }
611
612     @Test
613     public void testConcurrentThreePhaseCommits() throws Throwable {
614         new ShardTestKit(getSystem()) {{
615             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
616                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
617                     "testConcurrentThreePhaseCommits");
618
619             waitUntilLeader(shard);
620
621          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
622
623             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
624
625             String transactionID1 = "tx1";
626             MutableCompositeModification modification1 = new MutableCompositeModification();
627             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
628                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
629
630             String transactionID2 = "tx2";
631             MutableCompositeModification modification2 = new MutableCompositeModification();
632             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
633                     TestModel.OUTER_LIST_PATH,
634                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
635                     modification2);
636
637             String transactionID3 = "tx3";
638             MutableCompositeModification modification3 = new MutableCompositeModification();
639             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
640                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
641                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
642                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
643                     modification3);
644
645             long timeoutSec = 5;
646             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
647             final Timeout timeout = new Timeout(duration);
648
649             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
650             // by the ShardTransaction.
651
652             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
653                     cohort1, modification1, true, false), getRef());
654             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
655                     expectMsgClass(duration, ReadyTransactionReply.class));
656             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
657
658             // Send the CanCommitTransaction message for the first Tx.
659
660             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
661             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
662                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
663             assertEquals("Can commit", true, canCommitReply.getCanCommit());
664
665             // Send the ForwardedReadyTransaction for the next 2 Tx's.
666
667             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
668                     cohort2, modification2, true, false), getRef());
669             expectMsgClass(duration, ReadyTransactionReply.class);
670
671             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
672                     cohort3, modification3, true, false), getRef());
673             expectMsgClass(duration, ReadyTransactionReply.class);
674
675             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
676             // processed after the first Tx completes.
677
678             Future<Object> canCommitFuture1 = Patterns.ask(shard,
679                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
680
681             Future<Object> canCommitFuture2 = Patterns.ask(shard,
682                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
683
684             // Send the CommitTransaction message for the first Tx. After it completes, it should
685             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
686
687             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
688             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
689
690             // Wait for the next 2 Tx's to complete.
691
692             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
693             final CountDownLatch commitLatch = new CountDownLatch(2);
694
695             class OnFutureComplete extends OnComplete<Object> {
696                 private final Class<?> expRespType;
697
698                 OnFutureComplete(final Class<?> expRespType) {
699                     this.expRespType = expRespType;
700                 }
701
702                 @Override
703                 public void onComplete(final Throwable error, final Object resp) {
704                     if(error != null) {
705                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
706                     } else {
707                         try {
708                             assertEquals("Commit response type", expRespType, resp.getClass());
709                             onSuccess(resp);
710                         } catch (Exception e) {
711                             caughtEx.set(e);
712                         }
713                     }
714                 }
715
716                 void onSuccess(final Object resp) throws Exception {
717                 }
718             }
719
720             class OnCommitFutureComplete extends OnFutureComplete {
721                 OnCommitFutureComplete() {
722                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
723                 }
724
725                 @Override
726                 public void onComplete(final Throwable error, final Object resp) {
727                     super.onComplete(error, resp);
728                     commitLatch.countDown();
729                 }
730             }
731
732             class OnCanCommitFutureComplete extends OnFutureComplete {
733                 private final String transactionID;
734
735                 OnCanCommitFutureComplete(final String transactionID) {
736                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
737                     this.transactionID = transactionID;
738                 }
739
740                 @Override
741                 void onSuccess(final Object resp) throws Exception {
742                     CanCommitTransactionReply canCommitReply =
743                             CanCommitTransactionReply.fromSerializable(resp);
744                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
745
746                     Future<Object> commitFuture = Patterns.ask(shard,
747                             new CommitTransaction(transactionID).toSerializable(), timeout);
748                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
749                 }
750             }
751
752             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
753                     getSystem().dispatcher());
754
755             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
756                     getSystem().dispatcher());
757
758             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
759
760             if(caughtEx.get() != null) {
761                 throw caughtEx.get();
762             }
763
764             assertEquals("Commits complete", true, done);
765
766             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
767             inOrder.verify(cohort1).canCommit();
768             inOrder.verify(cohort1).preCommit();
769             inOrder.verify(cohort1).commit();
770             inOrder.verify(cohort2).canCommit();
771             inOrder.verify(cohort2).preCommit();
772             inOrder.verify(cohort2).commit();
773             inOrder.verify(cohort3).canCommit();
774             inOrder.verify(cohort3).preCommit();
775             inOrder.verify(cohort3).commit();
776
777             // Verify data in the data store.
778
779             verifyOuterListEntry(shard, 1);
780
781             verifyLastApplied(shard, 2);
782
783             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
784         }};
785     }
786
787     private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
788             NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
789         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
790     }
791
792     private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
793             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
794         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
795         batched.addModification(new WriteModification(path, data));
796         batched.setReady(ready);
797         batched.setDoCommitOnReady(doCommitOnReady);
798         return batched;
799     }
800
801     @Test
802     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
803         new ShardTestKit(getSystem()) {{
804             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
805                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
806                     "testBatchedModificationsWithNoCommitOnReady");
807
808             waitUntilLeader(shard);
809
810             final String transactionID = "tx";
811             FiniteDuration duration = duration("5 seconds");
812
813             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
814             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
815                 @Override
816                 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
817                     if(mockCohort.get() == null) {
818                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
819                     }
820
821                     return mockCohort.get();
822                 }
823             };
824
825             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
826
827             // Send a BatchedModifications to start a transaction.
828
829             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
830                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
831             expectMsgClass(duration, BatchedModificationsReply.class);
832
833             // Send a couple more BatchedModifications.
834
835             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
836                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
837             expectMsgClass(duration, BatchedModificationsReply.class);
838
839             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
840                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
841                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
842             expectMsgClass(duration, ReadyTransactionReply.class);
843
844             // Send the CanCommitTransaction message.
845
846             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
847             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
848                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
849             assertEquals("Can commit", true, canCommitReply.getCanCommit());
850
851             // Send the CanCommitTransaction message.
852
853             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
854             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
855
856             InOrder inOrder = inOrder(mockCohort.get());
857             inOrder.verify(mockCohort.get()).canCommit();
858             inOrder.verify(mockCohort.get()).preCommit();
859             inOrder.verify(mockCohort.get()).commit();
860
861             // Verify data in the data store.
862
863             verifyOuterListEntry(shard, 1);
864
865             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
866         }};
867     }
868
869     @Test
870     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
871         new ShardTestKit(getSystem()) {{
872             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
873                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
874                     "testBatchedModificationsWithCommitOnReady");
875
876             waitUntilLeader(shard);
877
878             final String transactionID = "tx";
879             FiniteDuration duration = duration("5 seconds");
880
881             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
882             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
883                 @Override
884                 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
885                     if(mockCohort.get() == null) {
886                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
887                     }
888
889                     return mockCohort.get();
890                 }
891             };
892
893             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
894
895             // Send a BatchedModifications to start a transaction.
896
897             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
898                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
899             expectMsgClass(duration, BatchedModificationsReply.class);
900
901             // Send a couple more BatchedModifications.
902
903             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
904                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
905             expectMsgClass(duration, BatchedModificationsReply.class);
906
907             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
908                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
909                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
910
911             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
912
913             InOrder inOrder = inOrder(mockCohort.get());
914             inOrder.verify(mockCohort.get()).canCommit();
915             inOrder.verify(mockCohort.get()).preCommit();
916             inOrder.verify(mockCohort.get()).commit();
917
918             // Verify data in the data store.
919
920             verifyOuterListEntry(shard, 1);
921
922             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
923         }};
924     }
925
926     @SuppressWarnings("unchecked")
927     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
928         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
929         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
930         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
931                 outerList.getValue() instanceof Iterable);
932         Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
933         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
934                 entry instanceof MapEntryNode);
935         MapEntryNode mapEntry = (MapEntryNode)entry;
936         Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
937                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
938         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
939         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
940     }
941
942     @Test
943     public void testBatchedModificationsOnTransactionChain() throws Throwable {
944         new ShardTestKit(getSystem()) {{
945             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
946                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
947                     "testBatchedModificationsOnTransactionChain");
948
949             waitUntilLeader(shard);
950
951             String transactionChainID = "txChain";
952             String transactionID1 = "tx1";
953             String transactionID2 = "tx2";
954
955             FiniteDuration duration = duration("5 seconds");
956
957             // Send a BatchedModifications to start a chained write transaction and ready it.
958
959             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
960             YangInstanceIdentifier path = TestModel.TEST_PATH;
961             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
962                     containerNode, true, false), getRef());
963             expectMsgClass(duration, ReadyTransactionReply.class);
964
965             // Create a read Tx on the same chain.
966
967             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
968                     transactionChainID).toSerializable(), getRef());
969
970             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
971
972             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
973             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
974             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
975
976             // Commit the write transaction.
977
978             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
979             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
980                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
981             assertEquals("Can commit", true, canCommitReply.getCanCommit());
982
983             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
984             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
985
986             // Verify data in the data store.
987
988             NormalizedNode<?, ?> actualNode = readStore(shard, path);
989             assertEquals("Stored node", containerNode, actualNode);
990
991             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
992         }};
993     }
994
995     @Test
996     public void testOnBatchedModificationsWhenNotLeader() {
997         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
998         new ShardTestKit(getSystem()) {{
999             Creator<Shard> creator = new Creator<Shard>() {
1000                 private static final long serialVersionUID = 1L;
1001
1002                 @Override
1003                 public Shard create() throws Exception {
1004                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1005                             newDatastoreContext(), SCHEMA_CONTEXT) {
1006                         @Override
1007                         protected boolean isLeader() {
1008                             return overrideLeaderCalls.get() ? false : super.isLeader();
1009                         }
1010
1011                         @Override
1012                         protected ActorSelection getLeader() {
1013                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1014                                 super.getLeader();
1015                         }
1016                     };
1017                 }
1018             };
1019
1020             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1021                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1022
1023             waitUntilLeader(shard);
1024
1025             overrideLeaderCalls.set(true);
1026
1027             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1028
1029             shard.tell(batched, ActorRef.noSender());
1030
1031             expectMsgEquals(batched);
1032
1033             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1034         }};
1035     }
1036
1037     @Test
1038     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1039         new ShardTestKit(getSystem()) {{
1040             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1041                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1042                     "testForwardedReadyTransactionWithImmediateCommit");
1043
1044             waitUntilLeader(shard);
1045
1046             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1047
1048             String transactionID = "tx1";
1049             MutableCompositeModification modification = new MutableCompositeModification();
1050             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1051             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1052                     TestModel.TEST_PATH, containerNode, modification);
1053
1054             FiniteDuration duration = duration("5 seconds");
1055
1056             // Simulate the ForwardedReadyTransaction messages that would be sent
1057             // by the ShardTransaction.
1058
1059             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1060                     cohort, modification, true, true), getRef());
1061
1062             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1063
1064             InOrder inOrder = inOrder(cohort);
1065             inOrder.verify(cohort).canCommit();
1066             inOrder.verify(cohort).preCommit();
1067             inOrder.verify(cohort).commit();
1068
1069             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1070             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1071
1072             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1073         }};
1074     }
1075
1076     @Test
1077     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1078         new ShardTestKit(getSystem()) {{
1079             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1080                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1081                     "testReadyLocalTransactionWithImmediateCommit");
1082
1083             waitUntilLeader(shard);
1084
1085             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1086
1087             DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1088
1089             ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1090             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1091             MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1092             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1093
1094             String txId = "tx1";
1095             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1096
1097             shard.tell(readyMessage, getRef());
1098
1099             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1100
1101             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1102             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1103
1104             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1105         }};
1106     }
1107
1108     @Test
1109     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1110         new ShardTestKit(getSystem()) {{
1111             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1112                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1113                     "testReadyLocalTransactionWithThreePhaseCommit");
1114
1115             waitUntilLeader(shard);
1116
1117             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1118
1119             DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1120
1121             ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1122             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1123             MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1124             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1125
1126             String txId = "tx1";
1127             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1128
1129             shard.tell(readyMessage, getRef());
1130
1131             expectMsgClass(ReadyTransactionReply.class);
1132
1133             // Send the CanCommitTransaction message.
1134
1135             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1136             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1137                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1138             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1139
1140             // Send the CanCommitTransaction message.
1141
1142             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1143             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1144
1145             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1146             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1147
1148             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1149         }};
1150     }
1151
1152     @Test
1153     public void testCommitWithPersistenceDisabled() throws Throwable {
1154         dataStoreContextBuilder.persistent(false);
1155         new ShardTestKit(getSystem()) {{
1156             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1157                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1158                     "testCommitWithPersistenceDisabled");
1159
1160             waitUntilLeader(shard);
1161
1162             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1163
1164             // Setup a simulated transactions with a mock cohort.
1165
1166             String transactionID = "tx";
1167             MutableCompositeModification modification = new MutableCompositeModification();
1168             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1169             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1170                     TestModel.TEST_PATH, containerNode, modification);
1171
1172             FiniteDuration duration = duration("5 seconds");
1173
1174             // Simulate the ForwardedReadyTransaction messages that would be sent
1175             // by the ShardTransaction.
1176
1177             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1178                     cohort, modification, true, false), getRef());
1179             expectMsgClass(duration, ReadyTransactionReply.class);
1180
1181             // Send the CanCommitTransaction message.
1182
1183             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1184             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1185                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1186             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1187
1188             // Send the CanCommitTransaction message.
1189
1190             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1191             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1192
1193             InOrder inOrder = inOrder(cohort);
1194             inOrder.verify(cohort).canCommit();
1195             inOrder.verify(cohort).preCommit();
1196             inOrder.verify(cohort).commit();
1197
1198             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1199             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1200
1201             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1202         }};
1203     }
1204
1205     private static DataTreeCandidateTip mockCandidate(final String name) {
1206         DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1207         DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1208         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1209         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1210         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1211         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1212         return mockCandidate;
1213     }
1214
1215     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1216         DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1217         DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1218         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1219         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1220         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1221         return mockCandidate;
1222     }
1223
1224     @Test
1225     public void testCommitWhenTransactionHasNoModifications(){
1226         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1227         // but here that need not happen
1228         new ShardTestKit(getSystem()) {
1229             {
1230                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1231                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1232                         "testCommitWhenTransactionHasNoModifications");
1233
1234                 waitUntilLeader(shard);
1235
1236                 String transactionID = "tx1";
1237                 MutableCompositeModification modification = new MutableCompositeModification();
1238                 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1239                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1240                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1241                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1242                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1243
1244                 FiniteDuration duration = duration("5 seconds");
1245
1246                 // Simulate the ForwardedReadyTransaction messages that would be sent
1247                 // by the ShardTransaction.
1248
1249                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1250                         cohort, modification, true, false), getRef());
1251                 expectMsgClass(duration, ReadyTransactionReply.class);
1252
1253                 // Send the CanCommitTransaction message.
1254
1255                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1256                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1257                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1258                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1259
1260                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1261                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1262
1263                 InOrder inOrder = inOrder(cohort);
1264                 inOrder.verify(cohort).canCommit();
1265                 inOrder.verify(cohort).preCommit();
1266                 inOrder.verify(cohort).commit();
1267
1268                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1269                 ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1270
1271                 // Use MBean for verification
1272                 // Committed transaction count should increase as usual
1273                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1274
1275                 // Commit index should not advance because this does not go into the journal
1276                 assertEquals(-1, shardStats.getCommitIndex());
1277
1278                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1279
1280             }
1281         };
1282     }
1283
1284     @Test
1285     public void testCommitWhenTransactionHasModifications(){
1286         new ShardTestKit(getSystem()) {
1287             {
1288                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1289                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1290                         "testCommitWhenTransactionHasModifications");
1291
1292                 waitUntilLeader(shard);
1293
1294                 String transactionID = "tx1";
1295                 MutableCompositeModification modification = new MutableCompositeModification();
1296                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1297                 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1298                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1299                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1300                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1301                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1302
1303                 FiniteDuration duration = duration("5 seconds");
1304
1305                 // Simulate the ForwardedReadyTransaction messages that would be sent
1306                 // by the ShardTransaction.
1307
1308                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1309                         cohort, modification, true, false), getRef());
1310                 expectMsgClass(duration, ReadyTransactionReply.class);
1311
1312                 // Send the CanCommitTransaction message.
1313
1314                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1315                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1316                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1317                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1318
1319                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1320                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1321
1322                 InOrder inOrder = inOrder(cohort);
1323                 inOrder.verify(cohort).canCommit();
1324                 inOrder.verify(cohort).preCommit();
1325                 inOrder.verify(cohort).commit();
1326
1327                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1328                 ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1329
1330                 // Use MBean for verification
1331                 // Committed transaction count should increase as usual
1332                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1333
1334                 // Commit index should advance as we do not have an empty modification
1335                 assertEquals(0, shardStats.getCommitIndex());
1336
1337                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1338
1339             }
1340         };
1341     }
1342
1343     @Test
1344     public void testCommitPhaseFailure() throws Throwable {
1345         new ShardTestKit(getSystem()) {{
1346             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1347                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1348                     "testCommitPhaseFailure");
1349
1350             waitUntilLeader(shard);
1351
1352             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1353             // commit phase.
1354
1355             String transactionID1 = "tx1";
1356             MutableCompositeModification modification1 = new MutableCompositeModification();
1357             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1358             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1359             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1360             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1361             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1362
1363             String transactionID2 = "tx2";
1364             MutableCompositeModification modification2 = new MutableCompositeModification();
1365             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1366             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1367
1368             FiniteDuration duration = duration("5 seconds");
1369             final Timeout timeout = new Timeout(duration);
1370
1371             // Simulate the ForwardedReadyTransaction messages that would be sent
1372             // by the ShardTransaction.
1373
1374             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1375                     cohort1, modification1, true, false), getRef());
1376             expectMsgClass(duration, ReadyTransactionReply.class);
1377
1378             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1379                     cohort2, modification2, true, false), getRef());
1380             expectMsgClass(duration, ReadyTransactionReply.class);
1381
1382             // Send the CanCommitTransaction message for the first Tx.
1383
1384             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1385             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1386                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1387             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1388
1389             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1390             // processed after the first Tx completes.
1391
1392             Future<Object> canCommitFuture = Patterns.ask(shard,
1393                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1394
1395             // Send the CommitTransaction message for the first Tx. This should send back an error
1396             // and trigger the 2nd Tx to proceed.
1397
1398             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1399             expectMsgClass(duration, akka.actor.Status.Failure.class);
1400
1401             // Wait for the 2nd Tx to complete the canCommit phase.
1402
1403             final CountDownLatch latch = new CountDownLatch(1);
1404             canCommitFuture.onComplete(new OnComplete<Object>() {
1405                 @Override
1406                 public void onComplete(final Throwable t, final Object resp) {
1407                     latch.countDown();
1408                 }
1409             }, getSystem().dispatcher());
1410
1411             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1412
1413             InOrder inOrder = inOrder(cohort1, cohort2);
1414             inOrder.verify(cohort1).canCommit();
1415             inOrder.verify(cohort1).preCommit();
1416             inOrder.verify(cohort1).commit();
1417             inOrder.verify(cohort2).canCommit();
1418
1419             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1420         }};
1421     }
1422
1423     @Test
1424     public void testPreCommitPhaseFailure() throws Throwable {
1425         new ShardTestKit(getSystem()) {{
1426             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1427                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1428                     "testPreCommitPhaseFailure");
1429
1430             waitUntilLeader(shard);
1431
1432             String transactionID1 = "tx1";
1433             MutableCompositeModification modification1 = new MutableCompositeModification();
1434             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1435             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1436             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1437
1438             String transactionID2 = "tx2";
1439             MutableCompositeModification modification2 = new MutableCompositeModification();
1440             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1441             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1442
1443             FiniteDuration duration = duration("5 seconds");
1444             final Timeout timeout = new Timeout(duration);
1445
1446             // Simulate the ForwardedReadyTransaction messages that would be sent
1447             // by the ShardTransaction.
1448
1449             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1450                     cohort1, modification1, true, false), getRef());
1451             expectMsgClass(duration, ReadyTransactionReply.class);
1452
1453             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1454                     cohort2, modification2, true, false), getRef());
1455             expectMsgClass(duration, ReadyTransactionReply.class);
1456
1457             // Send the CanCommitTransaction message for the first Tx.
1458
1459             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1460             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1461                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1462             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1463
1464             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1465             // processed after the first Tx completes.
1466
1467             Future<Object> canCommitFuture = Patterns.ask(shard,
1468                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1469
1470             // Send the CommitTransaction message for the first Tx. This should send back an error
1471             // and trigger the 2nd Tx to proceed.
1472
1473             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1474             expectMsgClass(duration, akka.actor.Status.Failure.class);
1475
1476             // Wait for the 2nd Tx to complete the canCommit phase.
1477
1478             final CountDownLatch latch = new CountDownLatch(1);
1479             canCommitFuture.onComplete(new OnComplete<Object>() {
1480                 @Override
1481                 public void onComplete(final Throwable t, final Object resp) {
1482                     latch.countDown();
1483                 }
1484             }, getSystem().dispatcher());
1485
1486             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1487
1488             InOrder inOrder = inOrder(cohort1, cohort2);
1489             inOrder.verify(cohort1).canCommit();
1490             inOrder.verify(cohort1).preCommit();
1491             inOrder.verify(cohort2).canCommit();
1492
1493             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1494         }};
1495     }
1496
1497     @Test
1498     public void testCanCommitPhaseFailure() throws Throwable {
1499         new ShardTestKit(getSystem()) {{
1500             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1501                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1502                     "testCanCommitPhaseFailure");
1503
1504             waitUntilLeader(shard);
1505
1506             final FiniteDuration duration = duration("5 seconds");
1507
1508             String transactionID1 = "tx1";
1509             MutableCompositeModification modification = new MutableCompositeModification();
1510             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1511             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1512
1513             // Simulate the ForwardedReadyTransaction messages that would be sent
1514             // by the ShardTransaction.
1515
1516             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1517                     cohort, modification, true, false), getRef());
1518             expectMsgClass(duration, ReadyTransactionReply.class);
1519
1520             // Send the CanCommitTransaction message.
1521
1522             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1523             expectMsgClass(duration, akka.actor.Status.Failure.class);
1524
1525             // Send another can commit to ensure the failed one got cleaned up.
1526
1527             reset(cohort);
1528
1529             String transactionID2 = "tx2";
1530             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1531
1532             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1533                     cohort, modification, true, false), getRef());
1534             expectMsgClass(duration, ReadyTransactionReply.class);
1535
1536             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1537             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1538                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1539             assertEquals("getCanCommit", true, reply.getCanCommit());
1540
1541             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1542         }};
1543     }
1544
1545     @Test
1546     public void testCanCommitPhaseFalseResponse() throws Throwable {
1547         new ShardTestKit(getSystem()) {{
1548             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1549                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1550                     "testCanCommitPhaseFalseResponse");
1551
1552             waitUntilLeader(shard);
1553
1554             final FiniteDuration duration = duration("5 seconds");
1555
1556             String transactionID1 = "tx1";
1557             MutableCompositeModification modification = new MutableCompositeModification();
1558             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1559             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1560
1561             // Simulate the ForwardedReadyTransaction messages that would be sent
1562             // by the ShardTransaction.
1563
1564             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1565                     cohort, modification, true, false), getRef());
1566             expectMsgClass(duration, ReadyTransactionReply.class);
1567
1568             // Send the CanCommitTransaction message.
1569
1570             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1571             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1572                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1573             assertEquals("getCanCommit", false, reply.getCanCommit());
1574
1575             // Send another can commit to ensure the failed one got cleaned up.
1576
1577             reset(cohort);
1578
1579             String transactionID2 = "tx2";
1580             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1581
1582             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1583                     cohort, modification, true, false), getRef());
1584             expectMsgClass(duration, ReadyTransactionReply.class);
1585
1586             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1587             reply = CanCommitTransactionReply.fromSerializable(
1588                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1589             assertEquals("getCanCommit", true, reply.getCanCommit());
1590
1591             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1592         }};
1593     }
1594
1595     @Test
1596     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1597         new ShardTestKit(getSystem()) {{
1598             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1599                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1600                     "testImmediateCommitWithCanCommitPhaseFailure");
1601
1602             waitUntilLeader(shard);
1603
1604             final FiniteDuration duration = duration("5 seconds");
1605
1606             String transactionID1 = "tx1";
1607             MutableCompositeModification modification = new MutableCompositeModification();
1608             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1609             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1610
1611             // Simulate the ForwardedReadyTransaction messages that would be sent
1612             // by the ShardTransaction.
1613
1614             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1615                     cohort, modification, true, true), getRef());
1616
1617             expectMsgClass(duration, akka.actor.Status.Failure.class);
1618
1619             // Send another can commit to ensure the failed one got cleaned up.
1620
1621             reset(cohort);
1622
1623             String transactionID2 = "tx2";
1624             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1625             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1626             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1627             DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1628             DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1629             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1630             doReturn(candidateRoot).when(candidate).getRootNode();
1631             doReturn(candidate).when(cohort).getCandidate();
1632
1633             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1634                     cohort, modification, true, true), getRef());
1635
1636             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1637
1638             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1639         }};
1640     }
1641
1642     @Test
1643     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1644         new ShardTestKit(getSystem()) {{
1645             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1646                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1647                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1648
1649             waitUntilLeader(shard);
1650
1651             final FiniteDuration duration = duration("5 seconds");
1652
1653             String transactionID = "tx1";
1654             MutableCompositeModification modification = new MutableCompositeModification();
1655             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1656             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1657
1658             // Simulate the ForwardedReadyTransaction messages that would be sent
1659             // by the ShardTransaction.
1660
1661             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1662                     cohort, modification, true, true), getRef());
1663
1664             expectMsgClass(duration, akka.actor.Status.Failure.class);
1665
1666             // Send another can commit to ensure the failed one got cleaned up.
1667
1668             reset(cohort);
1669
1670             String transactionID2 = "tx2";
1671             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1672             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1673             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1674             DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1675             DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1676             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1677             doReturn(candidateRoot).when(candidate).getRootNode();
1678             doReturn(candidate).when(cohort).getCandidate();
1679
1680             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1681                     cohort, modification, true, true), getRef());
1682
1683             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1684
1685             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1686         }};
1687     }
1688
1689     @Test
1690     public void testAbortBeforeFinishCommit() throws Throwable {
1691         new ShardTestKit(getSystem()) {{
1692             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1693                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1694                     "testAbortBeforeFinishCommit");
1695
1696             waitUntilLeader(shard);
1697
1698             final FiniteDuration duration = duration("5 seconds");
1699             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1700
1701             final String transactionID = "tx1";
1702             Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1703                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1704                 @Override
1705                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1706                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1707
1708                     // Simulate an AbortTransaction message occurring during replication, after
1709                     // persisting and before finishing the commit to the in-memory store.
1710                     // We have no followers so due to optimizations in the RaftActor, it does not
1711                     // attempt replication and thus we can't send an AbortTransaction message b/c
1712                     // it would be processed too late after CommitTransaction completes. So we'll
1713                     // simulate an AbortTransaction message occurring during replication by calling
1714                     // the shard directly.
1715                     //
1716                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1717
1718                     return preCommitFuture;
1719                 }
1720             };
1721
1722             MutableCompositeModification modification = new MutableCompositeModification();
1723             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1724                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1725                     modification, preCommit);
1726
1727             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1728                     cohort, modification, true, false), getRef());
1729             expectMsgClass(duration, ReadyTransactionReply.class);
1730
1731             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1732             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1733                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1734             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1735
1736             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1737             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1738
1739             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1740
1741             // Since we're simulating an abort occurring during replication and before finish commit,
1742             // the data should still get written to the in-memory store since we've gotten past
1743             // canCommit and preCommit and persisted the data.
1744             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1745
1746             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1747         }};
1748     }
1749
1750     @Test
1751     public void testTransactionCommitTimeout() throws Throwable {
1752         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1753
1754         new ShardTestKit(getSystem()) {{
1755             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1756                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1757                     "testTransactionCommitTimeout");
1758
1759             waitUntilLeader(shard);
1760
1761             final FiniteDuration duration = duration("5 seconds");
1762
1763             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1764
1765             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1766             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1767                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1768
1769             // Create 1st Tx - will timeout
1770
1771             String transactionID1 = "tx1";
1772             MutableCompositeModification modification1 = new MutableCompositeModification();
1773             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1774                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1775                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1776                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1777                     modification1);
1778
1779             // Create 2nd Tx
1780
1781             String transactionID2 = "tx3";
1782             MutableCompositeModification modification2 = new MutableCompositeModification();
1783             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1784                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1785             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1786                     listNodePath,
1787                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1788                     modification2);
1789
1790             // Ready the Tx's
1791
1792             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1793                     cohort1, modification1, true, false), getRef());
1794             expectMsgClass(duration, ReadyTransactionReply.class);
1795
1796             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1797                     cohort2, modification2, true, false), getRef());
1798             expectMsgClass(duration, ReadyTransactionReply.class);
1799
1800             // canCommit 1st Tx. We don't send the commit so it should timeout.
1801
1802             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1803             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1804
1805             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1806
1807             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1808             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1809
1810             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1811
1812             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1813             expectMsgClass(duration, akka.actor.Status.Failure.class);
1814
1815             // Commit the 2nd Tx.
1816
1817             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1818             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1819
1820             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1821             assertNotNull(listNodePath + " not found", node);
1822
1823             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1824         }};
1825     }
1826
1827     @Test
1828     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1829         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1830
1831         new ShardTestKit(getSystem()) {{
1832             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1833                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1834                     "testTransactionCommitQueueCapacityExceeded");
1835
1836             waitUntilLeader(shard);
1837
1838             final FiniteDuration duration = duration("5 seconds");
1839
1840             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1841
1842             String transactionID1 = "tx1";
1843             MutableCompositeModification modification1 = new MutableCompositeModification();
1844             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1845                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1846
1847             String transactionID2 = "tx2";
1848             MutableCompositeModification modification2 = new MutableCompositeModification();
1849             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1850                     TestModel.OUTER_LIST_PATH,
1851                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1852                     modification2);
1853
1854             String transactionID3 = "tx3";
1855             MutableCompositeModification modification3 = new MutableCompositeModification();
1856             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1857                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1858
1859             // Ready the Tx's
1860
1861             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1862                     cohort1, modification1, true, false), getRef());
1863             expectMsgClass(duration, ReadyTransactionReply.class);
1864
1865             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1866                     cohort2, modification2, true, false), getRef());
1867             expectMsgClass(duration, ReadyTransactionReply.class);
1868
1869             // The 3rd Tx should exceed queue capacity and fail.
1870
1871             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1872                     cohort3, modification3, true, false), getRef());
1873             expectMsgClass(duration, akka.actor.Status.Failure.class);
1874
1875             // canCommit 1st Tx.
1876
1877             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1878             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1879
1880             // canCommit the 2nd Tx - it should get queued.
1881
1882             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1883
1884             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1885
1886             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1887             expectMsgClass(duration, akka.actor.Status.Failure.class);
1888
1889             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1890         }};
1891     }
1892
1893     @Test
1894     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1895         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1896
1897         new ShardTestKit(getSystem()) {{
1898             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1899                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1900                     "testTransactionCommitWithPriorExpiredCohortEntries");
1901
1902             waitUntilLeader(shard);
1903
1904             final FiniteDuration duration = duration("5 seconds");
1905
1906             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1907
1908             String transactionID1 = "tx1";
1909             MutableCompositeModification modification1 = new MutableCompositeModification();
1910             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1911                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1912
1913             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1914                     cohort1, modification1, true, false), getRef());
1915             expectMsgClass(duration, ReadyTransactionReply.class);
1916
1917             String transactionID2 = "tx2";
1918             MutableCompositeModification modification2 = new MutableCompositeModification();
1919             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1920                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1921
1922             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1923                     cohort2, modification2, true, false), getRef());
1924             expectMsgClass(duration, ReadyTransactionReply.class);
1925
1926             String transactionID3 = "tx3";
1927             MutableCompositeModification modification3 = new MutableCompositeModification();
1928             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1929                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1930
1931             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1932                     cohort3, modification3, true, false), getRef());
1933             expectMsgClass(duration, ReadyTransactionReply.class);
1934
1935             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1936             // should expire from the queue and the last one should be processed.
1937
1938             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1939             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1940
1941             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1942         }};
1943     }
1944
1945     @Test
1946     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
1947         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1948
1949         new ShardTestKit(getSystem()) {{
1950             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1951                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1952                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
1953
1954             waitUntilLeader(shard);
1955
1956             final FiniteDuration duration = duration("5 seconds");
1957
1958             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1959
1960             String transactionID1 = "tx1";
1961             MutableCompositeModification modification1 = new MutableCompositeModification();
1962             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1963                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1964
1965             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1966                     cohort1, modification1, true, false), getRef());
1967             expectMsgClass(duration, ReadyTransactionReply.class);
1968
1969             // CanCommit the first one so it's the current in-progress CohortEntry.
1970
1971             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1972             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1973
1974             // Ready the second Tx.
1975
1976             String transactionID2 = "tx2";
1977             MutableCompositeModification modification2 = new MutableCompositeModification();
1978             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1979                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1980
1981             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1982                     cohort2, modification2, true, false), getRef());
1983             expectMsgClass(duration, ReadyTransactionReply.class);
1984
1985             // Ready the third Tx.
1986
1987             String transactionID3 = "tx3";
1988             DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
1989             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1990                     .apply(modification3);
1991             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
1992
1993             shard.tell(readyMessage, getRef());
1994
1995             // Commit the first Tx. After completing, the second should expire from the queue and the third
1996             // Tx committed.
1997
1998             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1999             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2000
2001             // Expect commit reply from the third Tx.
2002
2003             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2004
2005             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2006             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2007
2008             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2009         }};
2010     }
2011
2012     @Test
2013     public void testCanCommitBeforeReadyFailure() throws Throwable {
2014         new ShardTestKit(getSystem()) {{
2015             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2016                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2017                     "testCanCommitBeforeReadyFailure");
2018
2019             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2020             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2021
2022             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2023         }};
2024     }
2025
2026     @Test
2027     public void testAbortTransaction() throws Throwable {
2028         new ShardTestKit(getSystem()) {{
2029             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2030                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2031                     "testAbortTransaction");
2032
2033             waitUntilLeader(shard);
2034
2035             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2036
2037             String transactionID1 = "tx1";
2038             MutableCompositeModification modification1 = new MutableCompositeModification();
2039             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2040             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2041             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2042
2043             String transactionID2 = "tx2";
2044             MutableCompositeModification modification2 = new MutableCompositeModification();
2045             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2046             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2047
2048             FiniteDuration duration = duration("5 seconds");
2049             final Timeout timeout = new Timeout(duration);
2050
2051             // Simulate the ForwardedReadyTransaction messages that would be sent
2052             // by the ShardTransaction.
2053
2054             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2055                     cohort1, modification1, true, false), getRef());
2056             expectMsgClass(duration, ReadyTransactionReply.class);
2057
2058             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2059                     cohort2, modification2, true, false), getRef());
2060             expectMsgClass(duration, ReadyTransactionReply.class);
2061
2062             // Send the CanCommitTransaction message for the first Tx.
2063
2064             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2065             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2066                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2067             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2068
2069             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2070             // processed after the first Tx completes.
2071
2072             Future<Object> canCommitFuture = Patterns.ask(shard,
2073                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2074
2075             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2076             // Tx to proceed.
2077
2078             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2079             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2080
2081             // Wait for the 2nd Tx to complete the canCommit phase.
2082
2083             Await.ready(canCommitFuture, duration);
2084
2085             InOrder inOrder = inOrder(cohort1, cohort2);
2086             inOrder.verify(cohort1).canCommit();
2087             inOrder.verify(cohort2).canCommit();
2088
2089             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2090         }};
2091     }
2092
2093     @Test
2094     public void testCreateSnapshot() throws Exception {
2095         testCreateSnapshot(true, "testCreateSnapshot");
2096     }
2097
2098     @Test
2099     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2100         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2101     }
2102
2103     @SuppressWarnings("serial")
2104     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2105
2106         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2107
2108         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2109         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2110             TestPersistentDataProvider(DataPersistenceProvider delegate) {
2111                 super(delegate);
2112             }
2113
2114             @Override
2115             public void saveSnapshot(Object o) {
2116                 savedSnapshot.set(o);
2117                 super.saveSnapshot(o);
2118             }
2119         }
2120
2121         dataStoreContextBuilder.persistent(persistent);
2122
2123         new ShardTestKit(getSystem()) {{
2124             class TestShard extends Shard {
2125
2126                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
2127                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
2128                     super(name, peerAddresses, datastoreContext, schemaContext);
2129                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2130                 }
2131
2132                 @Override
2133                 public void handleCommand(Object message) {
2134                     super.handleCommand(message);
2135
2136                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2137                         latch.get().countDown();
2138                     }
2139                 }
2140
2141                 @Override
2142                 public RaftActorContext getRaftActorContext() {
2143                     return super.getRaftActorContext();
2144                 }
2145             }
2146
2147             Creator<Shard> creator = new Creator<Shard>() {
2148                 @Override
2149                 public Shard create() throws Exception {
2150                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
2151                             newDatastoreContext(), SCHEMA_CONTEXT);
2152                 }
2153             };
2154
2155             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2156                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2157
2158             waitUntilLeader(shard);
2159
2160             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2161
2162             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2163
2164             // Trigger creation of a snapshot by ensuring
2165             RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2166             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2167
2168             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2169
2170             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2171                     savedSnapshot.get() instanceof Snapshot);
2172
2173             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2174
2175             latch.set(new CountDownLatch(1));
2176             savedSnapshot.set(null);
2177
2178             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2179
2180             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2181
2182             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2183                     savedSnapshot.get() instanceof Snapshot);
2184
2185             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2186
2187             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2188         }
2189
2190         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
2191
2192             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2193             assertEquals("Root node", expectedRoot, actual);
2194
2195         }};
2196     }
2197
2198     /**
2199      * This test simply verifies that the applySnapShot logic will work
2200      * @throws ReadFailedException
2201      * @throws DataValidationFailedException
2202      */
2203     @Test
2204     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2205         DataTree store = InMemoryDataTreeFactory.getInstance().create();
2206         store.setSchemaContext(SCHEMA_CONTEXT);
2207
2208         DataTreeModification putTransaction = store.takeSnapshot().newModification();
2209         putTransaction.write(TestModel.TEST_PATH,
2210             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2211         commitTransaction(store, putTransaction);
2212
2213
2214         NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2215
2216         DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2217
2218         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2219         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2220
2221         commitTransaction(store, writeTransaction);
2222
2223         NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2224
2225         assertEquals(expected, actual);
2226     }
2227
2228     @Test
2229     public void testRecoveryApplicable(){
2230
2231         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2232                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2233
2234         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2235                 persistentContext, SCHEMA_CONTEXT);
2236
2237         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2238                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2239
2240         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2241                 nonPersistentContext, SCHEMA_CONTEXT);
2242
2243         new ShardTestKit(getSystem()) {{
2244             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2245                     persistentProps, "testPersistence1");
2246
2247             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2248
2249             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2250
2251             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2252                     nonPersistentProps, "testPersistence2");
2253
2254             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2255
2256             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2257
2258         }};
2259
2260     }
2261
2262     @Test
2263     public void testOnDatastoreContext() {
2264         new ShardTestKit(getSystem()) {{
2265             dataStoreContextBuilder.persistent(true);
2266
2267             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2268
2269             assertEquals("isRecoveryApplicable", true,
2270                     shard.underlyingActor().persistence().isRecoveryApplicable());
2271
2272             waitUntilLeader(shard);
2273
2274             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2275
2276             assertEquals("isRecoveryApplicable", false,
2277                     shard.underlyingActor().persistence().isRecoveryApplicable());
2278
2279             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2280
2281             assertEquals("isRecoveryApplicable", true,
2282                     shard.underlyingActor().persistence().isRecoveryApplicable());
2283
2284             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2285         }};
2286     }
2287
2288     @Test
2289     public void testRegisterRoleChangeListener() throws Exception {
2290         new ShardTestKit(getSystem()) {
2291             {
2292                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2293                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2294                         "testRegisterRoleChangeListener");
2295
2296                 waitUntilLeader(shard);
2297
2298                 TestActorRef<MessageCollectorActor> listener =
2299                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2300
2301                 shard.tell(new RegisterRoleChangeListener(), listener);
2302
2303                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2304
2305                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2306                         ShardLeaderStateChanged.class);
2307                 assertEquals("getLocalShardDataTree present", true,
2308                         leaderStateChanged.getLocalShardDataTree().isPresent());
2309                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2310                         leaderStateChanged.getLocalShardDataTree().get());
2311
2312                 MessageCollectorActor.clearMessages(listener);
2313
2314                 // Force a leader change
2315
2316                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2317
2318                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2319                         ShardLeaderStateChanged.class);
2320                 assertEquals("getLocalShardDataTree present", false,
2321                         leaderStateChanged.getLocalShardDataTree().isPresent());
2322
2323                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2324             }
2325         };
2326     }
2327
2328     @Test
2329     public void testFollowerInitialSyncStatus() throws Exception {
2330         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2331                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2332                 "testFollowerInitialSyncStatus");
2333
2334         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2335
2336         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2337
2338         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2339
2340         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2341
2342         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2343     }
2344
2345     private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2346         modification.ready();
2347         store.validate(modification);
2348         store.commit(store.prepare(modification));
2349     }
2350 }