Bug:3260-Recovery misses flows installed on single node
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.dispatch.Dispatchers;
19 import akka.dispatch.OnComplete;
20 import akka.japi.Creator;
21 import akka.pattern.Patterns;
22 import akka.persistence.SaveSnapshotSuccess;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.Futures;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.InOrder;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
43 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
44 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
45 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
65 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
66 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
67 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
68 import org.opendaylight.controller.cluster.datastore.modification.Modification;
69 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
70 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
71 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
72 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
73 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
74 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
75 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
76 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
77 import org.opendaylight.controller.cluster.raft.RaftActorContext;
78 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
79 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
80 import org.opendaylight.controller.cluster.raft.Snapshot;
81 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
82 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
84 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
85 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
86 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
88 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
89 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
90 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
91 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
92 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
93 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
94 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
95 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
96 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
97 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
98 import org.opendaylight.yangtools.yang.common.QName;
99 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
101 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
102 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
103 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
116 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
117 import scala.concurrent.Await;
118 import scala.concurrent.Future;
119 import scala.concurrent.duration.FiniteDuration;
120
121 public class ShardTest extends AbstractShardTest {
122     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
123
124     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
125
126     final CountDownLatch recoveryComplete = new CountDownLatch(1);
127
128     protected Props newShardPropsWithRecoveryComplete() {
129
130         Creator<Shard> creator = new Creator<Shard>() {
131             @Override
132             public Shard create() throws Exception {
133                 return new Shard(shardID, Collections.<String,String>emptyMap(),
134                         newDatastoreContext(), SCHEMA_CONTEXT) {
135                     @Override
136                     protected void onRecoveryComplete() {
137                         try {
138                             super.onRecoveryComplete();
139                         } finally {
140                             recoveryComplete.countDown();
141                         }
142                     }
143                 };
144             }
145         };
146         return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
147     }
148
149     @Test
150     public void testRegisterChangeListener() throws Exception {
151         new ShardTestKit(getSystem()) {{
152             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
153                     newShardProps(),  "testRegisterChangeListener");
154
155             waitUntilLeader(shard);
156
157             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
158
159             MockDataChangeListener listener = new MockDataChangeListener(1);
160             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
161                     "testRegisterChangeListener-DataChangeListener");
162
163             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
164                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
165
166             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
167                     RegisterChangeListenerReply.class);
168             String replyPath = reply.getListenerRegistrationPath().toString();
169             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
170                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
171
172             YangInstanceIdentifier path = TestModel.TEST_PATH;
173             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
174
175             listener.waitForChangeEvents(path);
176
177             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
178             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
179         }};
180     }
181
182     @SuppressWarnings("serial")
183     @Test
184     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
185         // This test tests the timing window in which a change listener is registered before the
186         // shard becomes the leader. We verify that the listener is registered and notified of the
187         // existing data when the shard becomes the leader.
188         new ShardTestKit(getSystem()) {{
189             // For this test, we want to send the RegisterChangeListener message after the shard
190             // has recovered from persistence and before it becomes the leader. So we subclass
191             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
192             // we know that the shard has been initialized to a follower and has started the
193             // election process. The following 2 CountDownLatches are used to coordinate the
194             // ElectionTimeout with the sending of the RegisterChangeListener message.
195             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
196             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
197             Creator<Shard> creator = new Creator<Shard>() {
198                 boolean firstElectionTimeout = true;
199
200                 @Override
201                 public Shard create() throws Exception {
202                     // Use a non persistent provider because this test actually invokes persist on the journal
203                     // this will cause all other messages to not be queued properly after that.
204                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
205                     // it does do a persist)
206                     return new Shard(shardID, Collections.<String,String>emptyMap(),
207                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
208                         @Override
209                         public void onReceiveCommand(final Object message) throws Exception {
210                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
211                                 // Got the first ElectionTimeout. We don't forward it to the
212                                 // base Shard yet until we've sent the RegisterChangeListener
213                                 // message. So we signal the onFirstElectionTimeout latch to tell
214                                 // the main thread to send the RegisterChangeListener message and
215                                 // start a thread to wait on the onChangeListenerRegistered latch,
216                                 // which the main thread signals after it has sent the message.
217                                 // After the onChangeListenerRegistered is triggered, we send the
218                                 // original ElectionTimeout message to proceed with the election.
219                                 firstElectionTimeout = false;
220                                 final ActorRef self = getSelf();
221                                 new Thread() {
222                                     @Override
223                                     public void run() {
224                                         Uninterruptibles.awaitUninterruptibly(
225                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
226                                         self.tell(message, self);
227                                     }
228                                 }.start();
229
230                                 onFirstElectionTimeout.countDown();
231                             } else {
232                                 super.onReceiveCommand(message);
233                             }
234                         }
235                     };
236                 }
237             };
238
239             MockDataChangeListener listener = new MockDataChangeListener(1);
240             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
241                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
242
243             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
244                     Props.create(new DelegatingShardCreator(creator)),
245                     "testRegisterChangeListenerWhenNotLeaderInitially");
246
247             // Write initial data into the in-memory store.
248             YangInstanceIdentifier path = TestModel.TEST_PATH;
249             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
250
251             // Wait until the shard receives the first ElectionTimeout message.
252             assertEquals("Got first ElectionTimeout", true,
253                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
254
255             // Now send the RegisterChangeListener and wait for the reply.
256             shard.tell(new RegisterChangeListener(path, dclActor,
257                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
258
259             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
260                     RegisterChangeListenerReply.class);
261             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
262
263             // Sanity check - verify the shard is not the leader yet.
264             shard.tell(new FindLeader(), getRef());
265             FindLeaderReply findLeadeReply =
266                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
267             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
268
269             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
270             // with the election process.
271             onChangeListenerRegistered.countDown();
272
273             // Wait for the shard to become the leader and notify our listener with the existing
274             // data in the store.
275             listener.waitForChangeEvents(path);
276
277             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
278             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
279         }};
280     }
281
282     @Test
283     public void testRegisterDataTreeChangeListener() throws Exception {
284         new ShardTestKit(getSystem()) {{
285             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
286                     newShardProps(), "testRegisterDataTreeChangeListener");
287
288             waitUntilLeader(shard);
289
290             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
291
292             MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
293             ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
294                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
295
296             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
297
298             RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
299                     RegisterDataTreeChangeListenerReply.class);
300             String replyPath = reply.getListenerRegistrationPath().toString();
301             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
302                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
303
304             YangInstanceIdentifier path = TestModel.TEST_PATH;
305             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
306
307             listener.waitForChangeEvents();
308
309             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
310             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
311         }};
312     }
313
314     @SuppressWarnings("serial")
315     @Test
316     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
317         new ShardTestKit(getSystem()) {{
318             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
319             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
320             Creator<Shard> creator = new Creator<Shard>() {
321                 boolean firstElectionTimeout = true;
322
323                 @Override
324                 public Shard create() throws Exception {
325                     return new Shard(shardID, Collections.<String,String>emptyMap(),
326                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
327                         @Override
328                         public void onReceiveCommand(final Object message) throws Exception {
329                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
330                                 firstElectionTimeout = false;
331                                 final ActorRef self = getSelf();
332                                 new Thread() {
333                                     @Override
334                                     public void run() {
335                                         Uninterruptibles.awaitUninterruptibly(
336                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
337                                         self.tell(message, self);
338                                     }
339                                 }.start();
340
341                                 onFirstElectionTimeout.countDown();
342                             } else {
343                                 super.onReceiveCommand(message);
344                             }
345                         }
346                     };
347                 }
348             };
349
350             MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
351             ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
352                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
353
354             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
355                     Props.create(new DelegatingShardCreator(creator)),
356                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
357
358             YangInstanceIdentifier path = TestModel.TEST_PATH;
359             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
360
361             assertEquals("Got first ElectionTimeout", true,
362                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
363
364             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
365             RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
366                     RegisterDataTreeChangeListenerReply.class);
367             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
368
369             shard.tell(new FindLeader(), getRef());
370             FindLeaderReply findLeadeReply =
371                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
372             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
373
374             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
375
376             onChangeListenerRegistered.countDown();
377
378             // TODO: investigate why we do not receive data chage events
379             listener.waitForChangeEvents();
380
381             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
382             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
383         }};
384     }
385
386     @Test
387     public void testCreateTransaction(){
388         new ShardTestKit(getSystem()) {{
389             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
390
391             waitUntilLeader(shard);
392
393             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
394
395             shard.tell(new CreateTransaction("txn-1",
396                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
397
398             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
399                     CreateTransactionReply.class);
400
401             String path = reply.getTransactionActorPath().toString();
402             assertTrue("Unexpected transaction path " + path,
403                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
404
405             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
406         }};
407     }
408
409     @Test
410     public void testCreateTransactionOnChain(){
411         new ShardTestKit(getSystem()) {{
412             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
413
414             waitUntilLeader(shard);
415
416             shard.tell(new CreateTransaction("txn-1",
417                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
418                     getRef());
419
420             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
421                     CreateTransactionReply.class);
422
423             String path = reply.getTransactionActorPath().toString();
424             assertTrue("Unexpected transaction path " + path,
425                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
426
427             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
428         }};
429     }
430
431     @SuppressWarnings("serial")
432     @Test
433     public void testPeerAddressResolved() throws Exception {
434         new ShardTestKit(getSystem()) {{
435             final CountDownLatch recoveryComplete = new CountDownLatch(1);
436             class TestShard extends Shard {
437                 TestShard() {
438                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
439                             newDatastoreContext(), SCHEMA_CONTEXT);
440                 }
441
442                 Map<String, String> getPeerAddresses() {
443                     return getRaftActorContext().getPeerAddresses();
444                 }
445
446                 @Override
447                 protected void onRecoveryComplete() {
448                     try {
449                         super.onRecoveryComplete();
450                     } finally {
451                         recoveryComplete.countDown();
452                     }
453                 }
454             }
455
456             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
457                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
458                         @Override
459                         public TestShard create() throws Exception {
460                             return new TestShard();
461                         }
462                     })), "testPeerAddressResolved");
463
464             //waitUntilLeader(shard);
465             assertEquals("Recovery complete", true,
466                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
467
468             String address = "akka://foobar";
469             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
470
471             assertEquals("getPeerAddresses", address,
472                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
473
474             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
475         }};
476     }
477
478     @Test
479     public void testApplySnapshot() throws Exception {
480         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
481                 "testApplySnapshot");
482
483         DataTree store = InMemoryDataTreeFactory.getInstance().create();
484         store.setSchemaContext(SCHEMA_CONTEXT);
485
486         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
487
488         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
489         NormalizedNode<?,?> expected = readStore(store, root);
490
491         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
492                 SerializationUtils.serializeNormalizedNode(expected),
493                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
494
495         shard.underlyingActor().onReceiveCommand(applySnapshot);
496
497         NormalizedNode<?,?> actual = readStore(shard, root);
498
499         assertEquals("Root node", expected, actual);
500
501         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
502     }
503
504     @Test
505     public void testApplyState() throws Exception {
506
507         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
508
509         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
510
511         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
512                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
513
514         shard.underlyingActor().onReceiveCommand(applyState);
515
516         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
517         assertEquals("Applied state", node, actual);
518
519         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
520     }
521
522     @Test
523     public void testApplyStateWithCandidatePayload() throws Exception {
524
525         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
526
527         recoveryComplete.await(5,  TimeUnit.SECONDS);
528
529         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
530         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
531
532         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
533                 DataTreeCandidatePayload.create(candidate)));
534
535         shard.underlyingActor().onReceiveCommand(applyState);
536
537         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
538         assertEquals("Applied state", node, actual);
539
540         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
541     }
542
543     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
544         DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
545         testStore.setSchemaContext(SCHEMA_CONTEXT);
546
547         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
548
549         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
550
551         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
552                 SerializationUtils.serializeNormalizedNode(root),
553                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
554         return testStore;
555     }
556
557     private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
558         source.validate(mod);
559         final DataTreeCandidate candidate = source.prepare(mod);
560         source.commit(candidate);
561         return DataTreeCandidatePayload.create(candidate);
562     }
563
564     @Test
565     public void testDataTreeCandidateRecovery() throws Exception {
566         // Set up the InMemorySnapshotStore.
567         final DataTree source = setupInMemorySnapshotStore();
568
569         final DataTreeModification writeMod = source.takeSnapshot().newModification();
570         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
571
572         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
573
574         // Set up the InMemoryJournal.
575         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
576
577         int nListEntries = 16;
578         Set<Integer> listEntryKeys = new HashSet<>();
579
580         // Add some ModificationPayload entries
581         for (int i = 1; i <= nListEntries; i++) {
582             listEntryKeys.add(Integer.valueOf(i));
583
584             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
585                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
586
587             final DataTreeModification mod = source.takeSnapshot().newModification();
588             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
589
590             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
591                 payloadForModification(source, mod)));
592         }
593
594         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
595                 new ApplyJournalEntries(nListEntries));
596
597         testRecovery(listEntryKeys);
598     }
599
600     @Test
601     public void testModicationRecovery() throws Exception {
602
603         // Set up the InMemorySnapshotStore.
604         setupInMemorySnapshotStore();
605
606         // Set up the InMemoryJournal.
607
608         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
609
610         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
611                   new WriteModification(TestModel.OUTER_LIST_PATH,
612                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
613
614         int nListEntries = 16;
615         Set<Integer> listEntryKeys = new HashSet<>();
616
617         // Add some ModificationPayload entries
618         for(int i = 1; i <= nListEntries; i++) {
619             listEntryKeys.add(Integer.valueOf(i));
620             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
621                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
622             Modification mod = new MergeModification(path,
623                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
624             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
625                     newModificationPayload(mod)));
626         }
627
628         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
629                 new ApplyJournalEntries(nListEntries));
630
631         testRecovery(listEntryKeys);
632     }
633
634     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
635         MutableCompositeModification compMod = new MutableCompositeModification();
636         for(Modification mod: mods) {
637             compMod.addModification(mod);
638         }
639
640         return new ModificationPayload(compMod);
641     }
642
643     @Test
644     public void testConcurrentThreePhaseCommits() throws Throwable {
645         new ShardTestKit(getSystem()) {{
646             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
647                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
648                     "testConcurrentThreePhaseCommits");
649
650             waitUntilLeader(shard);
651
652          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
653
654             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
655
656             String transactionID1 = "tx1";
657             MutableCompositeModification modification1 = new MutableCompositeModification();
658             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
659                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
660
661             String transactionID2 = "tx2";
662             MutableCompositeModification modification2 = new MutableCompositeModification();
663             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
664                     TestModel.OUTER_LIST_PATH,
665                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
666                     modification2);
667
668             String transactionID3 = "tx3";
669             MutableCompositeModification modification3 = new MutableCompositeModification();
670             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
671                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
672                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
673                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
674                     modification3);
675
676             long timeoutSec = 5;
677             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
678             final Timeout timeout = new Timeout(duration);
679
680             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
681             // by the ShardTransaction.
682
683             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
684                     cohort1, modification1, true, false), getRef());
685             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
686                     expectMsgClass(duration, ReadyTransactionReply.class));
687             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
688
689             // Send the CanCommitTransaction message for the first Tx.
690
691             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
692             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
693                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
694             assertEquals("Can commit", true, canCommitReply.getCanCommit());
695
696             // Send the ForwardedReadyTransaction for the next 2 Tx's.
697
698             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
699                     cohort2, modification2, true, false), getRef());
700             expectMsgClass(duration, ReadyTransactionReply.class);
701
702             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
703                     cohort3, modification3, true, false), getRef());
704             expectMsgClass(duration, ReadyTransactionReply.class);
705
706             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
707             // processed after the first Tx completes.
708
709             Future<Object> canCommitFuture1 = Patterns.ask(shard,
710                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
711
712             Future<Object> canCommitFuture2 = Patterns.ask(shard,
713                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
714
715             // Send the CommitTransaction message for the first Tx. After it completes, it should
716             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
717
718             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
719             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
720
721             // Wait for the next 2 Tx's to complete.
722
723             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
724             final CountDownLatch commitLatch = new CountDownLatch(2);
725
726             class OnFutureComplete extends OnComplete<Object> {
727                 private final Class<?> expRespType;
728
729                 OnFutureComplete(final Class<?> expRespType) {
730                     this.expRespType = expRespType;
731                 }
732
733                 @Override
734                 public void onComplete(final Throwable error, final Object resp) {
735                     if(error != null) {
736                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
737                     } else {
738                         try {
739                             assertEquals("Commit response type", expRespType, resp.getClass());
740                             onSuccess(resp);
741                         } catch (Exception e) {
742                             caughtEx.set(e);
743                         }
744                     }
745                 }
746
747                 void onSuccess(final Object resp) throws Exception {
748                 }
749             }
750
751             class OnCommitFutureComplete extends OnFutureComplete {
752                 OnCommitFutureComplete() {
753                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
754                 }
755
756                 @Override
757                 public void onComplete(final Throwable error, final Object resp) {
758                     super.onComplete(error, resp);
759                     commitLatch.countDown();
760                 }
761             }
762
763             class OnCanCommitFutureComplete extends OnFutureComplete {
764                 private final String transactionID;
765
766                 OnCanCommitFutureComplete(final String transactionID) {
767                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
768                     this.transactionID = transactionID;
769                 }
770
771                 @Override
772                 void onSuccess(final Object resp) throws Exception {
773                     CanCommitTransactionReply canCommitReply =
774                             CanCommitTransactionReply.fromSerializable(resp);
775                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
776
777                     Future<Object> commitFuture = Patterns.ask(shard,
778                             new CommitTransaction(transactionID).toSerializable(), timeout);
779                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
780                 }
781             }
782
783             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
784                     getSystem().dispatcher());
785
786             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
787                     getSystem().dispatcher());
788
789             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
790
791             if(caughtEx.get() != null) {
792                 throw caughtEx.get();
793             }
794
795             assertEquals("Commits complete", true, done);
796
797             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
798             inOrder.verify(cohort1).canCommit();
799             inOrder.verify(cohort1).preCommit();
800             inOrder.verify(cohort1).commit();
801             inOrder.verify(cohort2).canCommit();
802             inOrder.verify(cohort2).preCommit();
803             inOrder.verify(cohort2).commit();
804             inOrder.verify(cohort3).canCommit();
805             inOrder.verify(cohort3).preCommit();
806             inOrder.verify(cohort3).commit();
807
808             // Verify data in the data store.
809
810             verifyOuterListEntry(shard, 1);
811
812             verifyLastApplied(shard, 2);
813
814             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
815         }};
816     }
817
818     private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
819             NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
820         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
821     }
822
823     private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
824             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
825         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
826         batched.addModification(new WriteModification(path, data));
827         batched.setReady(ready);
828         batched.setDoCommitOnReady(doCommitOnReady);
829         return batched;
830     }
831
832     @Test
833     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
834         new ShardTestKit(getSystem()) {{
835             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
836                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
837                     "testBatchedModificationsWithNoCommitOnReady");
838
839             waitUntilLeader(shard);
840
841             final String transactionID = "tx";
842             FiniteDuration duration = duration("5 seconds");
843
844             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
845             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
846                 @Override
847                 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
848                     if(mockCohort.get() == null) {
849                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
850                     }
851
852                     return mockCohort.get();
853                 }
854             };
855
856             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
857
858             // Send a BatchedModifications to start a transaction.
859
860             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
861                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
862             expectMsgClass(duration, BatchedModificationsReply.class);
863
864             // Send a couple more BatchedModifications.
865
866             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
867                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
868             expectMsgClass(duration, BatchedModificationsReply.class);
869
870             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
871                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
872                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
873             expectMsgClass(duration, ReadyTransactionReply.class);
874
875             // Send the CanCommitTransaction message.
876
877             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
878             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
879                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
880             assertEquals("Can commit", true, canCommitReply.getCanCommit());
881
882             // Send the CanCommitTransaction message.
883
884             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
885             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
886
887             InOrder inOrder = inOrder(mockCohort.get());
888             inOrder.verify(mockCohort.get()).canCommit();
889             inOrder.verify(mockCohort.get()).preCommit();
890             inOrder.verify(mockCohort.get()).commit();
891
892             // Verify data in the data store.
893
894             verifyOuterListEntry(shard, 1);
895
896             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
897         }};
898     }
899
900     @Test
901     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
902         new ShardTestKit(getSystem()) {{
903             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
904                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
905                     "testBatchedModificationsWithCommitOnReady");
906
907             waitUntilLeader(shard);
908
909             final String transactionID = "tx";
910             FiniteDuration duration = duration("5 seconds");
911
912             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
913             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
914                 @Override
915                 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
916                     if(mockCohort.get() == null) {
917                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
918                     }
919
920                     return mockCohort.get();
921                 }
922             };
923
924             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
925
926             // Send a BatchedModifications to start a transaction.
927
928             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
929                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
930             expectMsgClass(duration, BatchedModificationsReply.class);
931
932             // Send a couple more BatchedModifications.
933
934             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
935                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
936             expectMsgClass(duration, BatchedModificationsReply.class);
937
938             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
939                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
940                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
941
942             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
943
944             InOrder inOrder = inOrder(mockCohort.get());
945             inOrder.verify(mockCohort.get()).canCommit();
946             inOrder.verify(mockCohort.get()).preCommit();
947             inOrder.verify(mockCohort.get()).commit();
948
949             // Verify data in the data store.
950
951             verifyOuterListEntry(shard, 1);
952
953             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
954         }};
955     }
956
957     @SuppressWarnings("unchecked")
958     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
959         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
960         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
961         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
962                 outerList.getValue() instanceof Iterable);
963         Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
964         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
965                 entry instanceof MapEntryNode);
966         MapEntryNode mapEntry = (MapEntryNode)entry;
967         Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
968                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
969         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
970         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
971     }
972
973     @Test
974     public void testBatchedModificationsOnTransactionChain() throws Throwable {
975         new ShardTestKit(getSystem()) {{
976             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
977                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
978                     "testBatchedModificationsOnTransactionChain");
979
980             waitUntilLeader(shard);
981
982             String transactionChainID = "txChain";
983             String transactionID1 = "tx1";
984             String transactionID2 = "tx2";
985
986             FiniteDuration duration = duration("5 seconds");
987
988             // Send a BatchedModifications to start a chained write transaction and ready it.
989
990             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
991             YangInstanceIdentifier path = TestModel.TEST_PATH;
992             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
993                     containerNode, true, false), getRef());
994             expectMsgClass(duration, ReadyTransactionReply.class);
995
996             // Create a read Tx on the same chain.
997
998             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
999                     transactionChainID).toSerializable(), getRef());
1000
1001             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1002
1003             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1004             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1005             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1006
1007             // Commit the write transaction.
1008
1009             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1010             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1011                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1012             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1013
1014             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1015             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1016
1017             // Verify data in the data store.
1018
1019             NormalizedNode<?, ?> actualNode = readStore(shard, path);
1020             assertEquals("Stored node", containerNode, actualNode);
1021
1022             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1023         }};
1024     }
1025
1026     @Test
1027     public void testOnBatchedModificationsWhenNotLeader() {
1028         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1029         new ShardTestKit(getSystem()) {{
1030             Creator<Shard> creator = new Creator<Shard>() {
1031                 private static final long serialVersionUID = 1L;
1032
1033                 @Override
1034                 public Shard create() throws Exception {
1035                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1036                             newDatastoreContext(), SCHEMA_CONTEXT) {
1037                         @Override
1038                         protected boolean isLeader() {
1039                             return overrideLeaderCalls.get() ? false : super.isLeader();
1040                         }
1041
1042                         @Override
1043                         protected ActorSelection getLeader() {
1044                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1045                                 super.getLeader();
1046                         }
1047                     };
1048                 }
1049             };
1050
1051             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1052                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1053
1054             waitUntilLeader(shard);
1055
1056             overrideLeaderCalls.set(true);
1057
1058             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1059
1060             shard.tell(batched, ActorRef.noSender());
1061
1062             expectMsgEquals(batched);
1063
1064             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1065         }};
1066     }
1067
1068     @Test
1069     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1070         new ShardTestKit(getSystem()) {{
1071             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1072                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1073                     "testForwardedReadyTransactionWithImmediateCommit");
1074
1075             waitUntilLeader(shard);
1076
1077             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1078
1079             String transactionID = "tx1";
1080             MutableCompositeModification modification = new MutableCompositeModification();
1081             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1082             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1083                     TestModel.TEST_PATH, containerNode, modification);
1084
1085             FiniteDuration duration = duration("5 seconds");
1086
1087             // Simulate the ForwardedReadyTransaction messages that would be sent
1088             // by the ShardTransaction.
1089
1090             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1091                     cohort, modification, true, true), getRef());
1092
1093             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1094
1095             InOrder inOrder = inOrder(cohort);
1096             inOrder.verify(cohort).canCommit();
1097             inOrder.verify(cohort).preCommit();
1098             inOrder.verify(cohort).commit();
1099
1100             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1101             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1102
1103             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1104         }};
1105     }
1106
1107     @Test
1108     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1109         new ShardTestKit(getSystem()) {{
1110             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1111                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1112                     "testReadyLocalTransactionWithImmediateCommit");
1113
1114             waitUntilLeader(shard);
1115
1116             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1117
1118             DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1119
1120             ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1121             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1122             MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1123             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1124
1125             String txId = "tx1";
1126             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1127
1128             shard.tell(readyMessage, getRef());
1129
1130             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1131
1132             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1133             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1134
1135             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1136         }};
1137     }
1138
1139     @Test
1140     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1141         new ShardTestKit(getSystem()) {{
1142             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1143                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1144                     "testReadyLocalTransactionWithThreePhaseCommit");
1145
1146             waitUntilLeader(shard);
1147
1148             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1149
1150             DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1151
1152             ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1153             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1154             MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1155             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1156
1157             String txId = "tx1";
1158             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1159
1160             shard.tell(readyMessage, getRef());
1161
1162             expectMsgClass(ReadyTransactionReply.class);
1163
1164             // Send the CanCommitTransaction message.
1165
1166             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1167             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1168                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1169             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1170
1171             // Send the CanCommitTransaction message.
1172
1173             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1174             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1175
1176             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1177             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1178
1179             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1180         }};
1181     }
1182
1183     @Test
1184     public void testCommitWithPersistenceDisabled() throws Throwable {
1185         dataStoreContextBuilder.persistent(false);
1186         new ShardTestKit(getSystem()) {{
1187             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1188                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1189                     "testCommitWithPersistenceDisabled");
1190
1191             waitUntilLeader(shard);
1192
1193             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1194
1195             // Setup a simulated transactions with a mock cohort.
1196
1197             String transactionID = "tx";
1198             MutableCompositeModification modification = new MutableCompositeModification();
1199             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1200             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1201                     TestModel.TEST_PATH, containerNode, modification);
1202
1203             FiniteDuration duration = duration("5 seconds");
1204
1205             // Simulate the ForwardedReadyTransaction messages that would be sent
1206             // by the ShardTransaction.
1207
1208             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1209                     cohort, modification, true, false), getRef());
1210             expectMsgClass(duration, ReadyTransactionReply.class);
1211
1212             // Send the CanCommitTransaction message.
1213
1214             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1215             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1216                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1217             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1218
1219             // Send the CanCommitTransaction message.
1220
1221             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1222             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1223
1224             InOrder inOrder = inOrder(cohort);
1225             inOrder.verify(cohort).canCommit();
1226             inOrder.verify(cohort).preCommit();
1227             inOrder.verify(cohort).commit();
1228
1229             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1230             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1231
1232             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1233         }};
1234     }
1235
1236     private static DataTreeCandidateTip mockCandidate(final String name) {
1237         DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1238         DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1239         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1240         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1241         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1242         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1243         return mockCandidate;
1244     }
1245
1246     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1247         DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1248         DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1249         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1250         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1251         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1252         return mockCandidate;
1253     }
1254
1255     @Test
1256     public void testCommitWhenTransactionHasNoModifications(){
1257         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1258         // but here that need not happen
1259         new ShardTestKit(getSystem()) {
1260             {
1261                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1262                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1263                         "testCommitWhenTransactionHasNoModifications");
1264
1265                 waitUntilLeader(shard);
1266
1267                 String transactionID = "tx1";
1268                 MutableCompositeModification modification = new MutableCompositeModification();
1269                 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1270                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1271                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1272                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1273                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1274
1275                 FiniteDuration duration = duration("5 seconds");
1276
1277                 // Simulate the ForwardedReadyTransaction messages that would be sent
1278                 // by the ShardTransaction.
1279
1280                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1281                         cohort, modification, true, false), getRef());
1282                 expectMsgClass(duration, ReadyTransactionReply.class);
1283
1284                 // Send the CanCommitTransaction message.
1285
1286                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1287                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1288                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1289                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1290
1291                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1292                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1293
1294                 InOrder inOrder = inOrder(cohort);
1295                 inOrder.verify(cohort).canCommit();
1296                 inOrder.verify(cohort).preCommit();
1297                 inOrder.verify(cohort).commit();
1298
1299                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1300                 ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1301
1302                 // Use MBean for verification
1303                 // Committed transaction count should increase as usual
1304                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1305
1306                 // Commit index should not advance because this does not go into the journal
1307                 assertEquals(-1, shardStats.getCommitIndex());
1308
1309                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1310
1311             }
1312         };
1313     }
1314
1315     @Test
1316     public void testCommitWhenTransactionHasModifications(){
1317         new ShardTestKit(getSystem()) {
1318             {
1319                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1320                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1321                         "testCommitWhenTransactionHasModifications");
1322
1323                 waitUntilLeader(shard);
1324
1325                 String transactionID = "tx1";
1326                 MutableCompositeModification modification = new MutableCompositeModification();
1327                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1328                 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1329                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1330                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1331                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1332                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1333
1334                 FiniteDuration duration = duration("5 seconds");
1335
1336                 // Simulate the ForwardedReadyTransaction messages that would be sent
1337                 // by the ShardTransaction.
1338
1339                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1340                         cohort, modification, true, false), getRef());
1341                 expectMsgClass(duration, ReadyTransactionReply.class);
1342
1343                 // Send the CanCommitTransaction message.
1344
1345                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1346                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1347                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1348                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1349
1350                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1351                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1352
1353                 InOrder inOrder = inOrder(cohort);
1354                 inOrder.verify(cohort).canCommit();
1355                 inOrder.verify(cohort).preCommit();
1356                 inOrder.verify(cohort).commit();
1357
1358                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1359                 ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1360
1361                 // Use MBean for verification
1362                 // Committed transaction count should increase as usual
1363                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1364
1365                 // Commit index should advance as we do not have an empty modification
1366                 assertEquals(0, shardStats.getCommitIndex());
1367
1368                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1369
1370             }
1371         };
1372     }
1373
1374     @Test
1375     public void testCommitPhaseFailure() throws Throwable {
1376         new ShardTestKit(getSystem()) {{
1377             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1378                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1379                     "testCommitPhaseFailure");
1380
1381             waitUntilLeader(shard);
1382
1383             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1384             // commit phase.
1385
1386             String transactionID1 = "tx1";
1387             MutableCompositeModification modification1 = new MutableCompositeModification();
1388             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1389             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1390             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1391             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1392             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1393
1394             String transactionID2 = "tx2";
1395             MutableCompositeModification modification2 = new MutableCompositeModification();
1396             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1397             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1398
1399             FiniteDuration duration = duration("5 seconds");
1400             final Timeout timeout = new Timeout(duration);
1401
1402             // Simulate the ForwardedReadyTransaction messages that would be sent
1403             // by the ShardTransaction.
1404
1405             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1406                     cohort1, modification1, true, false), getRef());
1407             expectMsgClass(duration, ReadyTransactionReply.class);
1408
1409             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1410                     cohort2, modification2, true, false), getRef());
1411             expectMsgClass(duration, ReadyTransactionReply.class);
1412
1413             // Send the CanCommitTransaction message for the first Tx.
1414
1415             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1416             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1417                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1418             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1419
1420             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1421             // processed after the first Tx completes.
1422
1423             Future<Object> canCommitFuture = Patterns.ask(shard,
1424                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1425
1426             // Send the CommitTransaction message for the first Tx. This should send back an error
1427             // and trigger the 2nd Tx to proceed.
1428
1429             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1430             expectMsgClass(duration, akka.actor.Status.Failure.class);
1431
1432             // Wait for the 2nd Tx to complete the canCommit phase.
1433
1434             final CountDownLatch latch = new CountDownLatch(1);
1435             canCommitFuture.onComplete(new OnComplete<Object>() {
1436                 @Override
1437                 public void onComplete(final Throwable t, final Object resp) {
1438                     latch.countDown();
1439                 }
1440             }, getSystem().dispatcher());
1441
1442             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1443
1444             InOrder inOrder = inOrder(cohort1, cohort2);
1445             inOrder.verify(cohort1).canCommit();
1446             inOrder.verify(cohort1).preCommit();
1447             inOrder.verify(cohort1).commit();
1448             inOrder.verify(cohort2).canCommit();
1449
1450             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1451         }};
1452     }
1453
1454     @Test
1455     public void testPreCommitPhaseFailure() throws Throwable {
1456         new ShardTestKit(getSystem()) {{
1457             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1458                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1459                     "testPreCommitPhaseFailure");
1460
1461             waitUntilLeader(shard);
1462
1463             String transactionID1 = "tx1";
1464             MutableCompositeModification modification1 = new MutableCompositeModification();
1465             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1466             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1467             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1468
1469             String transactionID2 = "tx2";
1470             MutableCompositeModification modification2 = new MutableCompositeModification();
1471             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1472             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1473
1474             FiniteDuration duration = duration("5 seconds");
1475             final Timeout timeout = new Timeout(duration);
1476
1477             // Simulate the ForwardedReadyTransaction messages that would be sent
1478             // by the ShardTransaction.
1479
1480             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1481                     cohort1, modification1, true, false), getRef());
1482             expectMsgClass(duration, ReadyTransactionReply.class);
1483
1484             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1485                     cohort2, modification2, true, false), getRef());
1486             expectMsgClass(duration, ReadyTransactionReply.class);
1487
1488             // Send the CanCommitTransaction message for the first Tx.
1489
1490             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1491             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1492                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1493             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1494
1495             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1496             // processed after the first Tx completes.
1497
1498             Future<Object> canCommitFuture = Patterns.ask(shard,
1499                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1500
1501             // Send the CommitTransaction message for the first Tx. This should send back an error
1502             // and trigger the 2nd Tx to proceed.
1503
1504             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1505             expectMsgClass(duration, akka.actor.Status.Failure.class);
1506
1507             // Wait for the 2nd Tx to complete the canCommit phase.
1508
1509             final CountDownLatch latch = new CountDownLatch(1);
1510             canCommitFuture.onComplete(new OnComplete<Object>() {
1511                 @Override
1512                 public void onComplete(final Throwable t, final Object resp) {
1513                     latch.countDown();
1514                 }
1515             }, getSystem().dispatcher());
1516
1517             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1518
1519             InOrder inOrder = inOrder(cohort1, cohort2);
1520             inOrder.verify(cohort1).canCommit();
1521             inOrder.verify(cohort1).preCommit();
1522             inOrder.verify(cohort2).canCommit();
1523
1524             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1525         }};
1526     }
1527
1528     @Test
1529     public void testCanCommitPhaseFailure() throws Throwable {
1530         new ShardTestKit(getSystem()) {{
1531             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1532                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1533                     "testCanCommitPhaseFailure");
1534
1535             waitUntilLeader(shard);
1536
1537             final FiniteDuration duration = duration("5 seconds");
1538
1539             String transactionID1 = "tx1";
1540             MutableCompositeModification modification = new MutableCompositeModification();
1541             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1542             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1543
1544             // Simulate the ForwardedReadyTransaction messages that would be sent
1545             // by the ShardTransaction.
1546
1547             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1548                     cohort, modification, true, false), getRef());
1549             expectMsgClass(duration, ReadyTransactionReply.class);
1550
1551             // Send the CanCommitTransaction message.
1552
1553             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1554             expectMsgClass(duration, akka.actor.Status.Failure.class);
1555
1556             // Send another can commit to ensure the failed one got cleaned up.
1557
1558             reset(cohort);
1559
1560             String transactionID2 = "tx2";
1561             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1562
1563             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1564                     cohort, modification, true, false), getRef());
1565             expectMsgClass(duration, ReadyTransactionReply.class);
1566
1567             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1568             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1569                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1570             assertEquals("getCanCommit", true, reply.getCanCommit());
1571
1572             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1573         }};
1574     }
1575
1576     @Test
1577     public void testCanCommitPhaseFalseResponse() throws Throwable {
1578         new ShardTestKit(getSystem()) {{
1579             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1580                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1581                     "testCanCommitPhaseFalseResponse");
1582
1583             waitUntilLeader(shard);
1584
1585             final FiniteDuration duration = duration("5 seconds");
1586
1587             String transactionID1 = "tx1";
1588             MutableCompositeModification modification = new MutableCompositeModification();
1589             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1590             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1591
1592             // Simulate the ForwardedReadyTransaction messages that would be sent
1593             // by the ShardTransaction.
1594
1595             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1596                     cohort, modification, true, false), getRef());
1597             expectMsgClass(duration, ReadyTransactionReply.class);
1598
1599             // Send the CanCommitTransaction message.
1600
1601             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1602             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1603                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1604             assertEquals("getCanCommit", false, reply.getCanCommit());
1605
1606             // Send another can commit to ensure the failed one got cleaned up.
1607
1608             reset(cohort);
1609
1610             String transactionID2 = "tx2";
1611             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1612
1613             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1614                     cohort, modification, true, false), getRef());
1615             expectMsgClass(duration, ReadyTransactionReply.class);
1616
1617             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1618             reply = CanCommitTransactionReply.fromSerializable(
1619                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1620             assertEquals("getCanCommit", true, reply.getCanCommit());
1621
1622             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1623         }};
1624     }
1625
1626     @Test
1627     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1628         new ShardTestKit(getSystem()) {{
1629             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1630                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1631                     "testImmediateCommitWithCanCommitPhaseFailure");
1632
1633             waitUntilLeader(shard);
1634
1635             final FiniteDuration duration = duration("5 seconds");
1636
1637             String transactionID1 = "tx1";
1638             MutableCompositeModification modification = new MutableCompositeModification();
1639             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1640             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1641
1642             // Simulate the ForwardedReadyTransaction messages that would be sent
1643             // by the ShardTransaction.
1644
1645             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1646                     cohort, modification, true, true), getRef());
1647
1648             expectMsgClass(duration, akka.actor.Status.Failure.class);
1649
1650             // Send another can commit to ensure the failed one got cleaned up.
1651
1652             reset(cohort);
1653
1654             String transactionID2 = "tx2";
1655             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1656             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1657             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1658             DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1659             DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1660             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1661             doReturn(candidateRoot).when(candidate).getRootNode();
1662             doReturn(candidate).when(cohort).getCandidate();
1663
1664             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1665                     cohort, modification, true, true), getRef());
1666
1667             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1668
1669             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1670         }};
1671     }
1672
1673     @Test
1674     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1675         new ShardTestKit(getSystem()) {{
1676             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1677                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1678                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1679
1680             waitUntilLeader(shard);
1681
1682             final FiniteDuration duration = duration("5 seconds");
1683
1684             String transactionID = "tx1";
1685             MutableCompositeModification modification = new MutableCompositeModification();
1686             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1687             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1688
1689             // Simulate the ForwardedReadyTransaction messages that would be sent
1690             // by the ShardTransaction.
1691
1692             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1693                     cohort, modification, true, true), getRef());
1694
1695             expectMsgClass(duration, akka.actor.Status.Failure.class);
1696
1697             // Send another can commit to ensure the failed one got cleaned up.
1698
1699             reset(cohort);
1700
1701             String transactionID2 = "tx2";
1702             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1703             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1704             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1705             DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1706             DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1707             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1708             doReturn(candidateRoot).when(candidate).getRootNode();
1709             doReturn(candidate).when(cohort).getCandidate();
1710
1711             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1712                     cohort, modification, true, true), getRef());
1713
1714             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1715
1716             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1717         }};
1718     }
1719
1720     @Test
1721     public void testAbortBeforeFinishCommit() throws Throwable {
1722         new ShardTestKit(getSystem()) {{
1723             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1724                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1725                     "testAbortBeforeFinishCommit");
1726
1727             waitUntilLeader(shard);
1728
1729             final FiniteDuration duration = duration("5 seconds");
1730             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1731
1732             final String transactionID = "tx1";
1733             Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1734                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1735                 @Override
1736                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1737                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1738
1739                     // Simulate an AbortTransaction message occurring during replication, after
1740                     // persisting and before finishing the commit to the in-memory store.
1741                     // We have no followers so due to optimizations in the RaftActor, it does not
1742                     // attempt replication and thus we can't send an AbortTransaction message b/c
1743                     // it would be processed too late after CommitTransaction completes. So we'll
1744                     // simulate an AbortTransaction message occurring during replication by calling
1745                     // the shard directly.
1746                     //
1747                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1748
1749                     return preCommitFuture;
1750                 }
1751             };
1752
1753             MutableCompositeModification modification = new MutableCompositeModification();
1754             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1755                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1756                     modification, preCommit);
1757
1758             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1759                     cohort, modification, true, false), getRef());
1760             expectMsgClass(duration, ReadyTransactionReply.class);
1761
1762             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1763             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1764                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1765             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1766
1767             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1768             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1769
1770             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1771
1772             // Since we're simulating an abort occurring during replication and before finish commit,
1773             // the data should still get written to the in-memory store since we've gotten past
1774             // canCommit and preCommit and persisted the data.
1775             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1776
1777             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1778         }};
1779     }
1780
1781     @Test
1782     public void testTransactionCommitTimeout() throws Throwable {
1783         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1784
1785         new ShardTestKit(getSystem()) {{
1786             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1787                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1788                     "testTransactionCommitTimeout");
1789
1790             waitUntilLeader(shard);
1791
1792             final FiniteDuration duration = duration("5 seconds");
1793
1794             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1795
1796             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1797             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1798                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1799
1800             // Create 1st Tx - will timeout
1801
1802             String transactionID1 = "tx1";
1803             MutableCompositeModification modification1 = new MutableCompositeModification();
1804             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1805                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1806                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1807                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1808                     modification1);
1809
1810             // Create 2nd Tx
1811
1812             String transactionID2 = "tx3";
1813             MutableCompositeModification modification2 = new MutableCompositeModification();
1814             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1815                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1816             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1817                     listNodePath,
1818                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1819                     modification2);
1820
1821             // Ready the Tx's
1822
1823             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1824                     cohort1, modification1, true, false), getRef());
1825             expectMsgClass(duration, ReadyTransactionReply.class);
1826
1827             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1828                     cohort2, modification2, true, false), getRef());
1829             expectMsgClass(duration, ReadyTransactionReply.class);
1830
1831             // canCommit 1st Tx. We don't send the commit so it should timeout.
1832
1833             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1834             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1835
1836             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1837
1838             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1839             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1840
1841             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1842
1843             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1844             expectMsgClass(duration, akka.actor.Status.Failure.class);
1845
1846             // Commit the 2nd Tx.
1847
1848             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1849             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1850
1851             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1852             assertNotNull(listNodePath + " not found", node);
1853
1854             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1855         }};
1856     }
1857
1858     @Test
1859     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1860         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1861
1862         new ShardTestKit(getSystem()) {{
1863             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1864                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1865                     "testTransactionCommitQueueCapacityExceeded");
1866
1867             waitUntilLeader(shard);
1868
1869             final FiniteDuration duration = duration("5 seconds");
1870
1871             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1872
1873             String transactionID1 = "tx1";
1874             MutableCompositeModification modification1 = new MutableCompositeModification();
1875             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1876                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1877
1878             String transactionID2 = "tx2";
1879             MutableCompositeModification modification2 = new MutableCompositeModification();
1880             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1881                     TestModel.OUTER_LIST_PATH,
1882                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1883                     modification2);
1884
1885             String transactionID3 = "tx3";
1886             MutableCompositeModification modification3 = new MutableCompositeModification();
1887             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1888                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1889
1890             // Ready the Tx's
1891
1892             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1893                     cohort1, modification1, true, false), getRef());
1894             expectMsgClass(duration, ReadyTransactionReply.class);
1895
1896             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1897                     cohort2, modification2, true, false), getRef());
1898             expectMsgClass(duration, ReadyTransactionReply.class);
1899
1900             // The 3rd Tx should exceed queue capacity and fail.
1901
1902             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1903                     cohort3, modification3, true, false), getRef());
1904             expectMsgClass(duration, akka.actor.Status.Failure.class);
1905
1906             // canCommit 1st Tx.
1907
1908             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1909             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1910
1911             // canCommit the 2nd Tx - it should get queued.
1912
1913             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1914
1915             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1916
1917             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1918             expectMsgClass(duration, akka.actor.Status.Failure.class);
1919
1920             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1921         }};
1922     }
1923
1924     @Test
1925     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1926         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1927
1928         new ShardTestKit(getSystem()) {{
1929             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1930                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1931                     "testTransactionCommitWithPriorExpiredCohortEntries");
1932
1933             waitUntilLeader(shard);
1934
1935             final FiniteDuration duration = duration("5 seconds");
1936
1937             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1938
1939             String transactionID1 = "tx1";
1940             MutableCompositeModification modification1 = new MutableCompositeModification();
1941             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1942                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1943
1944             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1945                     cohort1, modification1, true, false), getRef());
1946             expectMsgClass(duration, ReadyTransactionReply.class);
1947
1948             String transactionID2 = "tx2";
1949             MutableCompositeModification modification2 = new MutableCompositeModification();
1950             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1951                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1952
1953             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1954                     cohort2, modification2, true, false), getRef());
1955             expectMsgClass(duration, ReadyTransactionReply.class);
1956
1957             String transactionID3 = "tx3";
1958             MutableCompositeModification modification3 = new MutableCompositeModification();
1959             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1960                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1961
1962             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1963                     cohort3, modification3, true, false), getRef());
1964             expectMsgClass(duration, ReadyTransactionReply.class);
1965
1966             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1967             // should expire from the queue and the last one should be processed.
1968
1969             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1970             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1971
1972             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1973         }};
1974     }
1975
1976     @Test
1977     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
1978         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1979
1980         new ShardTestKit(getSystem()) {{
1981             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1982                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1983                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
1984
1985             waitUntilLeader(shard);
1986
1987             final FiniteDuration duration = duration("5 seconds");
1988
1989             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1990
1991             String transactionID1 = "tx1";
1992             MutableCompositeModification modification1 = new MutableCompositeModification();
1993             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1994                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1995
1996             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1997                     cohort1, modification1, true, false), getRef());
1998             expectMsgClass(duration, ReadyTransactionReply.class);
1999
2000             // CanCommit the first one so it's the current in-progress CohortEntry.
2001
2002             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2003             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2004
2005             // Ready the second Tx.
2006
2007             String transactionID2 = "tx2";
2008             MutableCompositeModification modification2 = new MutableCompositeModification();
2009             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2010                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2011
2012             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2013                     cohort2, modification2, true, false), getRef());
2014             expectMsgClass(duration, ReadyTransactionReply.class);
2015
2016             // Ready the third Tx.
2017
2018             String transactionID3 = "tx3";
2019             DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2020             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2021                     .apply(modification3);
2022             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2023
2024             shard.tell(readyMessage, getRef());
2025
2026             // Commit the first Tx. After completing, the second should expire from the queue and the third
2027             // Tx committed.
2028
2029             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2030             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2031
2032             // Expect commit reply from the third Tx.
2033
2034             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2035
2036             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2037             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2038
2039             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2040         }};
2041     }
2042
2043     @Test
2044     public void testCanCommitBeforeReadyFailure() throws Throwable {
2045         new ShardTestKit(getSystem()) {{
2046             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2047                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2048                     "testCanCommitBeforeReadyFailure");
2049
2050             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2051             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2052
2053             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2054         }};
2055     }
2056
2057     @Test
2058     public void testAbortTransaction() throws Throwable {
2059         new ShardTestKit(getSystem()) {{
2060             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2061                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2062                     "testAbortTransaction");
2063
2064             waitUntilLeader(shard);
2065
2066             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2067
2068             String transactionID1 = "tx1";
2069             MutableCompositeModification modification1 = new MutableCompositeModification();
2070             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2071             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2072             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2073
2074             String transactionID2 = "tx2";
2075             MutableCompositeModification modification2 = new MutableCompositeModification();
2076             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2077             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2078
2079             FiniteDuration duration = duration("5 seconds");
2080             final Timeout timeout = new Timeout(duration);
2081
2082             // Simulate the ForwardedReadyTransaction messages that would be sent
2083             // by the ShardTransaction.
2084
2085             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2086                     cohort1, modification1, true, false), getRef());
2087             expectMsgClass(duration, ReadyTransactionReply.class);
2088
2089             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2090                     cohort2, modification2, true, false), getRef());
2091             expectMsgClass(duration, ReadyTransactionReply.class);
2092
2093             // Send the CanCommitTransaction message for the first Tx.
2094
2095             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2096             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2097                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2098             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2099
2100             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2101             // processed after the first Tx completes.
2102
2103             Future<Object> canCommitFuture = Patterns.ask(shard,
2104                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2105
2106             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2107             // Tx to proceed.
2108
2109             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2110             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2111
2112             // Wait for the 2nd Tx to complete the canCommit phase.
2113
2114             Await.ready(canCommitFuture, duration);
2115
2116             InOrder inOrder = inOrder(cohort1, cohort2);
2117             inOrder.verify(cohort1).canCommit();
2118             inOrder.verify(cohort2).canCommit();
2119
2120             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2121         }};
2122     }
2123
2124     @Test
2125     public void testCreateSnapshot() throws Exception {
2126         testCreateSnapshot(true, "testCreateSnapshot");
2127     }
2128
2129     @Test
2130     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2131         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2132     }
2133
2134     @SuppressWarnings("serial")
2135     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2136
2137         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2138
2139         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2140         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2141             TestPersistentDataProvider(DataPersistenceProvider delegate) {
2142                 super(delegate);
2143             }
2144
2145             @Override
2146             public void saveSnapshot(Object o) {
2147                 savedSnapshot.set(o);
2148                 super.saveSnapshot(o);
2149             }
2150         }
2151
2152         dataStoreContextBuilder.persistent(persistent);
2153
2154         new ShardTestKit(getSystem()) {{
2155             class TestShard extends Shard {
2156
2157                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
2158                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
2159                     super(name, peerAddresses, datastoreContext, schemaContext);
2160                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2161                 }
2162
2163                 @Override
2164                 public void handleCommand(Object message) {
2165                     super.handleCommand(message);
2166
2167                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2168                         latch.get().countDown();
2169                     }
2170                 }
2171
2172                 @Override
2173                 public RaftActorContext getRaftActorContext() {
2174                     return super.getRaftActorContext();
2175                 }
2176             }
2177
2178             Creator<Shard> creator = new Creator<Shard>() {
2179                 @Override
2180                 public Shard create() throws Exception {
2181                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
2182                             newDatastoreContext(), SCHEMA_CONTEXT);
2183                 }
2184             };
2185
2186             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2187                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2188
2189             waitUntilLeader(shard);
2190
2191             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2192
2193             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2194
2195             // Trigger creation of a snapshot by ensuring
2196             RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2197             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2198
2199             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2200
2201             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2202                     savedSnapshot.get() instanceof Snapshot);
2203
2204             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2205
2206             latch.set(new CountDownLatch(1));
2207             savedSnapshot.set(null);
2208
2209             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2210
2211             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2212
2213             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2214                     savedSnapshot.get() instanceof Snapshot);
2215
2216             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2217
2218             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2219         }
2220
2221         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
2222
2223             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2224             assertEquals("Root node", expectedRoot, actual);
2225
2226         }};
2227     }
2228
2229     /**
2230      * This test simply verifies that the applySnapShot logic will work
2231      * @throws ReadFailedException
2232      * @throws DataValidationFailedException
2233      */
2234     @Test
2235     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2236         DataTree store = InMemoryDataTreeFactory.getInstance().create();
2237         store.setSchemaContext(SCHEMA_CONTEXT);
2238
2239         DataTreeModification putTransaction = store.takeSnapshot().newModification();
2240         putTransaction.write(TestModel.TEST_PATH,
2241             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2242         commitTransaction(store, putTransaction);
2243
2244
2245         NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2246
2247         DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2248
2249         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2250         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2251
2252         commitTransaction(store, writeTransaction);
2253
2254         NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2255
2256         assertEquals(expected, actual);
2257     }
2258
2259     @Test
2260     public void testRecoveryApplicable(){
2261
2262         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2263                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2264
2265         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2266                 persistentContext, SCHEMA_CONTEXT);
2267
2268         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2269                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2270
2271         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2272                 nonPersistentContext, SCHEMA_CONTEXT);
2273
2274         new ShardTestKit(getSystem()) {{
2275             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2276                     persistentProps, "testPersistence1");
2277
2278             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2279
2280             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2281
2282             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2283                     nonPersistentProps, "testPersistence2");
2284
2285             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2286
2287             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2288
2289         }};
2290
2291     }
2292
2293     @Test
2294     public void testOnDatastoreContext() {
2295         new ShardTestKit(getSystem()) {{
2296             dataStoreContextBuilder.persistent(true);
2297
2298             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2299
2300             assertEquals("isRecoveryApplicable", true,
2301                     shard.underlyingActor().persistence().isRecoveryApplicable());
2302
2303             waitUntilLeader(shard);
2304
2305             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2306
2307             assertEquals("isRecoveryApplicable", false,
2308                     shard.underlyingActor().persistence().isRecoveryApplicable());
2309
2310             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2311
2312             assertEquals("isRecoveryApplicable", true,
2313                     shard.underlyingActor().persistence().isRecoveryApplicable());
2314
2315             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2316         }};
2317     }
2318
2319     @Test
2320     public void testRegisterRoleChangeListener() throws Exception {
2321         new ShardTestKit(getSystem()) {
2322             {
2323                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2324                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2325                         "testRegisterRoleChangeListener");
2326
2327                 waitUntilLeader(shard);
2328
2329                 TestActorRef<MessageCollectorActor> listener =
2330                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2331
2332                 shard.tell(new RegisterRoleChangeListener(), listener);
2333
2334                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2335
2336                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2337                         ShardLeaderStateChanged.class);
2338                 assertEquals("getLocalShardDataTree present", true,
2339                         leaderStateChanged.getLocalShardDataTree().isPresent());
2340                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2341                         leaderStateChanged.getLocalShardDataTree().get());
2342
2343                 MessageCollectorActor.clearMessages(listener);
2344
2345                 // Force a leader change
2346
2347                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2348
2349                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2350                         ShardLeaderStateChanged.class);
2351                 assertEquals("getLocalShardDataTree present", false,
2352                         leaderStateChanged.getLocalShardDataTree().isPresent());
2353
2354                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2355             }
2356         };
2357     }
2358
2359     @Test
2360     public void testFollowerInitialSyncStatus() throws Exception {
2361         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2362                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2363                 "testFollowerInitialSyncStatus");
2364
2365         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2366
2367         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2368
2369         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2370
2371         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2372
2373         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2374     }
2375
2376     private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2377         modification.ready();
2378         store.validate(modification);
2379         store.commit(store.prepare(modification));
2380     }
2381 }