40227e056bfff82351c8e19eafb237e93f4d7580
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.dispatch.Dispatchers;
19 import akka.dispatch.OnComplete;
20 import akka.japi.Creator;
21 import akka.pattern.Patterns;
22 import akka.persistence.SaveSnapshotSuccess;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.Futures;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.InOrder;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
43 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
44 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
45 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
46 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
48 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
55 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
63 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
64 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
65 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
66 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
67 import org.opendaylight.controller.cluster.datastore.modification.Modification;
68 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
69 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
70 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
71 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
72 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
73 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
74 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
75 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
76 import org.opendaylight.controller.cluster.raft.RaftActorContext;
77 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
78 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
79 import org.opendaylight.controller.cluster.raft.Snapshot;
80 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
81 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
82 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
83 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
84 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
85 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
86 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
87 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
88 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
89 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
90 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
91 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
92 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
93 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
94 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
95 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
96 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
97 import org.opendaylight.yangtools.yang.common.QName;
98 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
99 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
100 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
101 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
102 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
103 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
113 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
114 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
115 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
116 import scala.concurrent.Await;
117 import scala.concurrent.Future;
118 import scala.concurrent.duration.FiniteDuration;
119
120 public class ShardTest extends AbstractShardTest {
121     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
122
123     @Test
124     public void testRegisterChangeListener() throws Exception {
125         new ShardTestKit(getSystem()) {{
126             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
127                     newShardProps(),  "testRegisterChangeListener");
128
129             waitUntilLeader(shard);
130
131             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
132
133             MockDataChangeListener listener = new MockDataChangeListener(1);
134             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
135                     "testRegisterChangeListener-DataChangeListener");
136
137             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
138                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
139
140             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
141                     RegisterChangeListenerReply.class);
142             String replyPath = reply.getListenerRegistrationPath().toString();
143             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
144                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
145
146             YangInstanceIdentifier path = TestModel.TEST_PATH;
147             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
148
149             listener.waitForChangeEvents(path);
150
151             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
152             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
153         }};
154     }
155
156     @SuppressWarnings("serial")
157     @Test
158     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
159         // This test tests the timing window in which a change listener is registered before the
160         // shard becomes the leader. We verify that the listener is registered and notified of the
161         // existing data when the shard becomes the leader.
162         new ShardTestKit(getSystem()) {{
163             // For this test, we want to send the RegisterChangeListener message after the shard
164             // has recovered from persistence and before it becomes the leader. So we subclass
165             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
166             // we know that the shard has been initialized to a follower and has started the
167             // election process. The following 2 CountDownLatches are used to coordinate the
168             // ElectionTimeout with the sending of the RegisterChangeListener message.
169             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
170             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
171             Creator<Shard> creator = new Creator<Shard>() {
172                 boolean firstElectionTimeout = true;
173
174                 @Override
175                 public Shard create() throws Exception {
176                     // Use a non persistent provider because this test actually invokes persist on the journal
177                     // this will cause all other messages to not be queued properly after that.
178                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
179                     // it does do a persist)
180                     return new Shard(shardID, Collections.<String,String>emptyMap(),
181                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
182                         @Override
183                         public void onReceiveCommand(final Object message) throws Exception {
184                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
185                                 // Got the first ElectionTimeout. We don't forward it to the
186                                 // base Shard yet until we've sent the RegisterChangeListener
187                                 // message. So we signal the onFirstElectionTimeout latch to tell
188                                 // the main thread to send the RegisterChangeListener message and
189                                 // start a thread to wait on the onChangeListenerRegistered latch,
190                                 // which the main thread signals after it has sent the message.
191                                 // After the onChangeListenerRegistered is triggered, we send the
192                                 // original ElectionTimeout message to proceed with the election.
193                                 firstElectionTimeout = false;
194                                 final ActorRef self = getSelf();
195                                 new Thread() {
196                                     @Override
197                                     public void run() {
198                                         Uninterruptibles.awaitUninterruptibly(
199                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
200                                         self.tell(message, self);
201                                     }
202                                 }.start();
203
204                                 onFirstElectionTimeout.countDown();
205                             } else {
206                                 super.onReceiveCommand(message);
207                             }
208                         }
209                     };
210                 }
211             };
212
213             MockDataChangeListener listener = new MockDataChangeListener(1);
214             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
215                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
216
217             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
218                     Props.create(new DelegatingShardCreator(creator)),
219                     "testRegisterChangeListenerWhenNotLeaderInitially");
220
221             // Write initial data into the in-memory store.
222             YangInstanceIdentifier path = TestModel.TEST_PATH;
223             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
224
225             // Wait until the shard receives the first ElectionTimeout message.
226             assertEquals("Got first ElectionTimeout", true,
227                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
228
229             // Now send the RegisterChangeListener and wait for the reply.
230             shard.tell(new RegisterChangeListener(path, dclActor,
231                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
232
233             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
234                     RegisterChangeListenerReply.class);
235             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
236
237             // Sanity check - verify the shard is not the leader yet.
238             shard.tell(new FindLeader(), getRef());
239             FindLeaderReply findLeadeReply =
240                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
241             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
242
243             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
244             // with the election process.
245             onChangeListenerRegistered.countDown();
246
247             // Wait for the shard to become the leader and notify our listener with the existing
248             // data in the store.
249             listener.waitForChangeEvents(path);
250
251             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
252             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
253         }};
254     }
255
256     @Test
257     public void testRegisterDataTreeChangeListener() throws Exception {
258         new ShardTestKit(getSystem()) {{
259             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
260                     newShardProps(), "testRegisterDataTreeChangeListener");
261
262             waitUntilLeader(shard);
263
264             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
265
266             MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
267             ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
268                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
269
270             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
271
272             RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
273                     RegisterDataTreeChangeListenerReply.class);
274             String replyPath = reply.getListenerRegistrationPath().toString();
275             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
276                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
277
278             YangInstanceIdentifier path = TestModel.TEST_PATH;
279             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
280
281             listener.waitForChangeEvents();
282
283             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
284             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
285         }};
286     }
287
288     @SuppressWarnings("serial")
289     @Test
290     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
291         new ShardTestKit(getSystem()) {{
292             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
293             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
294             Creator<Shard> creator = new Creator<Shard>() {
295                 boolean firstElectionTimeout = true;
296
297                 @Override
298                 public Shard create() throws Exception {
299                     return new Shard(shardID, Collections.<String,String>emptyMap(),
300                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
301                         @Override
302                         public void onReceiveCommand(final Object message) throws Exception {
303                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
304                                 firstElectionTimeout = false;
305                                 final ActorRef self = getSelf();
306                                 new Thread() {
307                                     @Override
308                                     public void run() {
309                                         Uninterruptibles.awaitUninterruptibly(
310                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
311                                         self.tell(message, self);
312                                     }
313                                 }.start();
314
315                                 onFirstElectionTimeout.countDown();
316                             } else {
317                                 super.onReceiveCommand(message);
318                             }
319                         }
320                     };
321                 }
322             };
323
324             MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
325             ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
326                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
327
328             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
329                     Props.create(new DelegatingShardCreator(creator)),
330                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
331
332             YangInstanceIdentifier path = TestModel.TEST_PATH;
333             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
334
335             assertEquals("Got first ElectionTimeout", true,
336                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
337
338             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
339             RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
340                     RegisterDataTreeChangeListenerReply.class);
341             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
342
343             shard.tell(new FindLeader(), getRef());
344             FindLeaderReply findLeadeReply =
345                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
346             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
347
348             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
349
350             onChangeListenerRegistered.countDown();
351
352             // TODO: investigate why we do not receive data chage events
353             listener.waitForChangeEvents();
354
355             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
356             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
357         }};
358     }
359
360     @Test
361     public void testCreateTransaction(){
362         new ShardTestKit(getSystem()) {{
363             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
364
365             waitUntilLeader(shard);
366
367             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
368
369             shard.tell(new CreateTransaction("txn-1",
370                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
371
372             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
373                     CreateTransactionReply.class);
374
375             String path = reply.getTransactionActorPath().toString();
376             assertTrue("Unexpected transaction path " + path,
377                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
378
379             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
380         }};
381     }
382
383     @Test
384     public void testCreateTransactionOnChain(){
385         new ShardTestKit(getSystem()) {{
386             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
387
388             waitUntilLeader(shard);
389
390             shard.tell(new CreateTransaction("txn-1",
391                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
392                     getRef());
393
394             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
395                     CreateTransactionReply.class);
396
397             String path = reply.getTransactionActorPath().toString();
398             assertTrue("Unexpected transaction path " + path,
399                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
400
401             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
402         }};
403     }
404
405     @SuppressWarnings("serial")
406     @Test
407     public void testPeerAddressResolved() throws Exception {
408         new ShardTestKit(getSystem()) {{
409             final CountDownLatch recoveryComplete = new CountDownLatch(1);
410             class TestShard extends Shard {
411                 TestShard() {
412                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
413                             newDatastoreContext(), SCHEMA_CONTEXT);
414                 }
415
416                 Map<String, String> getPeerAddresses() {
417                     return getRaftActorContext().getPeerAddresses();
418                 }
419
420                 @Override
421                 protected void onRecoveryComplete() {
422                     try {
423                         super.onRecoveryComplete();
424                     } finally {
425                         recoveryComplete.countDown();
426                     }
427                 }
428             }
429
430             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
431                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
432                         @Override
433                         public TestShard create() throws Exception {
434                             return new TestShard();
435                         }
436                     })), "testPeerAddressResolved");
437
438             //waitUntilLeader(shard);
439             assertEquals("Recovery complete", true,
440                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
441
442             String address = "akka://foobar";
443             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
444
445             assertEquals("getPeerAddresses", address,
446                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
447
448             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
449         }};
450     }
451
452     @Test
453     public void testApplySnapshot() throws Exception {
454         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
455                 "testApplySnapshot");
456
457         DataTree store = InMemoryDataTreeFactory.getInstance().create();
458         store.setSchemaContext(SCHEMA_CONTEXT);
459
460         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
461
462         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
463         NormalizedNode<?,?> expected = readStore(store, root);
464
465         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
466                 SerializationUtils.serializeNormalizedNode(expected),
467                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
468
469         shard.underlyingActor().onReceiveCommand(applySnapshot);
470
471         NormalizedNode<?,?> actual = readStore(shard, root);
472
473         assertEquals("Root node", expected, actual);
474
475         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
476     }
477
478     @Test
479     public void testApplyState() throws Exception {
480
481         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
482
483         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
484
485         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
486                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
487
488         shard.underlyingActor().onReceiveCommand(applyState);
489
490         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
491         assertEquals("Applied state", node, actual);
492
493         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
494     }
495
496     @Test
497     public void testApplyStateWithCandidatePayload() throws Exception {
498
499         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
500
501         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
502         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
503
504         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
505                 DataTreeCandidatePayload.create(candidate)));
506
507         shard.underlyingActor().onReceiveCommand(applyState);
508
509         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
510         assertEquals("Applied state", node, actual);
511
512         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
513     }
514
515     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
516         DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
517         testStore.setSchemaContext(SCHEMA_CONTEXT);
518
519         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
520
521         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
522
523         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
524                 SerializationUtils.serializeNormalizedNode(root),
525                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
526         return testStore;
527     }
528
529     private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
530         source.validate(mod);
531         final DataTreeCandidate candidate = source.prepare(mod);
532         source.commit(candidate);
533         return DataTreeCandidatePayload.create(candidate);
534     }
535
536     @Test
537     public void testDataTreeCandidateRecovery() throws Exception {
538         // Set up the InMemorySnapshotStore.
539         final DataTree source = setupInMemorySnapshotStore();
540
541         final DataTreeModification writeMod = source.takeSnapshot().newModification();
542         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
543
544         // Set up the InMemoryJournal.
545         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
546
547         int nListEntries = 16;
548         Set<Integer> listEntryKeys = new HashSet<>();
549
550         // Add some ModificationPayload entries
551         for (int i = 1; i <= nListEntries; i++) {
552             listEntryKeys.add(Integer.valueOf(i));
553
554             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
555                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
556
557             final DataTreeModification mod = source.takeSnapshot().newModification();
558             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
559
560             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
561                 payloadForModification(source, mod)));
562         }
563
564         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
565                 new ApplyJournalEntries(nListEntries));
566
567         testRecovery(listEntryKeys);
568     }
569
570     @Test
571     public void testModicationRecovery() throws Exception {
572
573         // Set up the InMemorySnapshotStore.
574         setupInMemorySnapshotStore();
575
576         // Set up the InMemoryJournal.
577
578         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
579                   new WriteModification(TestModel.OUTER_LIST_PATH,
580                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
581
582         int nListEntries = 16;
583         Set<Integer> listEntryKeys = new HashSet<>();
584
585         // Add some ModificationPayload entries
586         for(int i = 1; i <= nListEntries; i++) {
587             listEntryKeys.add(Integer.valueOf(i));
588             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
589                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
590             Modification mod = new MergeModification(path,
591                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
592             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
593                     newModificationPayload(mod)));
594         }
595
596         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
597                 new ApplyJournalEntries(nListEntries));
598
599         testRecovery(listEntryKeys);
600     }
601
602     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
603         MutableCompositeModification compMod = new MutableCompositeModification();
604         for(Modification mod: mods) {
605             compMod.addModification(mod);
606         }
607
608         return new ModificationPayload(compMod);
609     }
610
611     @Test
612     public void testConcurrentThreePhaseCommits() throws Throwable {
613         new ShardTestKit(getSystem()) {{
614             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
615                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
616                     "testConcurrentThreePhaseCommits");
617
618             waitUntilLeader(shard);
619
620          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
621
622             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
623
624             String transactionID1 = "tx1";
625             MutableCompositeModification modification1 = new MutableCompositeModification();
626             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
627                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
628
629             String transactionID2 = "tx2";
630             MutableCompositeModification modification2 = new MutableCompositeModification();
631             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
632                     TestModel.OUTER_LIST_PATH,
633                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
634                     modification2);
635
636             String transactionID3 = "tx3";
637             MutableCompositeModification modification3 = new MutableCompositeModification();
638             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
639                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
640                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
641                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
642                     modification3);
643
644             long timeoutSec = 5;
645             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
646             final Timeout timeout = new Timeout(duration);
647
648             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
649             // by the ShardTransaction.
650
651             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
652                     cohort1, modification1, true, false), getRef());
653             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
654                     expectMsgClass(duration, ReadyTransactionReply.class));
655             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
656
657             // Send the CanCommitTransaction message for the first Tx.
658
659             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
660             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
661                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
662             assertEquals("Can commit", true, canCommitReply.getCanCommit());
663
664             // Send the ForwardedReadyTransaction for the next 2 Tx's.
665
666             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
667                     cohort2, modification2, true, false), getRef());
668             expectMsgClass(duration, ReadyTransactionReply.class);
669
670             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
671                     cohort3, modification3, true, false), getRef());
672             expectMsgClass(duration, ReadyTransactionReply.class);
673
674             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
675             // processed after the first Tx completes.
676
677             Future<Object> canCommitFuture1 = Patterns.ask(shard,
678                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
679
680             Future<Object> canCommitFuture2 = Patterns.ask(shard,
681                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
682
683             // Send the CommitTransaction message for the first Tx. After it completes, it should
684             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
685
686             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
687             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
688
689             // Wait for the next 2 Tx's to complete.
690
691             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
692             final CountDownLatch commitLatch = new CountDownLatch(2);
693
694             class OnFutureComplete extends OnComplete<Object> {
695                 private final Class<?> expRespType;
696
697                 OnFutureComplete(final Class<?> expRespType) {
698                     this.expRespType = expRespType;
699                 }
700
701                 @Override
702                 public void onComplete(final Throwable error, final Object resp) {
703                     if(error != null) {
704                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
705                     } else {
706                         try {
707                             assertEquals("Commit response type", expRespType, resp.getClass());
708                             onSuccess(resp);
709                         } catch (Exception e) {
710                             caughtEx.set(e);
711                         }
712                     }
713                 }
714
715                 void onSuccess(final Object resp) throws Exception {
716                 }
717             }
718
719             class OnCommitFutureComplete extends OnFutureComplete {
720                 OnCommitFutureComplete() {
721                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
722                 }
723
724                 @Override
725                 public void onComplete(final Throwable error, final Object resp) {
726                     super.onComplete(error, resp);
727                     commitLatch.countDown();
728                 }
729             }
730
731             class OnCanCommitFutureComplete extends OnFutureComplete {
732                 private final String transactionID;
733
734                 OnCanCommitFutureComplete(final String transactionID) {
735                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
736                     this.transactionID = transactionID;
737                 }
738
739                 @Override
740                 void onSuccess(final Object resp) throws Exception {
741                     CanCommitTransactionReply canCommitReply =
742                             CanCommitTransactionReply.fromSerializable(resp);
743                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
744
745                     Future<Object> commitFuture = Patterns.ask(shard,
746                             new CommitTransaction(transactionID).toSerializable(), timeout);
747                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
748                 }
749             }
750
751             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
752                     getSystem().dispatcher());
753
754             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
755                     getSystem().dispatcher());
756
757             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
758
759             if(caughtEx.get() != null) {
760                 throw caughtEx.get();
761             }
762
763             assertEquals("Commits complete", true, done);
764
765             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
766             inOrder.verify(cohort1).canCommit();
767             inOrder.verify(cohort1).preCommit();
768             inOrder.verify(cohort1).commit();
769             inOrder.verify(cohort2).canCommit();
770             inOrder.verify(cohort2).preCommit();
771             inOrder.verify(cohort2).commit();
772             inOrder.verify(cohort3).canCommit();
773             inOrder.verify(cohort3).preCommit();
774             inOrder.verify(cohort3).commit();
775
776             // Verify data in the data store.
777
778             verifyOuterListEntry(shard, 1);
779
780             verifyLastApplied(shard, 2);
781
782             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
783         }};
784     }
785
786     private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
787             NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
788         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
789     }
790
791     private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
792             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
793         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
794         batched.addModification(new WriteModification(path, data));
795         batched.setReady(ready);
796         batched.setDoCommitOnReady(doCommitOnReady);
797         return batched;
798     }
799
800     @Test
801     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
802         new ShardTestKit(getSystem()) {{
803             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
804                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
805                     "testBatchedModificationsWithNoCommitOnReady");
806
807             waitUntilLeader(shard);
808
809             final String transactionID = "tx";
810             FiniteDuration duration = duration("5 seconds");
811
812             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
813             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
814                 @Override
815                 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
816                     if(mockCohort.get() == null) {
817                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
818                     }
819
820                     return mockCohort.get();
821                 }
822             };
823
824             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
825
826             // Send a BatchedModifications to start a transaction.
827
828             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
829                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
830             expectMsgClass(duration, BatchedModificationsReply.class);
831
832             // Send a couple more BatchedModifications.
833
834             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
835                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
836             expectMsgClass(duration, BatchedModificationsReply.class);
837
838             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
839                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
840                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
841             expectMsgClass(duration, ReadyTransactionReply.class);
842
843             // Send the CanCommitTransaction message.
844
845             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
846             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
847                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
848             assertEquals("Can commit", true, canCommitReply.getCanCommit());
849
850             // Send the CanCommitTransaction message.
851
852             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
853             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
854
855             InOrder inOrder = inOrder(mockCohort.get());
856             inOrder.verify(mockCohort.get()).canCommit();
857             inOrder.verify(mockCohort.get()).preCommit();
858             inOrder.verify(mockCohort.get()).commit();
859
860             // Verify data in the data store.
861
862             verifyOuterListEntry(shard, 1);
863
864             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
865         }};
866     }
867
868     @Test
869     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
870         new ShardTestKit(getSystem()) {{
871             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
872                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
873                     "testBatchedModificationsWithCommitOnReady");
874
875             waitUntilLeader(shard);
876
877             final String transactionID = "tx";
878             FiniteDuration duration = duration("5 seconds");
879
880             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
881             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
882                 @Override
883                 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
884                     if(mockCohort.get() == null) {
885                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
886                     }
887
888                     return mockCohort.get();
889                 }
890             };
891
892             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
893
894             // Send a BatchedModifications to start a transaction.
895
896             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
897                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
898             expectMsgClass(duration, BatchedModificationsReply.class);
899
900             // Send a couple more BatchedModifications.
901
902             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
903                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
904             expectMsgClass(duration, BatchedModificationsReply.class);
905
906             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
907                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
908                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
909
910             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
911
912             InOrder inOrder = inOrder(mockCohort.get());
913             inOrder.verify(mockCohort.get()).canCommit();
914             inOrder.verify(mockCohort.get()).preCommit();
915             inOrder.verify(mockCohort.get()).commit();
916
917             // Verify data in the data store.
918
919             verifyOuterListEntry(shard, 1);
920
921             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
922         }};
923     }
924
925     @SuppressWarnings("unchecked")
926     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
927         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
928         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
929         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
930                 outerList.getValue() instanceof Iterable);
931         Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
932         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
933                 entry instanceof MapEntryNode);
934         MapEntryNode mapEntry = (MapEntryNode)entry;
935         Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
936                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
937         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
938         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
939     }
940
941     @Test
942     public void testBatchedModificationsOnTransactionChain() throws Throwable {
943         new ShardTestKit(getSystem()) {{
944             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
945                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
946                     "testBatchedModificationsOnTransactionChain");
947
948             waitUntilLeader(shard);
949
950             String transactionChainID = "txChain";
951             String transactionID1 = "tx1";
952             String transactionID2 = "tx2";
953
954             FiniteDuration duration = duration("5 seconds");
955
956             // Send a BatchedModifications to start a chained write transaction and ready it.
957
958             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
959             YangInstanceIdentifier path = TestModel.TEST_PATH;
960             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
961                     containerNode, true, false), getRef());
962             expectMsgClass(duration, ReadyTransactionReply.class);
963
964             // Create a read Tx on the same chain.
965
966             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
967                     transactionChainID).toSerializable(), getRef());
968
969             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
970
971             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
972             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
973             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
974
975             // Commit the write transaction.
976
977             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
978             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
979                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
980             assertEquals("Can commit", true, canCommitReply.getCanCommit());
981
982             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
983             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
984
985             // Verify data in the data store.
986
987             NormalizedNode<?, ?> actualNode = readStore(shard, path);
988             assertEquals("Stored node", containerNode, actualNode);
989
990             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
991         }};
992     }
993
994     @Test
995     public void testOnBatchedModificationsWhenNotLeader() {
996         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
997         new ShardTestKit(getSystem()) {{
998             Creator<Shard> creator = new Creator<Shard>() {
999                 private static final long serialVersionUID = 1L;
1000
1001                 @Override
1002                 public Shard create() throws Exception {
1003                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1004                             newDatastoreContext(), SCHEMA_CONTEXT) {
1005                         @Override
1006                         protected boolean isLeader() {
1007                             return overrideLeaderCalls.get() ? false : super.isLeader();
1008                         }
1009
1010                         @Override
1011                         protected ActorSelection getLeader() {
1012                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1013                                 super.getLeader();
1014                         }
1015                     };
1016                 }
1017             };
1018
1019             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1020                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1021
1022             waitUntilLeader(shard);
1023
1024             overrideLeaderCalls.set(true);
1025
1026             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1027
1028             shard.tell(batched, ActorRef.noSender());
1029
1030             expectMsgEquals(batched);
1031
1032             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1033         }};
1034     }
1035
1036     @Test
1037     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1038         new ShardTestKit(getSystem()) {{
1039             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1040                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1041                     "testForwardedReadyTransactionWithImmediateCommit");
1042
1043             waitUntilLeader(shard);
1044
1045             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1046
1047             String transactionID = "tx1";
1048             MutableCompositeModification modification = new MutableCompositeModification();
1049             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1050             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1051                     TestModel.TEST_PATH, containerNode, modification);
1052
1053             FiniteDuration duration = duration("5 seconds");
1054
1055             // Simulate the ForwardedReadyTransaction messages that would be sent
1056             // by the ShardTransaction.
1057
1058             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1059                     cohort, modification, true, true), getRef());
1060
1061             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1062
1063             InOrder inOrder = inOrder(cohort);
1064             inOrder.verify(cohort).canCommit();
1065             inOrder.verify(cohort).preCommit();
1066             inOrder.verify(cohort).commit();
1067
1068             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1069             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1070
1071             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1072         }};
1073     }
1074
1075     @Test
1076     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1077         new ShardTestKit(getSystem()) {{
1078             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1079                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1080                     "testReadyLocalTransactionWithImmediateCommit");
1081
1082             waitUntilLeader(shard);
1083
1084             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1085
1086             DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1087
1088             ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1089             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1090             MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1091             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1092
1093             String txId = "tx1";
1094             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1095
1096             shard.tell(readyMessage, getRef());
1097
1098             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1099
1100             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1101             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1102
1103             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1104         }};
1105     }
1106
1107     @Test
1108     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1109         new ShardTestKit(getSystem()) {{
1110             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1111                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1112                     "testReadyLocalTransactionWithThreePhaseCommit");
1113
1114             waitUntilLeader(shard);
1115
1116             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1117
1118             DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1119
1120             ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1121             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1122             MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1123             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1124
1125             String txId = "tx1";
1126             ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1127
1128             shard.tell(readyMessage, getRef());
1129
1130             expectMsgClass(ReadyTransactionReply.class);
1131
1132             // Send the CanCommitTransaction message.
1133
1134             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1135             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1136                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1137             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1138
1139             // Send the CanCommitTransaction message.
1140
1141             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1142             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1143
1144             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1145             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1146
1147             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1148         }};
1149     }
1150
1151     @Test
1152     public void testCommitWithPersistenceDisabled() throws Throwable {
1153         dataStoreContextBuilder.persistent(false);
1154         new ShardTestKit(getSystem()) {{
1155             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1156                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1157                     "testCommitWithPersistenceDisabled");
1158
1159             waitUntilLeader(shard);
1160
1161             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1162
1163             // Setup a simulated transactions with a mock cohort.
1164
1165             String transactionID = "tx";
1166             MutableCompositeModification modification = new MutableCompositeModification();
1167             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1168             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1169                     TestModel.TEST_PATH, containerNode, modification);
1170
1171             FiniteDuration duration = duration("5 seconds");
1172
1173             // Simulate the ForwardedReadyTransaction messages that would be sent
1174             // by the ShardTransaction.
1175
1176             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1177                     cohort, modification, true, false), getRef());
1178             expectMsgClass(duration, ReadyTransactionReply.class);
1179
1180             // Send the CanCommitTransaction message.
1181
1182             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1183             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1184                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1185             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1186
1187             // Send the CanCommitTransaction message.
1188
1189             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1190             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1191
1192             InOrder inOrder = inOrder(cohort);
1193             inOrder.verify(cohort).canCommit();
1194             inOrder.verify(cohort).preCommit();
1195             inOrder.verify(cohort).commit();
1196
1197             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1198             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1199
1200             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1201         }};
1202     }
1203
1204     private static DataTreeCandidateTip mockCandidate(final String name) {
1205         DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1206         DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1207         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1208         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1209         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1210         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1211         return mockCandidate;
1212     }
1213
1214     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1215         DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1216         DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1217         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1218         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1219         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1220         return mockCandidate;
1221     }
1222
1223     @Test
1224     public void testCommitWhenTransactionHasNoModifications(){
1225         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1226         // but here that need not happen
1227         new ShardTestKit(getSystem()) {
1228             {
1229                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1230                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1231                         "testCommitWhenTransactionHasNoModifications");
1232
1233                 waitUntilLeader(shard);
1234
1235                 String transactionID = "tx1";
1236                 MutableCompositeModification modification = new MutableCompositeModification();
1237                 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1238                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1239                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1240                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1241                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1242
1243                 FiniteDuration duration = duration("5 seconds");
1244
1245                 // Simulate the ForwardedReadyTransaction messages that would be sent
1246                 // by the ShardTransaction.
1247
1248                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1249                         cohort, modification, true, false), getRef());
1250                 expectMsgClass(duration, ReadyTransactionReply.class);
1251
1252                 // Send the CanCommitTransaction message.
1253
1254                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1255                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1256                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1257                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1258
1259                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1260                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1261
1262                 InOrder inOrder = inOrder(cohort);
1263                 inOrder.verify(cohort).canCommit();
1264                 inOrder.verify(cohort).preCommit();
1265                 inOrder.verify(cohort).commit();
1266
1267                 // Use MBean for verification
1268                 // Committed transaction count should increase as usual
1269                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
1270
1271                 // Commit index should not advance because this does not go into the journal
1272                 assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
1273
1274                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1275
1276             }
1277         };
1278     }
1279
1280     @Test
1281     public void testCommitWhenTransactionHasModifications(){
1282         new ShardTestKit(getSystem()) {
1283             {
1284                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1285                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1286                         "testCommitWhenTransactionHasModifications");
1287
1288                 waitUntilLeader(shard);
1289
1290                 String transactionID = "tx1";
1291                 MutableCompositeModification modification = new MutableCompositeModification();
1292                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1293                 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1294                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1295                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1296                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1297                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1298
1299                 FiniteDuration duration = duration("5 seconds");
1300
1301                 // Simulate the ForwardedReadyTransaction messages that would be sent
1302                 // by the ShardTransaction.
1303
1304                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1305                         cohort, modification, true, false), getRef());
1306                 expectMsgClass(duration, ReadyTransactionReply.class);
1307
1308                 // Send the CanCommitTransaction message.
1309
1310                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1311                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1312                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1313                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1314
1315                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1316                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1317
1318                 InOrder inOrder = inOrder(cohort);
1319                 inOrder.verify(cohort).canCommit();
1320                 inOrder.verify(cohort).preCommit();
1321                 inOrder.verify(cohort).commit();
1322
1323                 // Use MBean for verification
1324                 // Committed transaction count should increase as usual
1325                 assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
1326
1327                 // Commit index should advance as we do not have an empty modification
1328                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
1329
1330                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1331
1332             }
1333         };
1334     }
1335
1336     @Test
1337     public void testCommitPhaseFailure() throws Throwable {
1338         new ShardTestKit(getSystem()) {{
1339             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1340                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1341                     "testCommitPhaseFailure");
1342
1343             waitUntilLeader(shard);
1344
1345             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1346             // commit phase.
1347
1348             String transactionID1 = "tx1";
1349             MutableCompositeModification modification1 = new MutableCompositeModification();
1350             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1351             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1352             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1353             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1354             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1355
1356             String transactionID2 = "tx2";
1357             MutableCompositeModification modification2 = new MutableCompositeModification();
1358             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1359             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1360
1361             FiniteDuration duration = duration("5 seconds");
1362             final Timeout timeout = new Timeout(duration);
1363
1364             // Simulate the ForwardedReadyTransaction messages that would be sent
1365             // by the ShardTransaction.
1366
1367             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1368                     cohort1, modification1, true, false), getRef());
1369             expectMsgClass(duration, ReadyTransactionReply.class);
1370
1371             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1372                     cohort2, modification2, true, false), getRef());
1373             expectMsgClass(duration, ReadyTransactionReply.class);
1374
1375             // Send the CanCommitTransaction message for the first Tx.
1376
1377             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1378             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1379                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1380             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1381
1382             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1383             // processed after the first Tx completes.
1384
1385             Future<Object> canCommitFuture = Patterns.ask(shard,
1386                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1387
1388             // Send the CommitTransaction message for the first Tx. This should send back an error
1389             // and trigger the 2nd Tx to proceed.
1390
1391             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1392             expectMsgClass(duration, akka.actor.Status.Failure.class);
1393
1394             // Wait for the 2nd Tx to complete the canCommit phase.
1395
1396             final CountDownLatch latch = new CountDownLatch(1);
1397             canCommitFuture.onComplete(new OnComplete<Object>() {
1398                 @Override
1399                 public void onComplete(final Throwable t, final Object resp) {
1400                     latch.countDown();
1401                 }
1402             }, getSystem().dispatcher());
1403
1404             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1405
1406             InOrder inOrder = inOrder(cohort1, cohort2);
1407             inOrder.verify(cohort1).canCommit();
1408             inOrder.verify(cohort1).preCommit();
1409             inOrder.verify(cohort1).commit();
1410             inOrder.verify(cohort2).canCommit();
1411
1412             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1413         }};
1414     }
1415
1416     @Test
1417     public void testPreCommitPhaseFailure() throws Throwable {
1418         new ShardTestKit(getSystem()) {{
1419             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1420                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1421                     "testPreCommitPhaseFailure");
1422
1423             waitUntilLeader(shard);
1424
1425             String transactionID1 = "tx1";
1426             MutableCompositeModification modification1 = new MutableCompositeModification();
1427             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1428             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1429             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1430
1431             String transactionID2 = "tx2";
1432             MutableCompositeModification modification2 = new MutableCompositeModification();
1433             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1434             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1435
1436             FiniteDuration duration = duration("5 seconds");
1437             final Timeout timeout = new Timeout(duration);
1438
1439             // Simulate the ForwardedReadyTransaction messages that would be sent
1440             // by the ShardTransaction.
1441
1442             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1443                     cohort1, modification1, true, false), getRef());
1444             expectMsgClass(duration, ReadyTransactionReply.class);
1445
1446             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1447                     cohort2, modification2, true, false), getRef());
1448             expectMsgClass(duration, ReadyTransactionReply.class);
1449
1450             // Send the CanCommitTransaction message for the first Tx.
1451
1452             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1453             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1454                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1455             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1456
1457             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1458             // processed after the first Tx completes.
1459
1460             Future<Object> canCommitFuture = Patterns.ask(shard,
1461                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1462
1463             // Send the CommitTransaction message for the first Tx. This should send back an error
1464             // and trigger the 2nd Tx to proceed.
1465
1466             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1467             expectMsgClass(duration, akka.actor.Status.Failure.class);
1468
1469             // Wait for the 2nd Tx to complete the canCommit phase.
1470
1471             final CountDownLatch latch = new CountDownLatch(1);
1472             canCommitFuture.onComplete(new OnComplete<Object>() {
1473                 @Override
1474                 public void onComplete(final Throwable t, final Object resp) {
1475                     latch.countDown();
1476                 }
1477             }, getSystem().dispatcher());
1478
1479             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1480
1481             InOrder inOrder = inOrder(cohort1, cohort2);
1482             inOrder.verify(cohort1).canCommit();
1483             inOrder.verify(cohort1).preCommit();
1484             inOrder.verify(cohort2).canCommit();
1485
1486             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1487         }};
1488     }
1489
1490     @Test
1491     public void testCanCommitPhaseFailure() throws Throwable {
1492         new ShardTestKit(getSystem()) {{
1493             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1494                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1495                     "testCanCommitPhaseFailure");
1496
1497             waitUntilLeader(shard);
1498
1499             final FiniteDuration duration = duration("5 seconds");
1500
1501             String transactionID1 = "tx1";
1502             MutableCompositeModification modification = new MutableCompositeModification();
1503             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1504             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1505
1506             // Simulate the ForwardedReadyTransaction messages that would be sent
1507             // by the ShardTransaction.
1508
1509             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1510                     cohort, modification, true, false), getRef());
1511             expectMsgClass(duration, ReadyTransactionReply.class);
1512
1513             // Send the CanCommitTransaction message.
1514
1515             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1516             expectMsgClass(duration, akka.actor.Status.Failure.class);
1517
1518             // Send another can commit to ensure the failed one got cleaned up.
1519
1520             reset(cohort);
1521
1522             String transactionID2 = "tx2";
1523             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1524
1525             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1526                     cohort, modification, true, false), getRef());
1527             expectMsgClass(duration, ReadyTransactionReply.class);
1528
1529             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1530             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1531                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1532             assertEquals("getCanCommit", true, reply.getCanCommit());
1533
1534             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1535         }};
1536     }
1537
1538     @Test
1539     public void testCanCommitPhaseFalseResponse() throws Throwable {
1540         new ShardTestKit(getSystem()) {{
1541             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1542                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1543                     "testCanCommitPhaseFalseResponse");
1544
1545             waitUntilLeader(shard);
1546
1547             final FiniteDuration duration = duration("5 seconds");
1548
1549             String transactionID1 = "tx1";
1550             MutableCompositeModification modification = new MutableCompositeModification();
1551             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1552             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1553
1554             // Simulate the ForwardedReadyTransaction messages that would be sent
1555             // by the ShardTransaction.
1556
1557             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1558                     cohort, modification, true, false), getRef());
1559             expectMsgClass(duration, ReadyTransactionReply.class);
1560
1561             // Send the CanCommitTransaction message.
1562
1563             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1564             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1565                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1566             assertEquals("getCanCommit", false, reply.getCanCommit());
1567
1568             // Send another can commit to ensure the failed one got cleaned up.
1569
1570             reset(cohort);
1571
1572             String transactionID2 = "tx2";
1573             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1574
1575             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1576                     cohort, modification, true, false), getRef());
1577             expectMsgClass(duration, ReadyTransactionReply.class);
1578
1579             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1580             reply = CanCommitTransactionReply.fromSerializable(
1581                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1582             assertEquals("getCanCommit", true, reply.getCanCommit());
1583
1584             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1585         }};
1586     }
1587
1588     @Test
1589     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1590         new ShardTestKit(getSystem()) {{
1591             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1592                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1593                     "testImmediateCommitWithCanCommitPhaseFailure");
1594
1595             waitUntilLeader(shard);
1596
1597             final FiniteDuration duration = duration("5 seconds");
1598
1599             String transactionID1 = "tx1";
1600             MutableCompositeModification modification = new MutableCompositeModification();
1601             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1602             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1603
1604             // Simulate the ForwardedReadyTransaction messages that would be sent
1605             // by the ShardTransaction.
1606
1607             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1608                     cohort, modification, true, true), getRef());
1609
1610             expectMsgClass(duration, akka.actor.Status.Failure.class);
1611
1612             // Send another can commit to ensure the failed one got cleaned up.
1613
1614             reset(cohort);
1615
1616             String transactionID2 = "tx2";
1617             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1618             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1619             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1620             DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1621             DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1622             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1623             doReturn(candidateRoot).when(candidate).getRootNode();
1624             doReturn(candidate).when(cohort).getCandidate();
1625
1626             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1627                     cohort, modification, true, true), getRef());
1628
1629             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1630
1631             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1632         }};
1633     }
1634
1635     @Test
1636     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1637         new ShardTestKit(getSystem()) {{
1638             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1639                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1640                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1641
1642             waitUntilLeader(shard);
1643
1644             final FiniteDuration duration = duration("5 seconds");
1645
1646             String transactionID = "tx1";
1647             MutableCompositeModification modification = new MutableCompositeModification();
1648             ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1649             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1650
1651             // Simulate the ForwardedReadyTransaction messages that would be sent
1652             // by the ShardTransaction.
1653
1654             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1655                     cohort, modification, true, true), getRef());
1656
1657             expectMsgClass(duration, akka.actor.Status.Failure.class);
1658
1659             // Send another can commit to ensure the failed one got cleaned up.
1660
1661             reset(cohort);
1662
1663             String transactionID2 = "tx2";
1664             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1665             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1666             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1667             DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1668             DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1669             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1670             doReturn(candidateRoot).when(candidate).getRootNode();
1671             doReturn(candidate).when(cohort).getCandidate();
1672
1673             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1674                     cohort, modification, true, true), getRef());
1675
1676             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1677
1678             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1679         }};
1680     }
1681
1682     @Test
1683     public void testAbortBeforeFinishCommit() throws Throwable {
1684         new ShardTestKit(getSystem()) {{
1685             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1686                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1687                     "testAbortBeforeFinishCommit");
1688
1689             waitUntilLeader(shard);
1690
1691             final FiniteDuration duration = duration("5 seconds");
1692             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1693
1694             final String transactionID = "tx1";
1695             Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1696                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1697                 @Override
1698                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1699                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1700
1701                     // Simulate an AbortTransaction message occurring during replication, after
1702                     // persisting and before finishing the commit to the in-memory store.
1703                     // We have no followers so due to optimizations in the RaftActor, it does not
1704                     // attempt replication and thus we can't send an AbortTransaction message b/c
1705                     // it would be processed too late after CommitTransaction completes. So we'll
1706                     // simulate an AbortTransaction message occurring during replication by calling
1707                     // the shard directly.
1708                     //
1709                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1710
1711                     return preCommitFuture;
1712                 }
1713             };
1714
1715             MutableCompositeModification modification = new MutableCompositeModification();
1716             ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1717                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1718                     modification, preCommit);
1719
1720             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1721                     cohort, modification, true, false), getRef());
1722             expectMsgClass(duration, ReadyTransactionReply.class);
1723
1724             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1725             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1726                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1727             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1728
1729             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1730             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1731
1732             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1733
1734             // Since we're simulating an abort occurring during replication and before finish commit,
1735             // the data should still get written to the in-memory store since we've gotten past
1736             // canCommit and preCommit and persisted the data.
1737             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1738
1739             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1740         }};
1741     }
1742
1743     @Test
1744     public void testTransactionCommitTimeout() throws Throwable {
1745         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1746
1747         new ShardTestKit(getSystem()) {{
1748             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1749                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1750                     "testTransactionCommitTimeout");
1751
1752             waitUntilLeader(shard);
1753
1754             final FiniteDuration duration = duration("5 seconds");
1755
1756             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1757
1758             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1759             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1760                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1761
1762             // Create 1st Tx - will timeout
1763
1764             String transactionID1 = "tx1";
1765             MutableCompositeModification modification1 = new MutableCompositeModification();
1766             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1767                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1768                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1769                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1770                     modification1);
1771
1772             // Create 2nd Tx
1773
1774             String transactionID2 = "tx3";
1775             MutableCompositeModification modification2 = new MutableCompositeModification();
1776             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1777                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1778             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1779                     listNodePath,
1780                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1781                     modification2);
1782
1783             // Ready the Tx's
1784
1785             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1786                     cohort1, modification1, true, false), getRef());
1787             expectMsgClass(duration, ReadyTransactionReply.class);
1788
1789             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1790                     cohort2, modification2, true, false), getRef());
1791             expectMsgClass(duration, ReadyTransactionReply.class);
1792
1793             // canCommit 1st Tx. We don't send the commit so it should timeout.
1794
1795             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1796             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1797
1798             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1799
1800             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1801             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1802
1803             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1804
1805             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1806             expectMsgClass(duration, akka.actor.Status.Failure.class);
1807
1808             // Commit the 2nd Tx.
1809
1810             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1811             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1812
1813             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1814             assertNotNull(listNodePath + " not found", node);
1815
1816             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1817         }};
1818     }
1819
1820     @Test
1821     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1822         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1823
1824         new ShardTestKit(getSystem()) {{
1825             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1826                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1827                     "testTransactionCommitQueueCapacityExceeded");
1828
1829             waitUntilLeader(shard);
1830
1831             final FiniteDuration duration = duration("5 seconds");
1832
1833             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1834
1835             String transactionID1 = "tx1";
1836             MutableCompositeModification modification1 = new MutableCompositeModification();
1837             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1838                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1839
1840             String transactionID2 = "tx2";
1841             MutableCompositeModification modification2 = new MutableCompositeModification();
1842             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1843                     TestModel.OUTER_LIST_PATH,
1844                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1845                     modification2);
1846
1847             String transactionID3 = "tx3";
1848             MutableCompositeModification modification3 = new MutableCompositeModification();
1849             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1850                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1851
1852             // Ready the Tx's
1853
1854             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1855                     cohort1, modification1, true, false), getRef());
1856             expectMsgClass(duration, ReadyTransactionReply.class);
1857
1858             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1859                     cohort2, modification2, true, false), getRef());
1860             expectMsgClass(duration, ReadyTransactionReply.class);
1861
1862             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1863                     cohort3, modification3, true, false), getRef());
1864             expectMsgClass(duration, ReadyTransactionReply.class);
1865
1866             // canCommit 1st Tx.
1867
1868             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1869             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1870
1871             // canCommit the 2nd Tx - it should get queued.
1872
1873             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1874
1875             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1876
1877             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1878             expectMsgClass(duration, akka.actor.Status.Failure.class);
1879
1880             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1881         }};
1882     }
1883
1884     @Test
1885     public void testCanCommitBeforeReadyFailure() throws Throwable {
1886         new ShardTestKit(getSystem()) {{
1887             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1888                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1889                     "testCanCommitBeforeReadyFailure");
1890
1891             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1892             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1893
1894             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1895         }};
1896     }
1897
1898     @Test
1899     public void testAbortTransaction() throws Throwable {
1900         new ShardTestKit(getSystem()) {{
1901             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1902                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1903                     "testAbortTransaction");
1904
1905             waitUntilLeader(shard);
1906
1907             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1908
1909             String transactionID1 = "tx1";
1910             MutableCompositeModification modification1 = new MutableCompositeModification();
1911             ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1912             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1913             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1914
1915             String transactionID2 = "tx2";
1916             MutableCompositeModification modification2 = new MutableCompositeModification();
1917             ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1918             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1919
1920             FiniteDuration duration = duration("5 seconds");
1921             final Timeout timeout = new Timeout(duration);
1922
1923             // Simulate the ForwardedReadyTransaction messages that would be sent
1924             // by the ShardTransaction.
1925
1926             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1927                     cohort1, modification1, true, false), getRef());
1928             expectMsgClass(duration, ReadyTransactionReply.class);
1929
1930             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1931                     cohort2, modification2, true, false), getRef());
1932             expectMsgClass(duration, ReadyTransactionReply.class);
1933
1934             // Send the CanCommitTransaction message for the first Tx.
1935
1936             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1937             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1938                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1939             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1940
1941             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1942             // processed after the first Tx completes.
1943
1944             Future<Object> canCommitFuture = Patterns.ask(shard,
1945                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1946
1947             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1948             // Tx to proceed.
1949
1950             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1951             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1952
1953             // Wait for the 2nd Tx to complete the canCommit phase.
1954
1955             Await.ready(canCommitFuture, duration);
1956
1957             InOrder inOrder = inOrder(cohort1, cohort2);
1958             inOrder.verify(cohort1).canCommit();
1959             inOrder.verify(cohort2).canCommit();
1960
1961             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1962         }};
1963     }
1964
1965     @Test
1966     public void testCreateSnapshot() throws Exception {
1967         testCreateSnapshot(true, "testCreateSnapshot");
1968     }
1969
1970     @Test
1971     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1972         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1973     }
1974
1975     @SuppressWarnings("serial")
1976     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1977
1978         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1979
1980         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1981         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1982             TestPersistentDataProvider(DataPersistenceProvider delegate) {
1983                 super(delegate);
1984             }
1985
1986             @Override
1987             public void saveSnapshot(Object o) {
1988                 savedSnapshot.set(o);
1989                 super.saveSnapshot(o);
1990             }
1991         }
1992
1993         dataStoreContextBuilder.persistent(persistent);
1994
1995         new ShardTestKit(getSystem()) {{
1996             class TestShard extends Shard {
1997
1998                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
1999                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
2000                     super(name, peerAddresses, datastoreContext, schemaContext);
2001                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2002                 }
2003
2004                 @Override
2005                 public void handleCommand(Object message) {
2006                     super.handleCommand(message);
2007
2008                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2009                         latch.get().countDown();
2010                     }
2011                 }
2012
2013                 @Override
2014                 public RaftActorContext getRaftActorContext() {
2015                     return super.getRaftActorContext();
2016                 }
2017             }
2018
2019             Creator<Shard> creator = new Creator<Shard>() {
2020                 @Override
2021                 public Shard create() throws Exception {
2022                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
2023                             newDatastoreContext(), SCHEMA_CONTEXT);
2024                 }
2025             };
2026
2027             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2028                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2029
2030             waitUntilLeader(shard);
2031
2032             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2033
2034             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2035
2036             // Trigger creation of a snapshot by ensuring
2037             RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2038             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2039
2040             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2041
2042             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2043                     savedSnapshot.get() instanceof Snapshot);
2044
2045             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2046
2047             latch.set(new CountDownLatch(1));
2048             savedSnapshot.set(null);
2049
2050             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2051
2052             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2053
2054             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2055                     savedSnapshot.get() instanceof Snapshot);
2056
2057             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2058
2059             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2060         }
2061
2062         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
2063
2064             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2065             assertEquals("Root node", expectedRoot, actual);
2066
2067         }};
2068     }
2069
2070     /**
2071      * This test simply verifies that the applySnapShot logic will work
2072      * @throws ReadFailedException
2073      * @throws DataValidationFailedException
2074      */
2075     @Test
2076     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2077         DataTree store = InMemoryDataTreeFactory.getInstance().create();
2078         store.setSchemaContext(SCHEMA_CONTEXT);
2079
2080         DataTreeModification putTransaction = store.takeSnapshot().newModification();
2081         putTransaction.write(TestModel.TEST_PATH,
2082             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2083         commitTransaction(store, putTransaction);
2084
2085
2086         NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2087
2088         DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2089
2090         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2091         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2092
2093         commitTransaction(store, writeTransaction);
2094
2095         NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2096
2097         assertEquals(expected, actual);
2098     }
2099
2100     @Test
2101     public void testRecoveryApplicable(){
2102
2103         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2104                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2105
2106         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2107                 persistentContext, SCHEMA_CONTEXT);
2108
2109         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2110                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2111
2112         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2113                 nonPersistentContext, SCHEMA_CONTEXT);
2114
2115         new ShardTestKit(getSystem()) {{
2116             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2117                     persistentProps, "testPersistence1");
2118
2119             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2120
2121             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2122
2123             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2124                     nonPersistentProps, "testPersistence2");
2125
2126             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2127
2128             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2129
2130         }};
2131
2132     }
2133
2134     @Test
2135     public void testOnDatastoreContext() {
2136         new ShardTestKit(getSystem()) {{
2137             dataStoreContextBuilder.persistent(true);
2138
2139             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2140
2141             assertEquals("isRecoveryApplicable", true,
2142                     shard.underlyingActor().persistence().isRecoveryApplicable());
2143
2144             waitUntilLeader(shard);
2145
2146             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2147
2148             assertEquals("isRecoveryApplicable", false,
2149                     shard.underlyingActor().persistence().isRecoveryApplicable());
2150
2151             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2152
2153             assertEquals("isRecoveryApplicable", true,
2154                     shard.underlyingActor().persistence().isRecoveryApplicable());
2155
2156             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2157         }};
2158     }
2159
2160     @Test
2161     public void testRegisterRoleChangeListener() throws Exception {
2162         new ShardTestKit(getSystem()) {
2163             {
2164                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2165                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2166                         "testRegisterRoleChangeListener");
2167
2168                 waitUntilLeader(shard);
2169
2170                 TestActorRef<MessageCollectorActor> listener =
2171                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2172
2173                 shard.tell(new RegisterRoleChangeListener(), listener);
2174
2175                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2176
2177                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2178                         ShardLeaderStateChanged.class);
2179                 assertEquals("getLocalShardDataTree present", true,
2180                         leaderStateChanged.getLocalShardDataTree().isPresent());
2181                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2182                         leaderStateChanged.getLocalShardDataTree().get());
2183
2184                 MessageCollectorActor.clearMessages(listener);
2185
2186                 // Force a leader change
2187
2188                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2189
2190                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2191                         ShardLeaderStateChanged.class);
2192                 assertEquals("getLocalShardDataTree present", false,
2193                         leaderStateChanged.getLocalShardDataTree().isPresent());
2194
2195                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2196             }
2197         };
2198     }
2199
2200     @Test
2201     public void testFollowerInitialSyncStatus() throws Exception {
2202         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2203                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2204                 "testFollowerInitialSyncStatus");
2205
2206         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2207
2208         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2209
2210         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2211
2212         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2213
2214         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2215     }
2216
2217     private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2218         modification.ready();
2219         store.validate(modification);
2220         store.commit(store.prepare(modification));
2221     }
2222 }