BUG-5280: expand ShardDataTree to cover transaction mechanics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.doThrow;
19 import static org.mockito.Mockito.inOrder;
20 import static org.mockito.Mockito.mock;
21 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSelection;
24 import akka.actor.Props;
25 import akka.actor.Status.Failure;
26 import akka.dispatch.Dispatchers;
27 import akka.dispatch.OnComplete;
28 import akka.japi.Creator;
29 import akka.pattern.Patterns;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Stopwatch;
34 import com.google.common.primitives.UnsignedLong;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.io.IOException;
37 import java.util.Collections;
38 import java.util.HashSet;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicReference;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.opendaylight.controller.cluster.DataPersistenceProvider;
48 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
49 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
50 import org.opendaylight.controller.cluster.access.concepts.MemberName;
51 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
52 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
62 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
65 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
74 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
75 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
76 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
77 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
78 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
79 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
80 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
81 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
82 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
83 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
84 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
86 import org.opendaylight.controller.cluster.raft.RaftActorContext;
87 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
89 import org.opendaylight.controller.cluster.raft.Snapshot;
90 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
91 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
92 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
93 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
94 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
95 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
96 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
97 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
98 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
99 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
100 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
101 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
102 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
103 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
104 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
105 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
106 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
107 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
108 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
109 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
110 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
111 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
120 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
121 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
122 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
123 import scala.concurrent.Await;
124 import scala.concurrent.Future;
125 import scala.concurrent.duration.FiniteDuration;
126
127 public class ShardTest extends AbstractShardTest {
128     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
129
130     @Test
131     public void testRegisterChangeListener() throws Exception {
132         new ShardTestKit(getSystem()) {{
133             final TestActorRef<Shard> shard = actorFactory.createTestActor(
134                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testRegisterChangeListener");
135
136             waitUntilLeader(shard);
137
138             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
139
140             final MockDataChangeListener listener = new MockDataChangeListener(1);
141             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
142                     "testRegisterChangeListener-DataChangeListener");
143
144             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
145                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
146
147             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
148                     RegisterChangeListenerReply.class);
149             final String replyPath = reply.getListenerRegistrationPath().toString();
150             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
151                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
152
153             final YangInstanceIdentifier path = TestModel.TEST_PATH;
154             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
155
156             listener.waitForChangeEvents(path);
157         }};
158     }
159
160     @SuppressWarnings("serial")
161     @Test
162     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
163         // This test tests the timing window in which a change listener is registered before the
164         // shard becomes the leader. We verify that the listener is registered and notified of the
165         // existing data when the shard becomes the leader.
166         new ShardTestKit(getSystem()) {{
167             // For this test, we want to send the RegisterChangeListener message after the shard
168             // has recovered from persistence and before it becomes the leader. So we subclass
169             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
170             // we know that the shard has been initialized to a follower and has started the
171             // election process. The following 2 CountDownLatches are used to coordinate the
172             // ElectionTimeout with the sending of the RegisterChangeListener message.
173             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
174             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
175             final Creator<Shard> creator = new Creator<Shard>() {
176                 boolean firstElectionTimeout = true;
177
178                 @Override
179                 public Shard create() throws Exception {
180                     // Use a non persistent provider because this test actually invokes persist on the journal
181                     // this will cause all other messages to not be queued properly after that.
182                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
183                     // it does do a persist)
184                     return new Shard(newShardBuilder()) {
185                         @Override
186                         public void handleCommand(final Object message) {
187                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
188                                 // Got the first ElectionTimeout. We don't forward it to the
189                                 // base Shard yet until we've sent the RegisterChangeListener
190                                 // message. So we signal the onFirstElectionTimeout latch to tell
191                                 // the main thread to send the RegisterChangeListener message and
192                                 // start a thread to wait on the onChangeListenerRegistered latch,
193                                 // which the main thread signals after it has sent the message.
194                                 // After the onChangeListenerRegistered is triggered, we send the
195                                 // original ElectionTimeout message to proceed with the election.
196                                 firstElectionTimeout = false;
197                                 final ActorRef self = getSelf();
198                                 new Thread() {
199                                     @Override
200                                     public void run() {
201                                         Uninterruptibles.awaitUninterruptibly(
202                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
203                                         self.tell(message, self);
204                                     }
205                                 }.start();
206
207                                 onFirstElectionTimeout.countDown();
208                             } else {
209                                 super.handleCommand(message);
210                             }
211                         }
212                     };
213                 }
214             };
215
216             setupInMemorySnapshotStore();
217
218             final MockDataChangeListener listener = new MockDataChangeListener(1);
219             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
220                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
221
222             final TestActorRef<Shard> shard = actorFactory.createTestActor(
223                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
224                     "testRegisterChangeListenerWhenNotLeaderInitially");
225
226             final YangInstanceIdentifier path = TestModel.TEST_PATH;
227
228             // Wait until the shard receives the first ElectionTimeout message.
229             assertEquals("Got first ElectionTimeout", true,
230                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
231
232             // Now send the RegisterChangeListener and wait for the reply.
233             shard.tell(new RegisterChangeListener(path, dclActor,
234                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
235
236             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
237                     RegisterChangeListenerReply.class);
238             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
239
240             // Sanity check - verify the shard is not the leader yet.
241             shard.tell(FindLeader.INSTANCE, getRef());
242             final FindLeaderReply findLeadeReply =
243                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
244             assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
245
246             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
247             // with the election process.
248             onChangeListenerRegistered.countDown();
249
250             // Wait for the shard to become the leader and notify our listener with the existing
251             // data in the store.
252             listener.waitForChangeEvents(path);
253         }};
254     }
255
256     @Test
257     public void testRegisterDataTreeChangeListener() throws Exception {
258         new ShardTestKit(getSystem()) {{
259             final TestActorRef<Shard> shard = actorFactory.createTestActor(
260                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testRegisterDataTreeChangeListener");
261
262             waitUntilLeader(shard);
263
264             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
265
266             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
267             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
268                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
269
270             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
271
272             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
273                     RegisterDataTreeChangeListenerReply.class);
274             final String replyPath = reply.getListenerRegistrationPath().toString();
275             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
276                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
277
278             final YangInstanceIdentifier path = TestModel.TEST_PATH;
279             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
280
281             listener.waitForChangeEvents();
282         }};
283     }
284
285     @SuppressWarnings("serial")
286     @Test
287     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
288         new ShardTestKit(getSystem()) {{
289             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
290             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
291             final Creator<Shard> creator = new Creator<Shard>() {
292                 boolean firstElectionTimeout = true;
293
294                 @Override
295                 public Shard create() throws Exception {
296                     return new Shard(newShardBuilder()) {
297                         @Override
298                         public void handleCommand(final Object message) {
299                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
300                                 firstElectionTimeout = false;
301                                 final ActorRef self = getSelf();
302                                 new Thread() {
303                                     @Override
304                                     public void run() {
305                                         Uninterruptibles.awaitUninterruptibly(
306                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
307                                         self.tell(message, self);
308                                     }
309                                 }.start();
310
311                                 onFirstElectionTimeout.countDown();
312                             } else {
313                                 super.handleCommand(message);
314                             }
315                         }
316                     };
317                 }
318             };
319
320             setupInMemorySnapshotStore();
321
322             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
323             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
324                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
325
326             final TestActorRef<Shard> shard = actorFactory.createTestActor(
327                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
328                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
329
330             final YangInstanceIdentifier path = TestModel.TEST_PATH;
331
332             assertEquals("Got first ElectionTimeout", true,
333                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
334
335             shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
336             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
337                 RegisterDataTreeChangeListenerReply.class);
338             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
339
340             shard.tell(FindLeader.INSTANCE, getRef());
341             final FindLeaderReply findLeadeReply =
342                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
343             assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
344
345
346             onChangeListenerRegistered.countDown();
347
348             // TODO: investigate why we do not receive data chage events
349             listener.waitForChangeEvents();
350         }};
351     }
352
353     @Test
354     public void testCreateTransaction(){
355         new ShardTestKit(getSystem()) {{
356             final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
357
358             waitUntilLeader(shard);
359
360             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
361
362             shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
363                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
364
365             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
366                     CreateTransactionReply.class);
367
368             final String path = reply.getTransactionPath().toString();
369             assertTrue("Unexpected transaction path " + path,
370                     path.startsWith("akka://test/user/testCreateTransaction/shard-member-1:ShardTransactionTest@0:"));
371         }};
372     }
373
374     @Test
375     public void testCreateTransactionOnChain(){
376         new ShardTestKit(getSystem()) {{
377             final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
378
379             waitUntilLeader(shard);
380
381             shard.tell(new CreateTransaction(nextTransactionId(),TransactionType.READ_ONLY.ordinal(),
382                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
383
384             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
385                     CreateTransactionReply.class);
386
387             final String path = reply.getTransactionPath().toString();
388             assertTrue("Unexpected transaction path " + path,
389                     path.startsWith("akka://test/user/testCreateTransactionOnChain/shard-member-1:ShardTransactionTest@0:"));
390         }};
391     }
392
393     @Test
394     public void testPeerAddressResolved() throws Exception {
395         new ShardTestKit(getSystem()) {{
396             ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"), "config");
397             final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder().
398                     peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null)).props().
399                         withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
400
401             final String address = "akka://foobar";
402             shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
403
404             shard.tell(GetOnDemandRaftState.INSTANCE, getRef());
405             OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
406             assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
407         }};
408     }
409
410     @Test
411     public void testApplySnapshot() throws Exception {
412
413         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps().
414                 withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
415
416         ShardTestKit.waitUntilLeader(shard);
417
418         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
419         store.setSchemaContext(SCHEMA_CONTEXT);
420
421         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
422                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
423                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
424                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
425
426         writeToStore(store, TestModel.TEST_PATH, container);
427
428         final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
429         final NormalizedNode<?,?> expected = readStore(store, root);
430
431         final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(expected).serialize(),
432                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
433
434         shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
435
436         Stopwatch sw = Stopwatch.createStarted();
437         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
438             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
439
440             try {
441                 assertEquals("Root node", expected, readStore(shard, root));
442                 return;
443             } catch(AssertionError e) {
444                 // try again
445             }
446         }
447
448         fail("Snapshot was not applied");
449     }
450
451     @Test
452     public void testApplyState() throws Exception {
453         final TestActorRef<Shard> shard = actorFactory.createTestActor(
454                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
455
456         ShardTestKit.waitUntilLeader(shard);
457
458         final DataTree source = setupInMemorySnapshotStore();
459         final DataTreeModification writeMod = source.takeSnapshot().newModification();
460         ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
461         writeMod.write(TestModel.TEST_PATH, node);
462         writeMod.ready();
463
464         final TransactionIdentifier tx = nextTransactionId();
465         final ApplyState applyState = new ApplyState(null, tx,
466             new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod, tx)));
467
468         shard.tell(applyState, shard);
469
470         Stopwatch sw = Stopwatch.createStarted();
471         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
472             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
473
474             final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
475             if(actual != null) {
476                 assertEquals("Applied state", node, actual);
477                 return;
478             }
479         }
480
481         fail("State was not applied");
482     }
483
484     @Test
485     public void testDataTreeCandidateRecovery() throws Exception {
486         // Set up the InMemorySnapshotStore.
487         final DataTree source = setupInMemorySnapshotStore();
488
489         final DataTreeModification writeMod = source.takeSnapshot().newModification();
490         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
491         writeMod.ready();
492         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
493
494         // Set up the InMemoryJournal.
495         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1,
496             payloadForModification(source, writeMod, nextTransactionId())));
497
498         final int nListEntries = 16;
499         final Set<Integer> listEntryKeys = new HashSet<>();
500
501         // Add some ModificationPayload entries
502         for (int i = 1; i <= nListEntries; i++) {
503             listEntryKeys.add(Integer.valueOf(i));
504
505             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
506                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
507
508             final DataTreeModification mod = source.takeSnapshot().newModification();
509             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
510             mod.ready();
511
512             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
513                 payloadForModification(source, mod, nextTransactionId())));
514         }
515
516         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
517             new ApplyJournalEntries(nListEntries));
518
519         testRecovery(listEntryKeys);
520     }
521
522     @Test
523     public void testConcurrentThreePhaseCommits() throws Throwable {
524         new ShardTestKit(getSystem()) {{
525             final TestActorRef<Shard> shard = actorFactory.createTestActor(
526                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
527                     "testConcurrentThreePhaseCommits");
528
529             waitUntilLeader(shard);
530
531             final TransactionIdentifier transactionID1 = nextTransactionId();
532             final TransactionIdentifier transactionID2 = nextTransactionId();
533             final TransactionIdentifier transactionID3 = nextTransactionId();
534
535             Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
536                     shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
537             final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
538             final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
539             final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
540
541             final long timeoutSec = 5;
542             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
543             final Timeout timeout = new Timeout(duration);
544
545             shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
546                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
547             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
548                     expectMsgClass(duration, ReadyTransactionReply.class));
549             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
550
551             // Send the CanCommitTransaction message for the first Tx.
552
553             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
554             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
555                     expectMsgClass(duration, CanCommitTransactionReply.class));
556             assertEquals("Can commit", true, canCommitReply.getCanCommit());
557
558             // Ready 2 more Tx's.
559
560             shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
561                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
562             expectMsgClass(duration, ReadyTransactionReply.class);
563
564             shard.tell(prepareBatchedModifications(transactionID3, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
565                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
566                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), getRef());
567             expectMsgClass(duration, ReadyTransactionReply.class);
568
569             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
570             // processed after the first Tx completes.
571
572             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
573                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
574
575             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
576                     new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
577
578             // Send the CommitTransaction message for the first Tx. After it completes, it should
579             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
580
581             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
582             expectMsgClass(duration, CommitTransactionReply.class);
583
584             // Wait for the next 2 Tx's to complete.
585
586             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
587             final CountDownLatch commitLatch = new CountDownLatch(2);
588
589             class OnFutureComplete extends OnComplete<Object> {
590                 private final Class<?> expRespType;
591
592                 OnFutureComplete(final Class<?> expRespType) {
593                     this.expRespType = expRespType;
594                 }
595
596                 @Override
597                 public void onComplete(final Throwable error, final Object resp) {
598                     if(error != null) {
599                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
600                     } else {
601                         try {
602                             assertEquals("Commit response type", expRespType, resp.getClass());
603                             onSuccess(resp);
604                         } catch (final Exception e) {
605                             caughtEx.set(e);
606                         }
607                     }
608                 }
609
610                 void onSuccess(final Object resp) throws Exception {
611                 }
612             }
613
614             class OnCommitFutureComplete extends OnFutureComplete {
615                 OnCommitFutureComplete() {
616                     super(CommitTransactionReply.class);
617                 }
618
619                 @Override
620                 public void onComplete(final Throwable error, final Object resp) {
621                     super.onComplete(error, resp);
622                     commitLatch.countDown();
623                 }
624             }
625
626             class OnCanCommitFutureComplete extends OnFutureComplete {
627                 private final TransactionIdentifier transactionID;
628
629                 OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
630                     super(CanCommitTransactionReply.class);
631                     this.transactionID = transactionID;
632                 }
633
634                 @Override
635                 void onSuccess(final Object resp) throws Exception {
636                     final CanCommitTransactionReply canCommitReply =
637                             CanCommitTransactionReply.fromSerializable(resp);
638                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
639
640                     final Future<Object> commitFuture = Patterns.ask(shard,
641                             new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
642                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
643                 }
644             }
645
646             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
647                     getSystem().dispatcher());
648
649             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
650                     getSystem().dispatcher());
651
652             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
653
654             if(caughtEx.get() != null) {
655                 throw caughtEx.get();
656             }
657
658             assertEquals("Commits complete", true, done);
659
660             final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
661                     cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
662                     cohort3.getPreCommit(), cohort3.getCommit());
663             inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
664             inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
665             inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
666             inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
667             inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
668             inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
669             inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
670             inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
671             inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
672
673             // Verify data in the data store.
674
675             verifyOuterListEntry(shard, 1);
676
677             verifyLastApplied(shard, 2);
678         }};
679     }
680
681     @Test
682     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
683         new ShardTestKit(getSystem()) {{
684             final TestActorRef<Shard> shard = actorFactory.createTestActor(
685                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
686                     "testBatchedModificationsWithNoCommitOnReady");
687
688             waitUntilLeader(shard);
689
690             final TransactionIdentifier transactionID = nextTransactionId();
691             final FiniteDuration duration = duration("5 seconds");
692
693             // Send a BatchedModifications to start a transaction.
694
695             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
696                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
697             expectMsgClass(duration, BatchedModificationsReply.class);
698
699             // Send a couple more BatchedModifications.
700
701             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
702                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
703             expectMsgClass(duration, BatchedModificationsReply.class);
704
705             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
706                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
707                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
708             expectMsgClass(duration, ReadyTransactionReply.class);
709
710             // Send the CanCommitTransaction message.
711
712             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
713             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
714                     expectMsgClass(duration, CanCommitTransactionReply.class));
715             assertEquals("Can commit", true, canCommitReply.getCanCommit());
716
717             // Send the CommitTransaction message.
718
719             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
720             expectMsgClass(duration, CommitTransactionReply.class);
721
722             // Verify data in the data store.
723
724             verifyOuterListEntry(shard, 1);
725         }};
726     }
727
728     @Test
729     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
730         new ShardTestKit(getSystem()) {{
731             final TestActorRef<Shard> shard = actorFactory.createTestActor(
732                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
733                     "testBatchedModificationsWithCommitOnReady");
734
735             waitUntilLeader(shard);
736
737             final TransactionIdentifier transactionID = nextTransactionId();
738             final FiniteDuration duration = duration("5 seconds");
739
740             // Send a BatchedModifications to start a transaction.
741
742             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
743                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
744             expectMsgClass(duration, BatchedModificationsReply.class);
745
746             // Send a couple more BatchedModifications.
747
748             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
749                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
750             expectMsgClass(duration, BatchedModificationsReply.class);
751
752             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
753                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
754                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
755
756             expectMsgClass(duration, CommitTransactionReply.class);
757
758             // Verify data in the data store.
759
760             verifyOuterListEntry(shard, 1);
761         }};
762     }
763
764     @Test(expected=IllegalStateException.class)
765     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
766         new ShardTestKit(getSystem()) {{
767             final TestActorRef<Shard> shard = actorFactory.createTestActor(
768                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
769                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
770
771             waitUntilLeader(shard);
772
773             final TransactionIdentifier transactionID = nextTransactionId();
774             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
775             batched.setReady(true);
776             batched.setTotalMessagesSent(2);
777
778             shard.tell(batched, getRef());
779
780             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
781
782             if(failure != null) {
783                 throw failure.cause();
784             }
785         }};
786     }
787
788     @Test
789     public void testBatchedModificationsWithOperationFailure() throws Throwable {
790         new ShardTestKit(getSystem()) {{
791             final TestActorRef<Shard> shard = actorFactory.createTestActor(
792                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
793                     "testBatchedModificationsWithOperationFailure");
794
795             waitUntilLeader(shard);
796
797             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
798             // write will not validate the children for performance reasons.
799
800             TransactionIdentifier transactionID = nextTransactionId();
801
802             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
803                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
804                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
805
806             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
807             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
808             shard.tell(batched, getRef());
809             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
810
811             Throwable cause = failure.cause();
812
813             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
814             batched.setReady(true);
815             batched.setTotalMessagesSent(2);
816
817             shard.tell(batched, getRef());
818
819             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
820             assertEquals("Failure cause", cause, failure.cause());
821         }};
822     }
823
824     @Test
825     public void testBatchedModificationsOnTransactionChain() throws Throwable {
826         new ShardTestKit(getSystem()) {{
827             final TestActorRef<Shard> shard = actorFactory.createTestActor(
828                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
829                     "testBatchedModificationsOnTransactionChain");
830
831             waitUntilLeader(shard);
832
833             final LocalHistoryIdentifier historyId = nextHistoryId();
834             final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
835             final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
836
837             final FiniteDuration duration = duration("5 seconds");
838
839             // Send a BatchedModifications to start a chained write transaction and ready it.
840
841             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
842             final YangInstanceIdentifier path = TestModel.TEST_PATH;
843             shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), getRef());
844             expectMsgClass(duration, ReadyTransactionReply.class);
845
846             // Create a read Tx on the same chain.
847
848             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
849                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
850
851             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
852
853             getSystem().actorSelection(createReply.getTransactionPath()).tell(
854                     new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
855             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
856             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
857
858             // Commit the write transaction.
859
860             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
861             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
862                     expectMsgClass(duration, CanCommitTransactionReply.class));
863             assertEquals("Can commit", true, canCommitReply.getCanCommit());
864
865             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
866             expectMsgClass(duration, CommitTransactionReply.class);
867
868             // Verify data in the data store.
869
870             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
871             assertEquals("Stored node", containerNode, actualNode);
872         }};
873     }
874
875     @Test
876     public void testOnBatchedModificationsWhenNotLeader() {
877         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
878         new ShardTestKit(getSystem()) {{
879             final Creator<Shard> creator = new Creator<Shard>() {
880                 private static final long serialVersionUID = 1L;
881
882                 @Override
883                 public Shard create() throws Exception {
884                     return new Shard(newShardBuilder()) {
885                         @Override
886                         protected boolean isLeader() {
887                             return overrideLeaderCalls.get() ? false : super.isLeader();
888                         }
889
890                         @Override
891                         protected ActorSelection getLeader() {
892                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
893                                 super.getLeader();
894                         }
895                     };
896                 }
897             };
898
899             final TestActorRef<Shard> shard = actorFactory.createTestActor(
900                     Props.create(new DelegatingShardCreator(creator)).
901                         withDispatcher(Dispatchers.DefaultDispatcherId()), "testOnBatchedModificationsWhenNotLeader");
902
903             waitUntilLeader(shard);
904
905             overrideLeaderCalls.set(true);
906
907             final BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
908
909             shard.tell(batched, ActorRef.noSender());
910
911             expectMsgEquals(batched);
912         }};
913     }
914
915     @Test
916     public void testTransactionMessagesWithNoLeader() {
917         new ShardTestKit(getSystem()) {{
918             dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
919                 shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
920             final TestActorRef<Shard> shard = actorFactory.createTestActor(
921                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
922                     "testTransactionMessagesWithNoLeader");
923
924             waitUntilNoLeader(shard);
925
926             final TransactionIdentifier txId = nextTransactionId();
927             shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), getRef());
928             Failure failure = expectMsgClass(Failure.class);
929             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
930
931             shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
932                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
933             failure = expectMsgClass(Failure.class);
934             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
935
936             shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef());
937             failure = expectMsgClass(Failure.class);
938             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
939         }};
940     }
941
942     @Test
943     public void testReadyWithReadWriteImmediateCommit() throws Exception{
944         testReadyWithImmediateCommit(true);
945     }
946
947     @Test
948     public void testReadyWithWriteOnlyImmediateCommit() throws Exception{
949         testReadyWithImmediateCommit(false);
950     }
951
952     private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
953         new ShardTestKit(getSystem()) {{
954             final TestActorRef<Shard> shard = actorFactory.createTestActor(
955                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
956                     "testReadyWithImmediateCommit-" + readWrite);
957
958             waitUntilLeader(shard);
959
960             final TransactionIdentifier transactionID = nextTransactionId();
961             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
962             if(readWrite) {
963                 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
964                         containerNode, true), getRef());
965             } else {
966                 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
967             }
968
969             expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
970
971             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
972             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
973         }};
974     }
975
976     @Test
977     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
978         new ShardTestKit(getSystem()) {{
979             final TestActorRef<Shard> shard = actorFactory.createTestActor(
980                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
981                     "testReadyLocalTransactionWithImmediateCommit");
982
983             waitUntilLeader(shard);
984
985             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
986
987             final DataTreeModification modification = dataStore.newModification();
988
989             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
990             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
991             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
992             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
993
994             final TransactionIdentifier txId = nextTransactionId();
995             modification.ready();
996             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
997
998             shard.tell(readyMessage, getRef());
999
1000             expectMsgClass(CommitTransactionReply.class);
1001
1002             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1003             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1004         }};
1005     }
1006
1007     @Test
1008     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1009         new ShardTestKit(getSystem()) {{
1010             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1011                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1012                     "testReadyLocalTransactionWithThreePhaseCommit");
1013
1014             waitUntilLeader(shard);
1015
1016             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1017
1018             final DataTreeModification modification = dataStore.newModification();
1019
1020             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1021             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1022             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1023             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1024
1025             final TransactionIdentifier txId = nextTransactionId();
1026             modification.ready();
1027             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1028
1029             shard.tell(readyMessage, getRef());
1030
1031             expectMsgClass(ReadyTransactionReply.class);
1032
1033             // Send the CanCommitTransaction message.
1034
1035             shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1036             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1037                     expectMsgClass(CanCommitTransactionReply.class));
1038             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1039
1040             // Send the CanCommitTransaction message.
1041
1042             shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1043             expectMsgClass(CommitTransactionReply.class);
1044
1045             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1046             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1047         }};
1048     }
1049
1050     @Test
1051     public void testReadWriteCommitWithPersistenceDisabled() throws Throwable {
1052         dataStoreContextBuilder.persistent(false);
1053         new ShardTestKit(getSystem()) {{
1054             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1055                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1056                     "testCommitWithPersistenceDisabled");
1057
1058             waitUntilLeader(shard);
1059
1060             // Setup a simulated transactions with a mock cohort.
1061
1062             final FiniteDuration duration = duration("5 seconds");
1063
1064             final TransactionIdentifier transactionID = nextTransactionId();
1065             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1066             shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false), getRef());
1067             expectMsgClass(duration, ReadyTransactionReply.class);
1068
1069             // Send the CanCommitTransaction message.
1070
1071             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1072             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1073                     expectMsgClass(duration, CanCommitTransactionReply.class));
1074             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1075
1076             // Send the CanCommitTransaction message.
1077
1078             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1079             expectMsgClass(duration, CommitTransactionReply.class);
1080
1081             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1082             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1083         }};
1084     }
1085
1086     @Test
1087     public void testReadWriteCommitWhenTransactionHasNoModifications() {
1088         testCommitWhenTransactionHasNoModifications(true);
1089     }
1090
1091     @Test
1092     public void testWriteOnlyCommitWhenTransactionHasNoModifications() {
1093         testCommitWhenTransactionHasNoModifications(false);
1094     }
1095
1096     private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
1097         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1098         // but here that need not happen
1099         new ShardTestKit(getSystem()) {{
1100             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1101                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1102                     "testCommitWhenTransactionHasNoModifications-" + readWrite);
1103
1104             waitUntilLeader(shard);
1105
1106             final TransactionIdentifier transactionID = nextTransactionId();
1107
1108             final FiniteDuration duration = duration("5 seconds");
1109
1110             if(readWrite) {
1111                 ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
1112                         newReadWriteTransaction(transactionID);
1113                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef());
1114             } else {
1115                 shard.tell(prepareBatchedModifications(transactionID, new MutableCompositeModification()), getRef());
1116             }
1117
1118             expectMsgClass(duration, ReadyTransactionReply.class);
1119
1120             // Send the CanCommitTransaction message.
1121
1122             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1123             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1124                     expectMsgClass(duration, CanCommitTransactionReply.class));
1125             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1126
1127             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1128             expectMsgClass(duration, CommitTransactionReply.class);
1129
1130             shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1131             final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1132
1133             // Use MBean for verification
1134             // Committed transaction count should increase as usual
1135             assertEquals(1,shardStats.getCommittedTransactionsCount());
1136
1137             // Commit index should not advance because this does not go into the journal
1138             assertEquals(-1, shardStats.getCommitIndex());
1139         }};
1140     }
1141
1142     @Test
1143     public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
1144         testCommitWhenTransactionHasModifications(true);
1145     }
1146
1147     @Test
1148     public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
1149         testCommitWhenTransactionHasModifications(false);
1150     }
1151
1152     private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
             // Shared scenario: ready a transaction that writes the test container, drive it through
             // CanCommit and Commit, then check the data tree call order and the shard statistics.
             //
             // readWrite selects how the transaction is readied: true uses
             // prepareForwardedReadyTransaction(...), false uses prepareBatchedModifications(...).
1153         new ShardTestKit(getSystem()) {{
1154             final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1155             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1156                     newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                         // The readWrite suffix keeps the actor name distinct between the two variants.
1157                     "testCommitWhenTransactionHasModifications-" + readWrite);
1158
1159             waitUntilLeader(shard);
1160
1161             final FiniteDuration duration = duration("5 seconds");
1162             final TransactionIdentifier transactionID = nextTransactionId();
1163
                 // NOTE(review): the trailing 'false' argument presumably disables commit-on-ready so the
                 // explicit CanCommit/Commit messages below are required - confirm against the helpers.
1164             if(readWrite) {
1165                 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
1166                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1167             } else {
1168                 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1169                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1170             }
1171
1172             expectMsgClass(duration, ReadyTransactionReply.class);
1173
1174             // Send the CanCommitTransaction message.
1175
1176             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1177             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1178                     expectMsgClass(duration, CanCommitTransactionReply.class));
1179             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1180
1181             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1182             expectMsgClass(duration, CommitTransactionReply.class);
1183
                 // The mocked data tree must have seen validate, prepare and commit, in that order.
1184             final InOrder inOrder = inOrder(dataTree);
1185             inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1186             inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1187             inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1188
1189             shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1190             final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1191
1192             // Use MBean for verification
1193             // Committed transaction count should increase as usual
1194             assertEquals(1, shardStats.getCommittedTransactionsCount());
1195
1196             // Commit index should advance as we do not have an empty modification
1197             assertEquals(0, shardStats.getCommitIndex());
1198         }};
1199     }
1200
1201     @Test
         // Stubs the data tree's commit(...) to throw, then checks that committing the first of two
         // readied transactions yields a Status.Failure and that the queued CanCommit for the second
         // transaction subsequently completes.
1202     public void testCommitPhaseFailure() throws Throwable {
1203         new ShardTestKit(getSystem()) {{
1204             final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1205             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1206                     newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1207                     "testCommitPhaseFailure");
1208
1209             waitUntilLeader(shard);
1210
1211             final FiniteDuration duration = duration("5 seconds");
1212             final Timeout timeout = new Timeout(duration);
1213
1214             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1215             // commit phase.
1216
1217             doThrow(new RuntimeException("mock commit failure")).when(dataTree).commit(any(DataTreeCandidate.class));
1218
1219             final TransactionIdentifier transactionID1 = nextTransactionId();
1220             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1221                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1222             expectMsgClass(duration, ReadyTransactionReply.class);
1223
1224             final TransactionIdentifier transactionID2 = nextTransactionId();
1225             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1226                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1227             expectMsgClass(duration, ReadyTransactionReply.class);
1228
1229             // Send the CanCommitTransaction message for the first Tx.
1230
1231             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1232             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1233                     expectMsgClass(duration, CanCommitTransactionReply.class));
1234             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1235
1236             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1237             // processed after the first Tx completes.
1238
                 // Uses ask(...) rather than tell(...) so this reply arrives on a Future instead of the
                 // test kit's own message queue, which is consumed by the expectMsgClass calls below.
1239             final Future<Object> canCommitFuture = Patterns.ask(shard,
1240                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1241
1242             // Send the CommitTransaction message for the first Tx. This should send back an error
1243             // and trigger the 2nd Tx to proceed.
1244
1245             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1246             expectMsgClass(duration, akka.actor.Status.Failure.class);
1247
1248             // Wait for the 2nd Tx to complete the canCommit phase.
1249
                 // The callback ignores both the Throwable and the response, so the latch only proves
                 // that the ask completed, not that it completed successfully.
1250             final CountDownLatch latch = new CountDownLatch(1);
1251             canCommitFuture.onComplete(new OnComplete<Object>() {
1252                 @Override
1253                 public void onComplete(final Throwable t, final Object resp) {
1254                     latch.countDown();
1255                 }
1256             }, getSystem().dispatcher());
1257
1258             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1259
                 // Expected call order on the mock: validate/prepare/commit for the first Tx, followed
                 // by a validate for the second Tx.
1260             final InOrder inOrder = inOrder(dataTree);
1261             inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1262             inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1263             inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1264             inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1265         }};
1266     }
1267
1268     @Test
         // Stubs the data tree's prepare(...) to throw, then checks that committing the first of two
         // readied transactions yields a Status.Failure and that the queued CanCommit for the second
         // transaction subsequently completes.
1269     public void testPreCommitPhaseFailure() throws Throwable {
1270         new ShardTestKit(getSystem()) {{
1271             final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1272             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1273                     newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1274                     "testPreCommitPhaseFailure");
1275
1276             waitUntilLeader(shard);
1277
1278             final FiniteDuration duration = duration("5 seconds");
1279             final Timeout timeout = new Timeout(duration);
1280
1281             doThrow(new RuntimeException("mock preCommit failure")).when(dataTree).prepare(any(DataTreeModification.class));
1282
1283             final TransactionIdentifier transactionID1 = nextTransactionId();
1284             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1285                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1286             expectMsgClass(duration, ReadyTransactionReply.class);
1287
1288             final TransactionIdentifier transactionID2 = nextTransactionId();
1289             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1290                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1291             expectMsgClass(duration, ReadyTransactionReply.class);
1292
1293             // Send the CanCommitTransaction message for the first Tx.
1294
1295             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1296             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1297                 expectMsgClass(duration, CanCommitTransactionReply.class));
1298             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1299
1300             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1301             // processed after the first Tx completes.
1302
                 // Uses ask(...) so this reply is delivered to a Future rather than to the test kit's
                 // message queue.
1303             final Future<Object> canCommitFuture = Patterns.ask(shard,
1304                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1305
1306             // Send the CommitTransaction message for the first Tx. This should send back an error
1307             // and trigger the 2nd Tx to proceed.
1308
1309             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1310             expectMsgClass(duration, akka.actor.Status.Failure.class);
1311
1312             // Wait for the 2nd Tx to complete the canCommit phase.
1313
                 // The callback ignores both the Throwable and the response, so the latch only proves
                 // that the ask completed, not that it completed successfully.
1314             final CountDownLatch latch = new CountDownLatch(1);
1315             canCommitFuture.onComplete(new OnComplete<Object>() {
1316                 @Override
1317                 public void onComplete(final Throwable t, final Object resp) {
1318                     latch.countDown();
1319                 }
1320             }, getSystem().dispatcher());
1321
1322             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1323
                 // Expected call order on the mock: validate and the (throwing) prepare for the first
                 // Tx, followed by a validate for the second Tx. No commit(...) is verified here.
1324             final InOrder inOrder = inOrder(dataTree);
1325             inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1326             inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1327             inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1328         }};
1329     }
1330
1331     @Test
1332     public void testCanCommitPhaseFailure() throws Throwable {
1333         new ShardTestKit(getSystem()) {{
1334             final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1335             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1336                     newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1337                     "testCanCommitPhaseFailure");
1338
1339             waitUntilLeader(shard);
1340
1341             final FiniteDuration duration = duration("5 seconds");
1342             final TransactionIdentifier transactionID1 = nextTransactionId();
1343
1344             doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")).
1345                 doNothing().when(dataTree).validate(any(DataTreeModification.class));
1346
1347             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1348                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1349             expectMsgClass(duration, ReadyTransactionReply.class);
1350
1351             // Send the CanCommitTransaction message.
1352
1353             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1354             expectMsgClass(duration, akka.actor.Status.Failure.class);
1355
1356             // Send another can commit to ensure the failed one got cleaned up.
1357
1358             final TransactionIdentifier transactionID2 = nextTransactionId();
1359             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1360                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1361             expectMsgClass(duration, ReadyTransactionReply.class);
1362
1363             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1364             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1365                     expectMsgClass(CanCommitTransactionReply.class));
1366             assertEquals("getCanCommit", true, reply.getCanCommit());
1367         }};
1368     }
1369
1370     @Test
         // Runs the immediate-commit validation-failure scenario twice in one test: first with
         // readWrite=true, then with readWrite=false. A failure in the first run prevents the
         // second from executing.
1371     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1372         testImmediateCommitWithCanCommitPhaseFailure(true);
1373         testImmediateCommitWithCanCommitPhaseFailure(false);
1374     }
1375
1376     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
             // Shared scenario: the data tree's validate(...) throws on its first invocation only. The
             // first ready message is expected to be answered with a Status.Failure, the second with a
             // CommitTransactionReply, without any explicit CanCommit/Commit messages being sent.
             //
             // readWrite selects how each transaction is readied: true uses
             // prepareForwardedReadyTransaction(...), false uses prepareBatchedModifications(...).
1377         new ShardTestKit(getSystem()) {{
1378             final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1379             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1380                     newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                         // The readWrite suffix keeps the actor name distinct between the two variants.
1381                     "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1382
1383             waitUntilLeader(shard);
1384
                 // Chained stubbing: the first validate(...) call throws, later calls do nothing.
1385             doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")).
1386                  doNothing().when(dataTree).validate(any(DataTreeModification.class));
1387
1388             final FiniteDuration duration = duration("5 seconds");
1389
1390             final TransactionIdentifier transactionID1 = nextTransactionId();
1391
                 // NOTE(review): the trailing 'true' argument presumably requests commit-on-ready, which
                 // would explain why the reply below is the commit outcome - confirm against the helpers.
1392             if(readWrite) {
1393                 shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1394                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1395             } else {
1396                 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1397                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1398             }
1399
1400             expectMsgClass(duration, akka.actor.Status.Failure.class);
1401
1402             // Send another can commit to ensure the failed one got cleaned up.
1403
1404             final TransactionIdentifier transactionID2 = nextTransactionId();
1405             if(readWrite) {
1406                 shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1407                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1408             } else {
1409                 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1410                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1411             }
1412
1413             expectMsgClass(duration, CommitTransactionReply.class);
1414         }};
1415     }
1416
1417     @SuppressWarnings("serial")
1418     @Test
         // Uses a Shard subclass whose persistPayload(...) calls doAbortTransaction(...) before
         // delegating to the superclass, then checks that the commit still replies successfully and
         // that the written node can be read back from the store.
1419     public void testAbortWithCommitPending() throws Throwable {
1420         new ShardTestKit(getSystem()) {{
1421             final Creator<Shard> creator = new Creator<Shard>() {
1422                 @Override
1423                 public Shard create() throws Exception {
1424                     return new Shard(newShardBuilder()) {
1425                         @Override
1426                         void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
1427                             // Simulate an AbortTransaction message occurring during replication, after
1428                             // persisting and before finishing the commit to the in-memory store.
1429
                                 // NOTE(review): the abort is invoked before super.persistPayload(...) runs;
                                 // the second argument is null - presumably "no sender to reply to", confirm.
1430                             doAbortTransaction(transactionId, null);
1431                             super.persistPayload(transactionId, payload);
1432                         }
1433                     };
1434                 }
1435             };
1436
1437             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1438                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1439                     "testAbortWithCommitPending");
1440
1441             waitUntilLeader(shard);
1442
1443             final FiniteDuration duration = duration("5 seconds");
1444
1445             final TransactionIdentifier transactionID = nextTransactionId();
1446
1447             shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1448                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1449             expectMsgClass(duration, ReadyTransactionReply.class);
1450
1451             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1452             expectMsgClass(duration, CanCommitTransactionReply.class);
1453
1454             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1455             expectMsgClass(duration, CommitTransactionReply.class);
1456
1457             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1458
1459             // Since we're simulating an abort occurring during replication and before finish commit,
1460             // the data should still get written to the in-memory store since we've gotten past
1461             // canCommit and preCommit and persisted the data.
1462             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1463         }};
1464     }
1465
1466     @Test
         // With the shard transaction commit timeout set to 1 second, readies two transactions,
         // sends CanCommit for the first but never a timely Commit, and checks that the second
         // transaction can then be can-committed and committed while a late Commit for the first
         // is answered with a Status.Failure.
1467     public void testTransactionCommitTimeout() throws Throwable {
1468         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1469         new ShardTestKit(getSystem()) {{
1470             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1471                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1472                     "testTransactionCommitTimeout");
1473
1474             waitUntilLeader(shard);
1475
1476             final FiniteDuration duration = duration("5 seconds");
1477
                 // Pre-populate the store with the test container and an empty outer list; the two
                 // transactions below each write one entry of that list.
1478             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1479             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1480                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1481
1482             // Ready 2 Tx's - the first will timeout
1483
1484             final TransactionIdentifier transactionID1 = nextTransactionId();
1485             shard.tell(prepareBatchedModifications(transactionID1, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1486                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1487                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), getRef());
1488             expectMsgClass(duration, ReadyTransactionReply.class);
1489
1490             final TransactionIdentifier transactionID2 = nextTransactionId();
1491             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1492                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1493             shard.tell(prepareBatchedModifications(transactionID2, listNodePath,
1494                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), getRef());
1495             expectMsgClass(duration, ReadyTransactionReply.class);
1496
1497             // canCommit 1st Tx. We don't send the commit so it should timeout.
1498
1499             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1500             expectMsgClass(duration, CanCommitTransactionReply.class);
1501
1502             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1503
                 // The 5 second wait below is longer than the 1 second commit timeout configured above.
1504             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1505             expectMsgClass(duration, CanCommitTransactionReply.class);
1506
1507             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1508
1509             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1510             expectMsgClass(duration, akka.actor.Status.Failure.class);
1511
1512             // Commit the 2nd Tx.
1513
1514             shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1515             expectMsgClass(duration, CommitTransactionReply.class);
1516
                 // The list entry written by the 2nd Tx must be readable from the store.
1517             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1518             assertNotNull(listNodePath + " not found", node);
1519         }};
1520     }
1521
1522 //    @Test
1523 //    @Ignore
1524 //    public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1525 //        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1526 //
1527 //        new ShardTestKit(getSystem()) {{
1528 //            final TestActorRef<Shard> shard = actorFactory.createTestActor(
1529 //                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1530 //                    "testTransactionCommitQueueCapacityExceeded");
1531 //
1532 //            waitUntilLeader(shard);
1533 //
1534 //            final FiniteDuration duration = duration("5 seconds");
1535 //
1536 //            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1537 //
1538 //            final TransactionIdentifier transactionID1 = nextTransactionId();
1539 //            final MutableCompositeModification modification1 = new MutableCompositeModification();
1540 //            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1541 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1542 //                    modification1);
1543 //
1544 //            final TransactionIdentifier transactionID2 = nextTransactionId();
1545 //            final MutableCompositeModification modification2 = new MutableCompositeModification();
1546 //            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1547 //                    TestModel.OUTER_LIST_PATH,
1548 //                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1549 //                    modification2);
1550 //
1551 //            final TransactionIdentifier transactionID3 = nextTransactionId();
1552 //            final MutableCompositeModification modification3 = new MutableCompositeModification();
1553 //            final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1554 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1555 //                    modification3);
1556 //
1557 //            // Ready the Tx's
1558 //
1559 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1560 //            expectMsgClass(duration, ReadyTransactionReply.class);
1561 //
1562 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1563 //            expectMsgClass(duration, ReadyTransactionReply.class);
1564 //
1565 //            // The 3rd Tx should exceed queue capacity and fail.
1566 //
1567 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1568 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1569 //
1570 //            // canCommit 1st Tx.
1571 //
1572 //            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1573 //            expectMsgClass(duration, CanCommitTransactionReply.class);
1574 //
1575 //            // canCommit the 2nd Tx - it should get queued.
1576 //
1577 //            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1578 //
1579 //            // canCommit the 3rd Tx - should exceed queue capacity and fail.
1580 //
1581 //            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1582 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1583 //        }};
1584 //    }
1585
1586     @Test
         // With the shard transaction commit timeout set to 1 second, readies three transactions
         // and sends CanCommit only for the third, expecting a CanCommitTransactionReply for it
         // within 5 seconds.
1587     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1588         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1589         new ShardTestKit(getSystem()) {{
1590             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1591                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1592                     "testTransactionCommitWithPriorExpiredCohortEntries");
1593
1594             waitUntilLeader(shard);
1595
1596             final FiniteDuration duration = duration("5 seconds");
1597
1598             final TransactionIdentifier transactionID1 = nextTransactionId();
1599             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1600                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1601             expectMsgClass(duration, ReadyTransactionReply.class);
1602
1603             final TransactionIdentifier transactionID2 = nextTransactionId();
1604             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1605                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1606             expectMsgClass(duration, ReadyTransactionReply.class);
1607
1608             final TransactionIdentifier transactionID3 = nextTransactionId();
1609             shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1610                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1611             expectMsgClass(duration, ReadyTransactionReply.class);
1612
1613             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1614             // should expire from the queue and the last one should be processed.
1615
1616             shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1617             expectMsgClass(duration, CanCommitTransactionReply.class);
1618         }};
1619     }
1620
1621     @Test
         // With the shard transaction commit timeout set to 1 second: can-commits a first
         // transaction, readies a second that is never can-committed, readies a third through a
         // ReadyLocalTransaction message, then commits the first and expects a second
         // CommitTransactionReply plus the third transaction's data in the store.
1622     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
1623         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1624         new ShardTestKit(getSystem()) {{
1625             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1626                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1627                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
1628
1629             waitUntilLeader(shard);
1630
1631             final FiniteDuration duration = duration("5 seconds");
1632
                 // Direct handle on the shard's data tree, used below to build the third Tx's modification.
1633             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1634
1635             final TransactionIdentifier transactionID1 = nextTransactionId();
1636             shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1637                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1638             expectMsgClass(duration, ReadyTransactionReply.class);
1639
1640             // CanCommit the first Tx so it's the current in-progress Tx.
1641
1642             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1643             expectMsgClass(duration, CanCommitTransactionReply.class);
1644
1645             // Ready the second Tx.
1646
1647             final TransactionIdentifier transactionID2 = nextTransactionId();
1648             shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1649                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1650             expectMsgClass(duration, ReadyTransactionReply.class);
1651
1652             // Ready the third Tx.
1653
                 // The third Tx writes to TEST2_PATH, a different path from the first two, and is sent as
                 // a ReadyLocalTransaction. No reply is awaited here; its CommitTransactionReply is
                 // expected further down.
                 // NOTE(review): the 'true' constructor argument presumably requests commit-on-ready -
                 // confirm against ReadyLocalTransaction.
1654             final TransactionIdentifier transactionID3 = nextTransactionId();
1655             final DataTreeModification modification3 = dataStore.newModification();
1656             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1657                     .apply(modification3);
1658             modification3.ready();
1659             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
1660             shard.tell(readyMessage, getRef());
1661
1662             // Commit the first Tx. After completing, the second should expire from the queue and the third
1663             // Tx committed.
1664
1665             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1666             expectMsgClass(duration, CommitTransactionReply.class);
1667
1668             // Expect commit reply from the third Tx.
1669
1670             expectMsgClass(duration, CommitTransactionReply.class);
1671
1672             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1673             assertNotNull(TestModel.TEST2_PATH + " not found", node);
1674         }};
1675     }
1676
1677     @Test
         // Sends a CanCommitTransaction for a freshly generated transaction id that was never
         // readied and expects a Status.Failure in response.
1678     public void testCanCommitBeforeReadyFailure() throws Throwable {
1679         new ShardTestKit(getSystem()) {{
1680             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1681                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1682                     "testCanCommitBeforeReadyFailure");
1683
                 // Unlike the neighbouring tests, this one does not call waitUntilLeader(shard) first.
1684             shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), getRef());
1685             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1686         }};
1687     }
1688
1689     @Test
         // Readies two transactions, can-commits the first, queues a CanCommit for the second, then
         // aborts the first and checks that the second transaction's CanCommit completes with
         // getCanCommit() == true.
1690     public void testAbortAfterCanCommit() throws Throwable {
1691         new ShardTestKit(getSystem()) {{
1692             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1693                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1694                     "testAbortAfterCanCommit");
1695
1696             waitUntilLeader(shard);
1697
1698             final FiniteDuration duration = duration("5 seconds");
1699             final Timeout timeout = new Timeout(duration);
1700
1701             // Ready 2 transactions - the first one will be aborted.
1702
1703             final TransactionIdentifier transactionID1 = nextTransactionId();
1704             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1705                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1706             expectMsgClass(duration, ReadyTransactionReply.class);
1707
1708             final TransactionIdentifier transactionID2 = nextTransactionId();
1709             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1710                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1711             expectMsgClass(duration, ReadyTransactionReply.class);
1712
1713             // Send the CanCommitTransaction message for the first Tx.
1714
1715             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1716             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1717                     expectMsgClass(duration, CanCommitTransactionReply.class));
1718             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1719
1720             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1721             // processed after the first Tx completes.
1722
                 // Uses ask(...) so this reply is delivered to a Future rather than to the test kit's
                 // message queue, which must receive the AbortTransactionReply next.
1723             final Future<Object> canCommitFuture = Patterns.ask(shard,
1724                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1725
1726             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1727             // Tx to proceed.
1728
1729             shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1730             expectMsgClass(duration, AbortTransactionReply.class);
1731
1732             // Wait for the 2nd Tx to complete the canCommit phase.
1733
                 // The future's result is cast directly here, whereas the first reply above went through
                 // CanCommitTransactionReply.fromSerializable(...).
1734             canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
1735             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1736         }};
1737     }
1738
1739     @Test
1740     public void testAbortAfterReady() throws Throwable {
1741         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1742         new ShardTestKit(getSystem()) {{
1743             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1744                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1745
1746             waitUntilLeader(shard);
1747
1748             final FiniteDuration duration = duration("5 seconds");
1749
1750             // Ready a tx.
1751
1752             final TransactionIdentifier transactionID1 = nextTransactionId();
1753             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1754                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1755             expectMsgClass(duration, ReadyTransactionReply.class);
1756
1757             // Send the AbortTransaction message.
1758
1759             shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1760             expectMsgClass(duration, AbortTransactionReply.class);
1761
1762             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1763
1764             // Now send CanCommitTransaction - should fail.
1765
1766             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1767             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1768             assertTrue("Failure type", failure instanceof IllegalStateException);
1769
1770             // Ready and CanCommit another and verify success.
1771
1772             final TransactionIdentifier transactionID2 = nextTransactionId();
1773             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1774                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1775             expectMsgClass(duration, ReadyTransactionReply.class);
1776
1777             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1778             expectMsgClass(duration, CanCommitTransactionReply.class);
1779         }};
1780     }
1781
1782     @Test
1783     public void testAbortQueuedTransaction() throws Throwable {
1784         new ShardTestKit(getSystem()) {{
1785             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1786                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1787
1788             waitUntilLeader(shard);
1789
1790             final FiniteDuration duration = duration("5 seconds");
1791
1792             // Ready 3 tx's.
1793
1794             final TransactionIdentifier transactionID1 = nextTransactionId();
1795             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1796                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1797             expectMsgClass(duration, ReadyTransactionReply.class);
1798
1799             final TransactionIdentifier transactionID2 = nextTransactionId();
1800             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1801                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1802             expectMsgClass(duration, ReadyTransactionReply.class);
1803
1804             final TransactionIdentifier transactionID3 = nextTransactionId();
1805             shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1806                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), getRef());
1807             expectMsgClass(duration, ReadyTransactionReply.class);
1808
1809             // Abort the second tx while it's queued.
1810
1811             shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1812             expectMsgClass(duration, AbortTransactionReply.class);
1813
1814             // Commit the other 2.
1815
1816             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1817             expectMsgClass(duration, CanCommitTransactionReply.class);
1818
1819             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1820             expectMsgClass(duration, CommitTransactionReply.class);
1821
1822             shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1823             expectMsgClass(duration, CanCommitTransactionReply.class);
1824
1825             shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1826             expectMsgClass(duration, CommitTransactionReply.class);
1827
1828             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1829         }};
1830     }
1831
1832     @Test
1833     public void testCreateSnapshot() throws Exception {
1834         testCreateSnapshot(true, "testCreateSnapshot");
1835     }
1836
1837     @Test
1838     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1839         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1840     }
1841
1842     private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1843
1844         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1845
1846         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1847         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1848             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1849                 super(delegate);
1850             }
1851
1852             @Override
1853             public void saveSnapshot(final Object o) {
1854                 savedSnapshot.set(o);
1855                 super.saveSnapshot(o);
1856             }
1857         }
1858
1859         dataStoreContextBuilder.persistent(persistent);
1860
1861         new ShardTestKit(getSystem()) {{
1862             class TestShard extends Shard {
1863
1864                 protected TestShard(final AbstractBuilder<?, ?> builder) {
1865                     super(builder);
1866                     setPersistence(new TestPersistentDataProvider(super.persistence()));
1867                 }
1868
1869                 @Override
1870                 public void handleCommand(final Object message) {
1871                     super.handleCommand(message);
1872
1873                     // XXX:  commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
1874                     if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1875                         latch.get().countDown();
1876                     }
1877                 }
1878
1879                 @Override
1880                 public RaftActorContext getRaftActorContext() {
1881                     return super.getRaftActorContext();
1882                 }
1883             }
1884
1885             final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1886
1887             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1888                     Props.create(new DelegatingShardCreator(creator)).
1889                         withDispatcher(Dispatchers.DefaultDispatcherId()), shardActorName);
1890
1891             waitUntilLeader(shard);
1892             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1893
1894             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.EMPTY);
1895
1896             // Trigger creation of a snapshot by ensuring
1897             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1898             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1899             awaitAndValidateSnapshot(expectedRoot);
1900
1901             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1902             awaitAndValidateSnapshot(expectedRoot);
1903         }
1904
1905         private void awaitAndValidateSnapshot(final NormalizedNode<?,?> expectedRoot) throws InterruptedException, IOException {
1906             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1907
1908             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1909                     savedSnapshot.get() instanceof Snapshot);
1910
1911             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1912
1913             latch.set(new CountDownLatch(1));
1914             savedSnapshot.set(null);
1915         }
1916
1917         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) throws IOException {
1918             final NormalizedNode<?, ?> actual = ShardDataTreeSnapshot.deserialize(snapshot.getState()).getRootNode().get();
1919             assertEquals("Root node", expectedRoot, actual);
1920         }};
1921     }
1922
1923     /**
1924      * This test simply verifies that the applySnapShot logic will work
1925      * @throws ReadFailedException
1926      * @throws DataValidationFailedException
1927      */
1928     @Test
1929     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
1930         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1931         store.setSchemaContext(SCHEMA_CONTEXT);
1932
1933         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1934         putTransaction.write(TestModel.TEST_PATH,
1935             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1936         commitTransaction(store, putTransaction);
1937
1938
1939         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.EMPTY);
1940
1941         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
1942
1943         writeTransaction.delete(YangInstanceIdentifier.EMPTY);
1944         writeTransaction.write(YangInstanceIdentifier.EMPTY, expected);
1945
1946         commitTransaction(store, writeTransaction);
1947
1948         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.EMPTY);
1949
1950         assertEquals(expected, actual);
1951     }
1952
1953     @Test
1954     public void testRecoveryApplicable(){
1955
1956         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1957                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1958
1959         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
1960                 schemaContext(SCHEMA_CONTEXT).props();
1961
1962         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1963                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1964
1965         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
1966                 schemaContext(SCHEMA_CONTEXT).props();
1967
1968         new ShardTestKit(getSystem()) {{
1969             final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
1970
1971             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1972
1973             final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
1974
1975             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1976         }};
1977     }
1978
1979     @Test
1980     public void testOnDatastoreContext() {
1981         new ShardTestKit(getSystem()) {{
1982             dataStoreContextBuilder.persistent(true);
1983
1984             final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
1985
1986             assertEquals("isRecoveryApplicable", true,
1987                     shard.underlyingActor().persistence().isRecoveryApplicable());
1988
1989             waitUntilLeader(shard);
1990
1991             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1992
1993             assertEquals("isRecoveryApplicable", false,
1994                 shard.underlyingActor().persistence().isRecoveryApplicable());
1995
1996             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1997
1998             assertEquals("isRecoveryApplicable", true,
1999                 shard.underlyingActor().persistence().isRecoveryApplicable());
2000         }};
2001     }
2002
2003     @Test
2004     public void testRegisterRoleChangeListener() throws Exception {
2005         new ShardTestKit(getSystem()) {
2006             {
2007                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2008                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2009                         "testRegisterRoleChangeListener");
2010
2011                 waitUntilLeader(shard);
2012
2013                 final TestActorRef<MessageCollectorActor> listener =
2014                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2015
2016                 shard.tell(new RegisterRoleChangeListener(), listener);
2017
2018                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2019
2020                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2021                     ShardLeaderStateChanged.class);
2022                 assertEquals("getLocalShardDataTree present", true,
2023                         leaderStateChanged.getLocalShardDataTree().isPresent());
2024                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2025                     leaderStateChanged.getLocalShardDataTree().get());
2026
2027                 MessageCollectorActor.clearMessages(listener);
2028
2029                 // Force a leader change
2030
2031                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2032
2033                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2034                         ShardLeaderStateChanged.class);
2035                 assertEquals("getLocalShardDataTree present", false,
2036                         leaderStateChanged.getLocalShardDataTree().isPresent());
2037             }
2038         };
2039     }
2040
2041     @Test
2042     public void testFollowerInitialSyncStatus() throws Exception {
2043         final TestActorRef<Shard> shard = actorFactory.createTestActor(
2044                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2045                 "testFollowerInitialSyncStatus");
2046
2047         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2048
2049         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2050
2051         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2052
2053         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2054     }
2055
2056     @Test
2057     public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
2058         new ShardTestKit(getSystem()) {{
2059             String testName = "testClusteredDataChangeListenerDelayedRegistration";
2060             dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
2061                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2062
2063             final MockDataChangeListener listener = new MockDataChangeListener(1);
2064             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2065                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2066
2067             setupInMemorySnapshotStore();
2068
2069             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2070                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2071                     actorFactory.generateActorId(testName + "-shard"));
2072
2073             waitUntilNoLeader(shard);
2074
2075             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2076
2077             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2078             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2079                 RegisterChangeListenerReply.class);
2080             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2081
2082             shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
2083                     customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2084
2085             listener.waitForChangeEvents();
2086         }};
2087     }
2088
2089     @Test
2090     public void testClusteredDataChangeListenerRegistration() throws Exception {
2091         new ShardTestKit(getSystem()) {{
2092             String testName = "testClusteredDataChangeListenerRegistration";
2093             final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
2094                     MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
2095
2096             final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
2097                     MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
2098
2099             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2100                     Shard.builder().id(followerShardID).
2101                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2102                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2103                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2104                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2105
2106             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2107                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2108                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2109                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2110                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2111
2112             leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
2113             String leaderPath = waitUntilLeader(followerShard);
2114             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2115
2116             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2117             final MockDataChangeListener listener = new MockDataChangeListener(1);
2118             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2119                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2120
2121             followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2122             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2123                 RegisterChangeListenerReply.class);
2124             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2125
2126             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2127
2128             listener.waitForChangeEvents();
2129         }};
2130     }
2131
2132     @Test
2133     public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
2134         new ShardTestKit(getSystem()) {{
2135             String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
2136             dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
2137                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2138
2139             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2140             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2141                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2142
2143             setupInMemorySnapshotStore();
2144
2145             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2146                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2147                     actorFactory.generateActorId(testName + "-shard"));
2148
2149             waitUntilNoLeader(shard);
2150
2151             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2152             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2153                     RegisterDataTreeChangeListenerReply.class);
2154             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2155
2156             shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
2157                     customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2158
2159             listener.waitForChangeEvents();
2160         }};
2161     }
2162
2163     @Test
2164     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2165         new ShardTestKit(getSystem()) {{
2166             String testName = "testClusteredDataTreeChangeListenerRegistration";
2167             final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
2168                     MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
2169
2170             final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
2171                     MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
2172
2173             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2174                     Shard.builder().id(followerShardID).
2175                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2176                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2177                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2178                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2179
2180             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2181                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2182                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2183                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2184                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2185
2186             leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
2187             String leaderPath = waitUntilLeader(followerShard);
2188             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2189
2190             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2191             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2192             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2193                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2194
2195             followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2196             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2197                     RegisterDataTreeChangeListenerReply.class);
2198             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2199
2200             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2201
2202             listener.waitForChangeEvents();
2203         }};
2204     }
2205
2206     @Test
2207     public void testServerRemoved() throws Exception {
2208         final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
2209
2210         final ActorRef shard = parent.underlyingActor().context().actorOf(
2211                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2212                 "testServerRemoved");
2213
2214         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2215
2216         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2217     }
2218 }