5c4e35a3a99d60095d6f0e2a6f08d4b81aed350c
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.doThrow;
19 import static org.mockito.Mockito.inOrder;
20 import static org.mockito.Mockito.mock;
21 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSelection;
24 import akka.actor.Props;
25 import akka.actor.Status.Failure;
26 import akka.dispatch.Dispatchers;
27 import akka.dispatch.OnComplete;
28 import akka.japi.Creator;
29 import akka.pattern.Patterns;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.testkit.TestActorRef;
32 import akka.util.Timeout;
33 import com.google.common.base.Stopwatch;
34 import com.google.common.primitives.UnsignedLong;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.io.IOException;
37 import java.util.Collections;
38 import java.util.HashSet;
39 import java.util.Map;
40 import java.util.Set;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicReference;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.opendaylight.controller.cluster.DataPersistenceProvider;
48 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
49 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
50 import org.opendaylight.controller.cluster.access.concepts.MemberName;
51 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
52 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
55 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
59 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
62 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
64 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
65 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
68 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
71 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
72 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
74 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
75 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
76 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
77 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
78 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
79 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
80 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
81 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
82 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
83 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
84 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
86 import org.opendaylight.controller.cluster.raft.RaftActorContext;
87 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
88 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
89 import org.opendaylight.controller.cluster.raft.Snapshot;
90 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
91 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
92 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
93 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
94 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
95 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
96 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
97 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
98 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
99 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
100 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
101 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
102 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
103 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
104 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
105 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
106 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
107 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
108 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
109 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
110 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
111 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
113 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
116 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
117 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
118 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
119 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
120 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
121 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
122 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
123 import scala.concurrent.Await;
124 import scala.concurrent.Future;
125 import scala.concurrent.duration.FiniteDuration;
126
127 public class ShardTest extends AbstractShardTest {
128     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
129
130     @Test
131     public void testRegisterChangeListener() throws Exception {
132         new ShardTestKit(getSystem()) {{
133             final TestActorRef<Shard> shard = actorFactory.createTestActor(
134                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testRegisterChangeListener");
135
136             waitUntilLeader(shard);
137
138             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
139
140             final MockDataChangeListener listener = new MockDataChangeListener(1);
141             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener,
142                     TestModel.TEST_PATH), "testRegisterChangeListener-DataChangeListener");
143
144             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
145                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
146
147             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
148                     RegisterChangeListenerReply.class);
149             final String replyPath = reply.getListenerRegistrationPath().toString();
150             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
151                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
152
153             final YangInstanceIdentifier path = TestModel.TEST_PATH;
154             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
155
156             listener.waitForChangeEvents(path);
157         }};
158     }
159
160     @SuppressWarnings("serial")
161     @Test
162     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
163         // This test tests the timing window in which a change listener is registered before the
164         // shard becomes the leader. We verify that the listener is registered and notified of the
165         // existing data when the shard becomes the leader.
166         new ShardTestKit(getSystem()) {{
167             // For this test, we want to send the RegisterChangeListener message after the shard
168             // has recovered from persistence and before it becomes the leader. So we subclass
169             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
170             // we know that the shard has been initialized to a follower and has started the
171             // election process. The following 2 CountDownLatches are used to coordinate the
172             // ElectionTimeout with the sending of the RegisterChangeListener message.
173             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
174             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
175             final Creator<Shard> creator = new Creator<Shard>() {
176                 boolean firstElectionTimeout = true;
177
178                 @Override
179                 public Shard create() throws Exception {
180                     // Use a non persistent provider because this test actually invokes persist on the journal
181                     // this will cause all other messages to not be queued properly after that.
182                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
183                     // it does do a persist)
184                     return new Shard(newShardBuilder()) {
185                         @Override
186                         public void handleCommand(final Object message) {
187                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
188                                 // Got the first ElectionTimeout. We don't forward it to the
189                                 // base Shard yet until we've sent the RegisterChangeListener
190                                 // message. So we signal the onFirstElectionTimeout latch to tell
191                                 // the main thread to send the RegisterChangeListener message and
192                                 // start a thread to wait on the onChangeListenerRegistered latch,
193                                 // which the main thread signals after it has sent the message.
194                                 // After the onChangeListenerRegistered is triggered, we send the
195                                 // original ElectionTimeout message to proceed with the election.
196                                 firstElectionTimeout = false;
197                                 final ActorRef self = getSelf();
198                                 new Thread() {
199                                     @Override
200                                     public void run() {
201                                         Uninterruptibles.awaitUninterruptibly(
202                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
203                                         self.tell(message, self);
204                                     }
205                                 }.start();
206
207                                 onFirstElectionTimeout.countDown();
208                             } else {
209                                 super.handleCommand(message);
210                             }
211                         }
212                     };
213                 }
214             };
215
216             setupInMemorySnapshotStore();
217
218             final YangInstanceIdentifier path = TestModel.TEST_PATH;
219             final MockDataChangeListener listener = new MockDataChangeListener(1);
220             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
221                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
222
223             final TestActorRef<Shard> shard = actorFactory.createTestActor(
224                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
225                     "testRegisterChangeListenerWhenNotLeaderInitially");
226
227             // Wait until the shard receives the first ElectionTimeout message.
228             assertEquals("Got first ElectionTimeout", true,
229                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
230
231             // Now send the RegisterChangeListener and wait for the reply.
232             shard.tell(new RegisterChangeListener(path, dclActor,
233                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
234
235             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
236                     RegisterChangeListenerReply.class);
237             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
238
239             // Sanity check - verify the shard is not the leader yet.
240             shard.tell(FindLeader.INSTANCE, getRef());
241             final FindLeaderReply findLeadeReply =
242                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
243             assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
244
245             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
246             // with the election process.
247             onChangeListenerRegistered.countDown();
248
249             // Wait for the shard to become the leader and notify our listener with the existing
250             // data in the store.
251             listener.waitForChangeEvents(path);
252         }};
253     }
254
255     @Test
256     public void testRegisterDataTreeChangeListener() throws Exception {
257         new ShardTestKit(getSystem()) {{
258             final TestActorRef<Shard> shard = actorFactory.createTestActor(
259                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testRegisterDataTreeChangeListener");
260
261             waitUntilLeader(shard);
262
263             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
264
265             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
266             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
267                     TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
268
269             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
270
271             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
272                     RegisterDataTreeChangeListenerReply.class);
273             final String replyPath = reply.getListenerRegistrationPath().toString();
274             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
275                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
276
277             final YangInstanceIdentifier path = TestModel.TEST_PATH;
278             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
279
280             listener.waitForChangeEvents();
281         }};
282     }
283
284     @SuppressWarnings("serial")
285     @Test
286     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
287         new ShardTestKit(getSystem()) {{
288             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
289             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
290             final Creator<Shard> creator = new Creator<Shard>() {
291                 boolean firstElectionTimeout = true;
292
293                 @Override
294                 public Shard create() throws Exception {
295                     return new Shard(newShardBuilder()) {
296                         @Override
297                         public void handleCommand(final Object message) {
298                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
299                                 firstElectionTimeout = false;
300                                 final ActorRef self = getSelf();
301                                 new Thread() {
302                                     @Override
303                                     public void run() {
304                                         Uninterruptibles.awaitUninterruptibly(
305                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
306                                         self.tell(message, self);
307                                     }
308                                 }.start();
309
310                                 onFirstElectionTimeout.countDown();
311                             } else {
312                                 super.handleCommand(message);
313                             }
314                         }
315                     };
316                 }
317             };
318
319             setupInMemorySnapshotStore();
320
321             final YangInstanceIdentifier path = TestModel.TEST_PATH;
322             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
323             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
324                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
325
326             final TestActorRef<Shard> shard = actorFactory.createTestActor(
327                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
328                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
329
330             assertEquals("Got first ElectionTimeout", true,
331                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
332
333             shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
334             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
335                 RegisterDataTreeChangeListenerReply.class);
336             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
337
338             shard.tell(FindLeader.INSTANCE, getRef());
339             final FindLeaderReply findLeadeReply =
340                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
341             assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
342
343
344             onChangeListenerRegistered.countDown();
345
346             // TODO: investigate why we do not receive data chage events
347             listener.waitForChangeEvents();
348         }};
349     }
350
351     @Test
352     public void testCreateTransaction(){
353         new ShardTestKit(getSystem()) {{
354             final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
355
356             waitUntilLeader(shard);
357
358             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
359
360             shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
361                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
362
363             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
364                     CreateTransactionReply.class);
365
366             final String path = reply.getTransactionPath().toString();
367             assertTrue("Unexpected transaction path " + path,
368                     path.startsWith("akka://test/user/testCreateTransaction/shard-member-1:ShardTransactionTest@0:"));
369         }};
370     }
371
372     @Test
373     public void testCreateTransactionOnChain(){
374         new ShardTestKit(getSystem()) {{
375             final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
376
377             waitUntilLeader(shard);
378
379             shard.tell(new CreateTransaction(nextTransactionId(),TransactionType.READ_ONLY.ordinal(),
380                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
381
382             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
383                     CreateTransactionReply.class);
384
385             final String path = reply.getTransactionPath().toString();
386             assertTrue("Unexpected transaction path " + path,
387                     path.startsWith("akka://test/user/testCreateTransactionOnChain/shard-member-1:ShardTransactionTest@0:"));
388         }};
389     }
390
391     @Test
392     public void testPeerAddressResolved() throws Exception {
393         new ShardTestKit(getSystem()) {{
394             final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"), "config");
395             final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder().
396                     peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null)).props().
397                         withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
398
399             final String address = "akka://foobar";
400             shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
401
402             shard.tell(GetOnDemandRaftState.INSTANCE, getRef());
403             final OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
404             assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
405         }};
406     }
407
408     @Test
409     public void testApplySnapshot() throws Exception {
410
411         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps().
412                 withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
413
414         ShardTestKit.waitUntilLeader(shard);
415
416         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
417         store.setSchemaContext(SCHEMA_CONTEXT);
418
419         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
420                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
421                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
422                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
423
424         writeToStore(store, TestModel.TEST_PATH, container);
425
426         final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
427         final NormalizedNode<?,?> expected = readStore(store, root);
428
429         final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(expected).serialize(),
430                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
431
432         shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
433
434         final Stopwatch sw = Stopwatch.createStarted();
435         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
436             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
437
438             try {
439                 assertEquals("Root node", expected, readStore(shard, root));
440                 return;
441             } catch(final AssertionError e) {
442                 // try again
443             }
444         }
445
446         fail("Snapshot was not applied");
447     }
448
449     @Test
450     public void testApplyState() throws Exception {
451         final TestActorRef<Shard> shard = actorFactory.createTestActor(
452                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
453
454         ShardTestKit.waitUntilLeader(shard);
455
456         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
457         store.setSchemaContext(SCHEMA_CONTEXT);
458         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
459
460         final NormalizedNode<?, ?> root = readStore(store, YangInstanceIdentifier.EMPTY);
461         final Snapshot snapshot = Snapshot.create(new MetadataShardDataTreeSnapshot(root).serialize(),
462                 Collections.<ReplicatedLogEntry> emptyList(), 1, 2, 3, 4);
463
464         shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
465
466         final DataTreeModification writeMod = store.takeSnapshot().newModification();
467         final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
468         writeMod.write(TestModel.TEST_PATH, node);
469         writeMod.ready();
470
471         final TransactionIdentifier tx = nextTransactionId();
472         final ApplyState applyState = new ApplyState(null, tx,
473                 new ReplicatedLogImplEntry(1, 2, payloadForModification(store, writeMod, tx)));
474
475         shard.tell(applyState, shard);
476
477         final Stopwatch sw = Stopwatch.createStarted();
478         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
479             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
480
481             final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
482             if(actual != null) {
483                 assertEquals("Applied state", node, actual);
484                 return;
485             }
486         }
487
488         fail("State was not applied");
489     }
490
491     @Test
492     public void testDataTreeCandidateRecovery() throws Exception {
493         // Set up the InMemorySnapshotStore.
494         final DataTree source = setupInMemorySnapshotStore();
495
496         final DataTreeModification writeMod = source.takeSnapshot().newModification();
497         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
498         writeMod.ready();
499         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
500
501         // Set up the InMemoryJournal.
502         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1,
503             payloadForModification(source, writeMod, nextTransactionId())));
504
505         final int nListEntries = 16;
506         final Set<Integer> listEntryKeys = new HashSet<>();
507
508         // Add some ModificationPayload entries
509         for (int i = 1; i <= nListEntries; i++) {
510             listEntryKeys.add(Integer.valueOf(i));
511
512             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
513                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
514
515             final DataTreeModification mod = source.takeSnapshot().newModification();
516             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
517             mod.ready();
518
519             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
520                 payloadForModification(source, mod, nextTransactionId())));
521         }
522
523         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
524             new ApplyJournalEntries(nListEntries));
525
526         testRecovery(listEntryKeys);
527     }
528
529     @Test
530     public void testConcurrentThreePhaseCommits() throws Throwable {
531         new ShardTestKit(getSystem()) {{
532             final TestActorRef<Shard> shard = actorFactory.createTestActor(
533                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
534                     "testConcurrentThreePhaseCommits");
535
536             waitUntilLeader(shard);
537
538             final TransactionIdentifier transactionID1 = nextTransactionId();
539             final TransactionIdentifier transactionID2 = nextTransactionId();
540             final TransactionIdentifier transactionID3 = nextTransactionId();
541
542             final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
543                     shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
544             final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
545             final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
546             final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
547
548             final long timeoutSec = 5;
549             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
550             final Timeout timeout = new Timeout(duration);
551
552             shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
553                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
554             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
555                     expectMsgClass(duration, ReadyTransactionReply.class));
556             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
557
558             // Send the CanCommitTransaction message for the first Tx.
559
560             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
561             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
562                     expectMsgClass(duration, CanCommitTransactionReply.class));
563             assertEquals("Can commit", true, canCommitReply.getCanCommit());
564
565             // Ready 2 more Tx's.
566
567             shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
568                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
569             expectMsgClass(duration, ReadyTransactionReply.class);
570
571             shard.tell(prepareBatchedModifications(transactionID3, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
572                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
573                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), getRef());
574             expectMsgClass(duration, ReadyTransactionReply.class);
575
576             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
577             // processed after the first Tx completes.
578
579             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
580                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
581
582             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
583                     new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
584
585             // Send the CommitTransaction message for the first Tx. After it completes, it should
586             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
587
588             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
589             expectMsgClass(duration, CommitTransactionReply.class);
590
591             // Wait for the next 2 Tx's to complete.
592
593             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
594             final CountDownLatch commitLatch = new CountDownLatch(2);
595
596             class OnFutureComplete extends OnComplete<Object> {
597                 private final Class<?> expRespType;
598
599                 OnFutureComplete(final Class<?> expRespType) {
600                     this.expRespType = expRespType;
601                 }
602
603                 @Override
604                 public void onComplete(final Throwable error, final Object resp) {
605                     if(error != null) {
606                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
607                     } else {
608                         try {
609                             assertEquals("Commit response type", expRespType, resp.getClass());
610                             onSuccess(resp);
611                         } catch (final Exception e) {
612                             caughtEx.set(e);
613                         }
614                     }
615                 }
616
617                 void onSuccess(final Object resp) throws Exception {
618                 }
619             }
620
621             class OnCommitFutureComplete extends OnFutureComplete {
622                 OnCommitFutureComplete() {
623                     super(CommitTransactionReply.class);
624                 }
625
626                 @Override
627                 public void onComplete(final Throwable error, final Object resp) {
628                     super.onComplete(error, resp);
629                     commitLatch.countDown();
630                 }
631             }
632
633             class OnCanCommitFutureComplete extends OnFutureComplete {
634                 private final TransactionIdentifier transactionID;
635
636                 OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
637                     super(CanCommitTransactionReply.class);
638                     this.transactionID = transactionID;
639                 }
640
641                 @Override
642                 void onSuccess(final Object resp) throws Exception {
643                     final CanCommitTransactionReply canCommitReply =
644                             CanCommitTransactionReply.fromSerializable(resp);
645                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
646
647                     final Future<Object> commitFuture = Patterns.ask(shard,
648                             new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
649                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
650                 }
651             }
652
653             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
654                     getSystem().dispatcher());
655
656             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
657                     getSystem().dispatcher());
658
659             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
660
661             if(caughtEx.get() != null) {
662                 throw caughtEx.get();
663             }
664
665             assertEquals("Commits complete", true, done);
666
667             final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
668                     cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
669                     cohort3.getPreCommit(), cohort3.getCommit());
670             inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
671             inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
672             inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
673             inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
674             inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
675             inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
676             inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
677             inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
678             inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
679
680             // Verify data in the data store.
681
682             verifyOuterListEntry(shard, 1);
683
684             verifyLastApplied(shard, 2);
685         }};
686     }
687
688     @Test
689     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
690         new ShardTestKit(getSystem()) {{
691             final TestActorRef<Shard> shard = actorFactory.createTestActor(
692                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
693                     "testBatchedModificationsWithNoCommitOnReady");
694
695             waitUntilLeader(shard);
696
697             final TransactionIdentifier transactionID = nextTransactionId();
698             final FiniteDuration duration = duration("5 seconds");
699
700             // Send a BatchedModifications to start a transaction.
701
702             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
703                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
704             expectMsgClass(duration, BatchedModificationsReply.class);
705
706             // Send a couple more BatchedModifications.
707
708             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
709                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
710             expectMsgClass(duration, BatchedModificationsReply.class);
711
712             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
713                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
714                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
715             expectMsgClass(duration, ReadyTransactionReply.class);
716
717             // Send the CanCommitTransaction message.
718
719             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
720             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
721                     expectMsgClass(duration, CanCommitTransactionReply.class));
722             assertEquals("Can commit", true, canCommitReply.getCanCommit());
723
724             // Send the CommitTransaction message.
725
726             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
727             expectMsgClass(duration, CommitTransactionReply.class);
728
729             // Verify data in the data store.
730
731             verifyOuterListEntry(shard, 1);
732         }};
733     }
734
735     @Test
736     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
737         new ShardTestKit(getSystem()) {{
738             final TestActorRef<Shard> shard = actorFactory.createTestActor(
739                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
740                     "testBatchedModificationsWithCommitOnReady");
741
742             waitUntilLeader(shard);
743
744             final TransactionIdentifier transactionID = nextTransactionId();
745             final FiniteDuration duration = duration("5 seconds");
746
747             // Send a BatchedModifications to start a transaction.
748
749             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
750                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
751             expectMsgClass(duration, BatchedModificationsReply.class);
752
753             // Send a couple more BatchedModifications.
754
755             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
756                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
757             expectMsgClass(duration, BatchedModificationsReply.class);
758
759             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
760                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
761                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
762
763             expectMsgClass(duration, CommitTransactionReply.class);
764
765             // Verify data in the data store.
766
767             verifyOuterListEntry(shard, 1);
768         }};
769     }
770
771     @Test(expected=IllegalStateException.class)
772     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
773         new ShardTestKit(getSystem()) {{
774             final TestActorRef<Shard> shard = actorFactory.createTestActor(
775                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
776                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
777
778             waitUntilLeader(shard);
779
780             final TransactionIdentifier transactionID = nextTransactionId();
781             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
782             batched.setReady(true);
783             batched.setTotalMessagesSent(2);
784
785             shard.tell(batched, getRef());
786
787             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
788
789             if(failure != null) {
790                 throw failure.cause();
791             }
792         }};
793     }
794
795     @Test
796     public void testBatchedModificationsWithOperationFailure() throws Throwable {
797         new ShardTestKit(getSystem()) {{
798             final TestActorRef<Shard> shard = actorFactory.createTestActor(
799                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
800                     "testBatchedModificationsWithOperationFailure");
801
802             waitUntilLeader(shard);
803
804             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
805             // write will not validate the children for performance reasons.
806
807             final TransactionIdentifier transactionID = nextTransactionId();
808
809             final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
810                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
811                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
812
813             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
814             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
815             shard.tell(batched, getRef());
816             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
817
818             final Throwable cause = failure.cause();
819
820             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
821             batched.setReady(true);
822             batched.setTotalMessagesSent(2);
823
824             shard.tell(batched, getRef());
825
826             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
827             assertEquals("Failure cause", cause, failure.cause());
828         }};
829     }
830
831     @Test
832     public void testBatchedModificationsOnTransactionChain() throws Throwable {
833         new ShardTestKit(getSystem()) {{
834             final TestActorRef<Shard> shard = actorFactory.createTestActor(
835                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
836                     "testBatchedModificationsOnTransactionChain");
837
838             waitUntilLeader(shard);
839
840             final LocalHistoryIdentifier historyId = nextHistoryId();
841             final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
842             final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
843
844             final FiniteDuration duration = duration("5 seconds");
845
846             // Send a BatchedModifications to start a chained write transaction and ready it.
847
848             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
849             final YangInstanceIdentifier path = TestModel.TEST_PATH;
850             shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), getRef());
851             expectMsgClass(duration, ReadyTransactionReply.class);
852
853             // Create a read Tx on the same chain.
854
855             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
856                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
857
858             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
859
860             getSystem().actorSelection(createReply.getTransactionPath()).tell(
861                     new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
862             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
863             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
864
865             // Commit the write transaction.
866
867             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
868             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
869                     expectMsgClass(duration, CanCommitTransactionReply.class));
870             assertEquals("Can commit", true, canCommitReply.getCanCommit());
871
872             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
873             expectMsgClass(duration, CommitTransactionReply.class);
874
875             // Verify data in the data store.
876
877             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
878             assertEquals("Stored node", containerNode, actualNode);
879         }};
880     }
881
882     @Test
883     public void testOnBatchedModificationsWhenNotLeader() {
884         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
885         new ShardTestKit(getSystem()) {{
886             final Creator<Shard> creator = new Creator<Shard>() {
887                 private static final long serialVersionUID = 1L;
888
889                 @Override
890                 public Shard create() throws Exception {
891                     return new Shard(newShardBuilder()) {
892                         @Override
893                         protected boolean isLeader() {
894                             return overrideLeaderCalls.get() ? false : super.isLeader();
895                         }
896
897                         @Override
898                         public ActorSelection getLeader() {
899                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
900                                 super.getLeader();
901                         }
902                     };
903                 }
904             };
905
906             final TestActorRef<Shard> shard = actorFactory.createTestActor(
907                     Props.create(new DelegatingShardCreator(creator)).
908                         withDispatcher(Dispatchers.DefaultDispatcherId()), "testOnBatchedModificationsWhenNotLeader");
909
910             waitUntilLeader(shard);
911
912             overrideLeaderCalls.set(true);
913
914             final BatchedModifications batched = new BatchedModifications(nextTransactionId(), DataStoreVersions.CURRENT_VERSION);
915
916             shard.tell(batched, ActorRef.noSender());
917
918             expectMsgEquals(batched);
919         }};
920     }
921
922     @Test
923     public void testTransactionMessagesWithNoLeader() {
924         new ShardTestKit(getSystem()) {{
925             dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
926                 shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
927             final TestActorRef<Shard> shard = actorFactory.createTestActor(
928                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
929                     "testTransactionMessagesWithNoLeader");
930
931             waitUntilNoLeader(shard);
932
933             final TransactionIdentifier txId = nextTransactionId();
934             shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), getRef());
935             Failure failure = expectMsgClass(Failure.class);
936             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
937
938             shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
939                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
940             failure = expectMsgClass(Failure.class);
941             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
942
943             shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true), getRef());
944             failure = expectMsgClass(Failure.class);
945             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
946         }};
947     }
948
949     @Test
950     public void testReadyWithReadWriteImmediateCommit() throws Exception{
951         testReadyWithImmediateCommit(true);
952     }
953
954     @Test
955     public void testReadyWithWriteOnlyImmediateCommit() throws Exception{
956         testReadyWithImmediateCommit(false);
957     }
958
959     private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
960         new ShardTestKit(getSystem()) {{
961             final TestActorRef<Shard> shard = actorFactory.createTestActor(
962                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
963                     "testReadyWithImmediateCommit-" + readWrite);
964
965             waitUntilLeader(shard);
966
967             final TransactionIdentifier transactionID = nextTransactionId();
968             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
969             if(readWrite) {
970                 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
971                         containerNode, true), getRef());
972             } else {
973                 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
974             }
975
976             expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
977
978             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
979             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
980         }};
981     }
982
983     @Test
984     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
985         new ShardTestKit(getSystem()) {{
986             final TestActorRef<Shard> shard = actorFactory.createTestActor(
987                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
988                     "testReadyLocalTransactionWithImmediateCommit");
989
990             waitUntilLeader(shard);
991
992             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
993
994             final DataTreeModification modification = dataStore.newModification();
995
996             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
997             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
998             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
999             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1000
1001             final TransactionIdentifier txId = nextTransactionId();
1002             modification.ready();
1003             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1004
1005             shard.tell(readyMessage, getRef());
1006
1007             expectMsgClass(CommitTransactionReply.class);
1008
1009             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1010             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1011         }};
1012     }
1013
1014     @Test
1015     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1016         new ShardTestKit(getSystem()) {{
1017             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1018                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1019                     "testReadyLocalTransactionWithThreePhaseCommit");
1020
1021             waitUntilLeader(shard);
1022
1023             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1024
1025             final DataTreeModification modification = dataStore.newModification();
1026
1027             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1028             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1029             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1030             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1031
1032             final TransactionIdentifier txId = nextTransactionId();
1033             modification.ready();
1034             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1035
1036             shard.tell(readyMessage, getRef());
1037
1038             expectMsgClass(ReadyTransactionReply.class);
1039
1040             // Send the CanCommitTransaction message.
1041
1042             shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1043             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1044                     expectMsgClass(CanCommitTransactionReply.class));
1045             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1046
1047             // Send the CanCommitTransaction message.
1048
1049             shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1050             expectMsgClass(CommitTransactionReply.class);
1051
1052             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1053             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1054         }};
1055     }
1056
1057     @Test
1058     public void testReadWriteCommitWithPersistenceDisabled() throws Throwable {
1059         dataStoreContextBuilder.persistent(false);
1060         new ShardTestKit(getSystem()) {{
1061             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1062                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1063                     "testCommitWithPersistenceDisabled");
1064
1065             waitUntilLeader(shard);
1066
1067             // Setup a simulated transactions with a mock cohort.
1068
1069             final FiniteDuration duration = duration("5 seconds");
1070
1071             final TransactionIdentifier transactionID = nextTransactionId();
1072             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1073             shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false), getRef());
1074             expectMsgClass(duration, ReadyTransactionReply.class);
1075
1076             // Send the CanCommitTransaction message.
1077
1078             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1079             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1080                     expectMsgClass(duration, CanCommitTransactionReply.class));
1081             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1082
1083             // Send the CanCommitTransaction message.
1084
1085             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1086             expectMsgClass(duration, CommitTransactionReply.class);
1087
1088             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1089             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1090         }};
1091     }
1092
1093     @Test
1094     public void testReadWriteCommitWhenTransactionHasNoModifications() {
1095         testCommitWhenTransactionHasNoModifications(true);
1096     }
1097
1098     @Test
1099     public void testWriteOnlyCommitWhenTransactionHasNoModifications() {
1100         testCommitWhenTransactionHasNoModifications(false);
1101     }
1102
1103     private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
1104         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1105         // but here that need not happen
1106         new ShardTestKit(getSystem()) {{
1107             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1108                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1109                     "testCommitWhenTransactionHasNoModifications-" + readWrite);
1110
1111             waitUntilLeader(shard);
1112
1113             final TransactionIdentifier transactionID = nextTransactionId();
1114
1115             final FiniteDuration duration = duration("5 seconds");
1116
1117             if(readWrite) {
1118                 final ReadWriteShardDataTreeTransaction rwTx = shard.underlyingActor().getDataStore().
1119                         newReadWriteTransaction(transactionID);
1120                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, rwTx, false), getRef());
1121             } else {
1122                 shard.tell(prepareBatchedModifications(transactionID, new MutableCompositeModification()), getRef());
1123             }
1124
1125             expectMsgClass(duration, ReadyTransactionReply.class);
1126
1127             // Send the CanCommitTransaction message.
1128
1129             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1130             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1131                     expectMsgClass(duration, CanCommitTransactionReply.class));
1132             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1133
1134             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1135             expectMsgClass(duration, CommitTransactionReply.class);
1136
1137             shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1138             final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1139
1140             // Use MBean for verification
1141             // Committed transaction count should increase as usual
1142             assertEquals(1,shardStats.getCommittedTransactionsCount());
1143
1144             // Commit index should not advance because this does not go into the journal
1145             assertEquals(-1, shardStats.getCommitIndex());
1146         }};
1147     }
1148
1149     @Test
1150     public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
1151         testCommitWhenTransactionHasModifications(true);
1152     }
1153
1154     @Test
1155     public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
1156         testCommitWhenTransactionHasModifications(false);
1157     }
1158
1159     private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
1160         new ShardTestKit(getSystem()) {{
1161             final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1162             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1163                     newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1164                     "testCommitWhenTransactionHasModifications-" + readWrite);
1165
1166             waitUntilLeader(shard);
1167
1168             final FiniteDuration duration = duration("5 seconds");
1169             final TransactionIdentifier transactionID = nextTransactionId();
1170
1171             if(readWrite) {
1172                 shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
1173                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1174             } else {
1175                 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1176                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1177             }
1178
1179             expectMsgClass(duration, ReadyTransactionReply.class);
1180
1181             // Send the CanCommitTransaction message.
1182
1183             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1184             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1185                     expectMsgClass(duration, CanCommitTransactionReply.class));
1186             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1187
1188             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1189             expectMsgClass(duration, CommitTransactionReply.class);
1190
1191             final InOrder inOrder = inOrder(dataTree);
1192             inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1193             inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1194             inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1195
1196             shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1197             final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1198
1199             // Use MBean for verification
1200             // Committed transaction count should increase as usual
1201             assertEquals(1, shardStats.getCommittedTransactionsCount());
1202
1203             // Commit index should advance as we do not have an empty modification
1204             assertEquals(0, shardStats.getCommitIndex());
1205         }};
1206     }
1207
1208     @Test
1209     public void testCommitPhaseFailure() throws Throwable {
1210         new ShardTestKit(getSystem()) {{
1211             final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1212             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1213                     newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1214                     "testCommitPhaseFailure");
1215
1216             waitUntilLeader(shard);
1217
1218             final FiniteDuration duration = duration("5 seconds");
1219             final Timeout timeout = new Timeout(duration);
1220
1221             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1222             // commit phase.
1223
1224             doThrow(new RuntimeException("mock commit failure")).when(dataTree).commit(any(DataTreeCandidate.class));
1225
1226             final TransactionIdentifier transactionID1 = nextTransactionId();
1227             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1228                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1229             expectMsgClass(duration, ReadyTransactionReply.class);
1230
1231             final TransactionIdentifier transactionID2 = nextTransactionId();
1232             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1233                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1234             expectMsgClass(duration, ReadyTransactionReply.class);
1235
1236             // Send the CanCommitTransaction message for the first Tx.
1237
1238             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1239             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1240                     expectMsgClass(duration, CanCommitTransactionReply.class));
1241             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1242
1243             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1244             // processed after the first Tx completes.
1245
1246             final Future<Object> canCommitFuture = Patterns.ask(shard,
1247                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1248
1249             // Send the CommitTransaction message for the first Tx. This should send back an error
1250             // and trigger the 2nd Tx to proceed.
1251
1252             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1253             expectMsgClass(duration, akka.actor.Status.Failure.class);
1254
1255             // Wait for the 2nd Tx to complete the canCommit phase.
1256
1257             final CountDownLatch latch = new CountDownLatch(1);
1258             canCommitFuture.onComplete(new OnComplete<Object>() {
1259                 @Override
1260                 public void onComplete(final Throwable t, final Object resp) {
1261                     latch.countDown();
1262                 }
1263             }, getSystem().dispatcher());
1264
1265             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1266
1267             final InOrder inOrder = inOrder(dataTree);
1268             inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1269             inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1270             inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1271             inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1272         }};
1273     }
1274
1275     @Test
1276     public void testPreCommitPhaseFailure() throws Throwable {
1277         new ShardTestKit(getSystem()) {{
1278             final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1279             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1280                     newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1281                     "testPreCommitPhaseFailure");
1282
1283             waitUntilLeader(shard);
1284
1285             final FiniteDuration duration = duration("5 seconds");
1286             final Timeout timeout = new Timeout(duration);
1287
1288             doThrow(new RuntimeException("mock preCommit failure")).when(dataTree).prepare(any(DataTreeModification.class));
1289
1290             final TransactionIdentifier transactionID1 = nextTransactionId();
1291             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1292                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1293             expectMsgClass(duration, ReadyTransactionReply.class);
1294
1295             final TransactionIdentifier transactionID2 = nextTransactionId();
1296             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1297                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1298             expectMsgClass(duration, ReadyTransactionReply.class);
1299
1300             // Send the CanCommitTransaction message for the first Tx.
1301
1302             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1303             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1304                 expectMsgClass(duration, CanCommitTransactionReply.class));
1305             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1306
1307             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1308             // processed after the first Tx completes.
1309
1310             final Future<Object> canCommitFuture = Patterns.ask(shard,
1311                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1312
1313             // Send the CommitTransaction message for the first Tx. This should send back an error
1314             // and trigger the 2nd Tx to proceed.
1315
1316             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1317             expectMsgClass(duration, akka.actor.Status.Failure.class);
1318
1319             // Wait for the 2nd Tx to complete the canCommit phase.
1320
1321             final CountDownLatch latch = new CountDownLatch(1);
1322             canCommitFuture.onComplete(new OnComplete<Object>() {
1323                 @Override
1324                 public void onComplete(final Throwable t, final Object resp) {
1325                     latch.countDown();
1326                 }
1327             }, getSystem().dispatcher());
1328
1329             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1330
1331             final InOrder inOrder = inOrder(dataTree);
1332             inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1333             inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1334             inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1335         }};
1336     }
1337
1338     @Test
1339     public void testCanCommitPhaseFailure() throws Throwable {
1340         new ShardTestKit(getSystem()) {{
1341             final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1342             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1343                     newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1344                     "testCanCommitPhaseFailure");
1345
1346             waitUntilLeader(shard);
1347
1348             final FiniteDuration duration = duration("5 seconds");
1349             final TransactionIdentifier transactionID1 = nextTransactionId();
1350
1351             doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")).
1352                 doNothing().when(dataTree).validate(any(DataTreeModification.class));
1353
1354             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1355                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1356             expectMsgClass(duration, ReadyTransactionReply.class);
1357
1358             // Send the CanCommitTransaction message.
1359
1360             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1361             expectMsgClass(duration, akka.actor.Status.Failure.class);
1362
1363             // Send another can commit to ensure the failed one got cleaned up.
1364
1365             final TransactionIdentifier transactionID2 = nextTransactionId();
1366             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1367                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1368             expectMsgClass(duration, ReadyTransactionReply.class);
1369
1370             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1371             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1372                     expectMsgClass(CanCommitTransactionReply.class));
1373             assertEquals("getCanCommit", true, reply.getCanCommit());
1374         }};
1375     }
1376
1377     @Test
1378     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1379         testImmediateCommitWithCanCommitPhaseFailure(true);
1380         testImmediateCommitWithCanCommitPhaseFailure(false);
1381     }
1382
1383     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1384         new ShardTestKit(getSystem()) {{
1385             final TipProducingDataTree dataTree = createDelegatingMockDataTree();
1386             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1387                     newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1388                     "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1389
1390             waitUntilLeader(shard);
1391
1392             doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure")).
1393                  doNothing().when(dataTree).validate(any(DataTreeModification.class));
1394
1395             final FiniteDuration duration = duration("5 seconds");
1396
1397             final TransactionIdentifier transactionID1 = nextTransactionId();
1398
1399             if(readWrite) {
1400                 shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1401                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1402             } else {
1403                 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1404                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1405             }
1406
1407             expectMsgClass(duration, akka.actor.Status.Failure.class);
1408
1409             // Send another can commit to ensure the failed one got cleaned up.
1410
1411             final TransactionIdentifier transactionID2 = nextTransactionId();
1412             if(readWrite) {
1413                 shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1414                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1415             } else {
1416                 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1417                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1418             }
1419
1420             expectMsgClass(duration, CommitTransactionReply.class);
1421         }};
1422     }
1423
1424     @SuppressWarnings("serial")
1425     @Test
1426     public void testAbortWithCommitPending() throws Throwable {
1427         new ShardTestKit(getSystem()) {{
1428             final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1429                 @Override
1430                 void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
1431                     // Simulate an AbortTransaction message occurring during replication, after
1432                     // persisting and before finishing the commit to the in-memory store.
1433
1434                     doAbortTransaction(transactionId, null);
1435                     super.persistPayload(transactionId, payload);
1436                 }
1437             };
1438
1439             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1440                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1441                     "testAbortWithCommitPending");
1442
1443             waitUntilLeader(shard);
1444
1445             final FiniteDuration duration = duration("5 seconds");
1446
1447             final TransactionIdentifier transactionID = nextTransactionId();
1448
1449             shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1450                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1451             expectMsgClass(duration, ReadyTransactionReply.class);
1452
1453             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1454             expectMsgClass(duration, CanCommitTransactionReply.class);
1455
1456             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1457             expectMsgClass(duration, CommitTransactionReply.class);
1458
1459             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1460
1461             // Since we're simulating an abort occurring during replication and before finish commit,
1462             // the data should still get written to the in-memory store since we've gotten past
1463             // canCommit and preCommit and persisted the data.
1464             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1465         }};
1466     }
1467
1468     @Test
1469     public void testTransactionCommitTimeout() throws Throwable {
1470         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1471         new ShardTestKit(getSystem()) {{
1472             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1473                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1474                     "testTransactionCommitTimeout");
1475
1476             waitUntilLeader(shard);
1477
1478             final FiniteDuration duration = duration("5 seconds");
1479
1480             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1481             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1482                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1483
1484             // Ready 2 Tx's - the first will timeout
1485
1486             final TransactionIdentifier transactionID1 = nextTransactionId();
1487             shard.tell(prepareBatchedModifications(transactionID1, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1488                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1489                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), getRef());
1490             expectMsgClass(duration, ReadyTransactionReply.class);
1491
1492             final TransactionIdentifier transactionID2 = nextTransactionId();
1493             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1494                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1495             shard.tell(prepareBatchedModifications(transactionID2, listNodePath,
1496                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), getRef());
1497             expectMsgClass(duration, ReadyTransactionReply.class);
1498
1499             // canCommit 1st Tx. We don't send the commit so it should timeout.
1500
1501             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1502             expectMsgClass(duration, CanCommitTransactionReply.class);
1503
1504             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1505
1506             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1507             expectMsgClass(duration, CanCommitTransactionReply.class);
1508
1509             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1510
1511             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1512             expectMsgClass(duration, akka.actor.Status.Failure.class);
1513
1514             // Commit the 2nd Tx.
1515
1516             shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1517             expectMsgClass(duration, CommitTransactionReply.class);
1518
1519             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1520             assertNotNull(listNodePath + " not found", node);
1521         }};
1522     }
1523
1524 //    @Test
1525 //    @Ignore
1526 //    public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1527 //        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1528 //
1529 //        new ShardTestKit(getSystem()) {{
1530 //            final TestActorRef<Shard> shard = actorFactory.createTestActor(
1531 //                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1532 //                    "testTransactionCommitQueueCapacityExceeded");
1533 //
1534 //            waitUntilLeader(shard);
1535 //
1536 //            final FiniteDuration duration = duration("5 seconds");
1537 //
1538 //            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1539 //
1540 //            final TransactionIdentifier transactionID1 = nextTransactionId();
1541 //            final MutableCompositeModification modification1 = new MutableCompositeModification();
1542 //            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1543 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1544 //                    modification1);
1545 //
1546 //            final TransactionIdentifier transactionID2 = nextTransactionId();
1547 //            final MutableCompositeModification modification2 = new MutableCompositeModification();
1548 //            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1549 //                    TestModel.OUTER_LIST_PATH,
1550 //                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1551 //                    modification2);
1552 //
1553 //            final TransactionIdentifier transactionID3 = nextTransactionId();
1554 //            final MutableCompositeModification modification3 = new MutableCompositeModification();
1555 //            final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1556 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1557 //                    modification3);
1558 //
1559 //            // Ready the Tx's
1560 //
1561 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1562 //            expectMsgClass(duration, ReadyTransactionReply.class);
1563 //
1564 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1565 //            expectMsgClass(duration, ReadyTransactionReply.class);
1566 //
1567 //            // The 3rd Tx should exceed queue capacity and fail.
1568 //
1569 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1570 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1571 //
1572 //            // canCommit 1st Tx.
1573 //
1574 //            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1575 //            expectMsgClass(duration, CanCommitTransactionReply.class);
1576 //
1577 //            // canCommit the 2nd Tx - it should get queued.
1578 //
1579 //            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1580 //
1581 //            // canCommit the 3rd Tx - should exceed queue capacity and fail.
1582 //
1583 //            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1584 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1585 //        }};
1586 //    }
1587
1588     @Test
1589     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1590         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1591         new ShardTestKit(getSystem()) {{
1592             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1593                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1594                     "testTransactionCommitWithPriorExpiredCohortEntries");
1595
1596             waitUntilLeader(shard);
1597
1598             final FiniteDuration duration = duration("5 seconds");
1599
1600             final TransactionIdentifier transactionID1 = nextTransactionId();
1601             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1602                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1603             expectMsgClass(duration, ReadyTransactionReply.class);
1604
1605             final TransactionIdentifier transactionID2 = nextTransactionId();
1606             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1607                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1608             expectMsgClass(duration, ReadyTransactionReply.class);
1609
1610             final TransactionIdentifier transactionID3 = nextTransactionId();
1611             shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1612                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1613             expectMsgClass(duration, ReadyTransactionReply.class);
1614
1615             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1616             // should expire from the queue and the last one should be processed.
1617
1618             shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1619             expectMsgClass(duration, CanCommitTransactionReply.class);
1620         }};
1621     }
1622
1623     @Test
1624     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
1625         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1626         new ShardTestKit(getSystem()) {{
1627             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1628                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1629                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
1630
1631             waitUntilLeader(shard);
1632
1633             final FiniteDuration duration = duration("5 seconds");
1634
1635             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1636
1637             final TransactionIdentifier transactionID1 = nextTransactionId();
1638             shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1639                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1640             expectMsgClass(duration, ReadyTransactionReply.class);
1641
1642             // CanCommit the first Tx so it's the current in-progress Tx.
1643
1644             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1645             expectMsgClass(duration, CanCommitTransactionReply.class);
1646
1647             // Ready the second Tx.
1648
1649             final TransactionIdentifier transactionID2 = nextTransactionId();
1650             shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1651                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1652             expectMsgClass(duration, ReadyTransactionReply.class);
1653
1654             // Ready the third Tx.
1655
1656             final TransactionIdentifier transactionID3 = nextTransactionId();
1657             final DataTreeModification modification3 = dataStore.newModification();
1658             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1659                     .apply(modification3);
1660             modification3.ready();
1661             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
1662             shard.tell(readyMessage, getRef());
1663
1664             // Commit the first Tx. After completing, the second should expire from the queue and the third
1665             // Tx committed.
1666
1667             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1668             expectMsgClass(duration, CommitTransactionReply.class);
1669
1670             // Expect commit reply from the third Tx.
1671
1672             expectMsgClass(duration, CommitTransactionReply.class);
1673
1674             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1675             assertNotNull(TestModel.TEST2_PATH + " not found", node);
1676         }};
1677     }
1678
1679     @Test
1680     public void testCanCommitBeforeReadyFailure() throws Throwable {
1681         new ShardTestKit(getSystem()) {{
1682             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1683                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1684                     "testCanCommitBeforeReadyFailure");
1685
1686             shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), getRef());
1687             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1688         }};
1689     }
1690
1691     @Test
1692     public void testAbortAfterCanCommit() throws Throwable {
1693         new ShardTestKit(getSystem()) {{
1694             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1695                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1696                     "testAbortAfterCanCommit");
1697
1698             waitUntilLeader(shard);
1699
1700             final FiniteDuration duration = duration("5 seconds");
1701             final Timeout timeout = new Timeout(duration);
1702
1703             // Ready 2 transactions - the first one will be aborted.
1704
1705             final TransactionIdentifier transactionID1 = nextTransactionId();
1706             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1707                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1708             expectMsgClass(duration, ReadyTransactionReply.class);
1709
1710             final TransactionIdentifier transactionID2 = nextTransactionId();
1711             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1712                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1713             expectMsgClass(duration, ReadyTransactionReply.class);
1714
1715             // Send the CanCommitTransaction message for the first Tx.
1716
1717             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1718             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1719                     expectMsgClass(duration, CanCommitTransactionReply.class));
1720             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1721
1722             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1723             // processed after the first Tx completes.
1724
1725             final Future<Object> canCommitFuture = Patterns.ask(shard,
1726                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1727
1728             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1729             // Tx to proceed.
1730
1731             shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1732             expectMsgClass(duration, AbortTransactionReply.class);
1733
1734             // Wait for the 2nd Tx to complete the canCommit phase.
1735
1736             canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
1737             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1738         }};
1739     }
1740
1741     @Test
1742     public void testAbortAfterReady() throws Throwable {
1743         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1744         new ShardTestKit(getSystem()) {{
1745             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1746                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1747
1748             waitUntilLeader(shard);
1749
1750             final FiniteDuration duration = duration("5 seconds");
1751
1752             // Ready a tx.
1753
1754             final TransactionIdentifier transactionID1 = nextTransactionId();
1755             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1756                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1757             expectMsgClass(duration, ReadyTransactionReply.class);
1758
1759             // Send the AbortTransaction message.
1760
1761             shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1762             expectMsgClass(duration, AbortTransactionReply.class);
1763
1764             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1765
1766             // Now send CanCommitTransaction - should fail.
1767
1768             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1769             final Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1770             assertTrue("Failure type", failure instanceof IllegalStateException);
1771
1772             // Ready and CanCommit another and verify success.
1773
1774             final TransactionIdentifier transactionID2 = nextTransactionId();
1775             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1776                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1777             expectMsgClass(duration, ReadyTransactionReply.class);
1778
1779             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1780             expectMsgClass(duration, CanCommitTransactionReply.class);
1781         }};
1782     }
1783
1784     @Test
1785     public void testAbortQueuedTransaction() throws Throwable {
1786         new ShardTestKit(getSystem()) {{
1787             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1788                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1789
1790             waitUntilLeader(shard);
1791
1792             final FiniteDuration duration = duration("5 seconds");
1793
1794             // Ready 3 tx's.
1795
1796             final TransactionIdentifier transactionID1 = nextTransactionId();
1797             shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1798                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1799             expectMsgClass(duration, ReadyTransactionReply.class);
1800
1801             final TransactionIdentifier transactionID2 = nextTransactionId();
1802             shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1803                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1804             expectMsgClass(duration, ReadyTransactionReply.class);
1805
1806             final TransactionIdentifier transactionID3 = nextTransactionId();
1807             shard.tell(newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1808                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1), getRef());
1809             expectMsgClass(duration, ReadyTransactionReply.class);
1810
1811             // Abort the second tx while it's queued.
1812
1813             shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1814             expectMsgClass(duration, AbortTransactionReply.class);
1815
1816             // Commit the other 2.
1817
1818             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1819             expectMsgClass(duration, CanCommitTransactionReply.class);
1820
1821             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1822             expectMsgClass(duration, CommitTransactionReply.class);
1823
1824             shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1825             expectMsgClass(duration, CanCommitTransactionReply.class);
1826
1827             shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1828             expectMsgClass(duration, CommitTransactionReply.class);
1829
1830             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1831         }};
1832     }
1833
1834     @Test
1835     public void testCreateSnapshot() throws Exception {
1836         testCreateSnapshot(true, "testCreateSnapshot");
1837     }
1838
1839     @Test
1840     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1841         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1842     }
1843
1844     private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1845
1846         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1847
1848         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1849         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1850             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1851                 super(delegate);
1852             }
1853
1854             @Override
1855             public void saveSnapshot(final Object o) {
1856                 savedSnapshot.set(o);
1857                 super.saveSnapshot(o);
1858             }
1859         }
1860
1861         dataStoreContextBuilder.persistent(persistent);
1862
1863         new ShardTestKit(getSystem()) {{
1864             class TestShard extends Shard {
1865
1866                 protected TestShard(final AbstractBuilder<?, ?> builder) {
1867                     super(builder);
1868                     setPersistence(new TestPersistentDataProvider(super.persistence()));
1869                 }
1870
1871                 @Override
1872                 public void handleCommand(final Object message) {
1873                     super.handleCommand(message);
1874
1875                     // XXX:  commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
1876                     if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1877                         latch.get().countDown();
1878                     }
1879                 }
1880
1881                 @Override
1882                 public RaftActorContext getRaftActorContext() {
1883                     return super.getRaftActorContext();
1884                 }
1885             }
1886
1887             final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1888
1889             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1890                     Props.create(new DelegatingShardCreator(creator)).
1891                         withDispatcher(Dispatchers.DefaultDispatcherId()), shardActorName);
1892
1893             waitUntilLeader(shard);
1894             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1895
1896             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.EMPTY);
1897
1898             // Trigger creation of a snapshot by ensuring
1899             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1900             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1901             awaitAndValidateSnapshot(expectedRoot);
1902
1903             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1904             awaitAndValidateSnapshot(expectedRoot);
1905         }
1906
1907         private void awaitAndValidateSnapshot(final NormalizedNode<?,?> expectedRoot) throws InterruptedException, IOException {
1908             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1909
1910             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1911                     savedSnapshot.get() instanceof Snapshot);
1912
1913             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1914
1915             latch.set(new CountDownLatch(1));
1916             savedSnapshot.set(null);
1917         }
1918
1919         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) throws IOException {
1920             final NormalizedNode<?, ?> actual = ShardDataTreeSnapshot.deserialize(snapshot.getState()).getRootNode().get();
1921             assertEquals("Root node", expectedRoot, actual);
1922         }};
1923     }
1924
1925     /**
1926      * This test simply verifies that the applySnapShot logic will work
1927      * @throws ReadFailedException
1928      * @throws DataValidationFailedException
1929      */
1930     @Test
1931     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
1932         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
1933         store.setSchemaContext(SCHEMA_CONTEXT);
1934
1935         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1936         putTransaction.write(TestModel.TEST_PATH,
1937             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1938         commitTransaction(store, putTransaction);
1939
1940
1941         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.EMPTY);
1942
1943         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
1944
1945         writeTransaction.delete(YangInstanceIdentifier.EMPTY);
1946         writeTransaction.write(YangInstanceIdentifier.EMPTY, expected);
1947
1948         commitTransaction(store, writeTransaction);
1949
1950         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.EMPTY);
1951
1952         assertEquals(expected, actual);
1953     }
1954
1955     @Test
1956     public void testRecoveryApplicable(){
1957
1958         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1959                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1960
1961         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
1962                 schemaContext(SCHEMA_CONTEXT).props();
1963
1964         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1965                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1966
1967         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
1968                 schemaContext(SCHEMA_CONTEXT).props();
1969
1970         new ShardTestKit(getSystem()) {{
1971             final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
1972
1973             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1974
1975             final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
1976
1977             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1978         }};
1979     }
1980
1981     @Test
1982     public void testOnDatastoreContext() {
1983         new ShardTestKit(getSystem()) {{
1984             dataStoreContextBuilder.persistent(true);
1985
1986             final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
1987
1988             assertEquals("isRecoveryApplicable", true,
1989                     shard.underlyingActor().persistence().isRecoveryApplicable());
1990
1991             waitUntilLeader(shard);
1992
1993             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1994
1995             assertEquals("isRecoveryApplicable", false,
1996                 shard.underlyingActor().persistence().isRecoveryApplicable());
1997
1998             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1999
2000             assertEquals("isRecoveryApplicable", true,
2001                 shard.underlyingActor().persistence().isRecoveryApplicable());
2002         }};
2003     }
2004
2005     @Test
2006     public void testRegisterRoleChangeListener() throws Exception {
2007         new ShardTestKit(getSystem()) {
2008             {
2009                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2010                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2011                         "testRegisterRoleChangeListener");
2012
2013                 waitUntilLeader(shard);
2014
2015                 final TestActorRef<MessageCollectorActor> listener =
2016                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2017
2018                 shard.tell(new RegisterRoleChangeListener(), listener);
2019
2020                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2021
2022                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2023                     ShardLeaderStateChanged.class);
2024                 assertEquals("getLocalShardDataTree present", true,
2025                         leaderStateChanged.getLocalShardDataTree().isPresent());
2026                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2027                     leaderStateChanged.getLocalShardDataTree().get());
2028
2029                 MessageCollectorActor.clearMessages(listener);
2030
2031                 // Force a leader change
2032
2033                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2034
2035                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2036                         ShardLeaderStateChanged.class);
2037                 assertEquals("getLocalShardDataTree present", false,
2038                         leaderStateChanged.getLocalShardDataTree().isPresent());
2039             }
2040         };
2041     }
2042
2043     @Test
2044     public void testFollowerInitialSyncStatus() throws Exception {
2045         final TestActorRef<Shard> shard = actorFactory.createTestActor(
2046                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2047                 "testFollowerInitialSyncStatus");
2048
2049         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2050
2051         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2052
2053         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2054
2055         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2056     }
2057
2058     @Test
2059     public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
2060         new ShardTestKit(getSystem()) {{
2061             final String testName = "testClusteredDataChangeListenerDelayedRegistration";
2062             dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
2063                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2064
2065             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2066             final MockDataChangeListener listener = new MockDataChangeListener(1);
2067             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
2068                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2069
2070             setupInMemorySnapshotStore();
2071
2072             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2073                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2074                     actorFactory.generateActorId(testName + "-shard"));
2075
2076             waitUntilNoLeader(shard);
2077
2078             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2079             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2080                 RegisterChangeListenerReply.class);
2081             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2082
2083             shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
2084                     customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2085
2086             listener.waitForChangeEvents();
2087         }};
2088     }
2089
2090     @Test
2091     public void testClusteredDataChangeListenerRegistration() throws Exception {
2092         new ShardTestKit(getSystem()) {{
2093             final String testName = "testClusteredDataChangeListenerRegistration";
2094             final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
2095                     MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
2096
2097             final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
2098                     MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
2099
2100             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2101                     Shard.builder().id(followerShardID).
2102                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2103                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2104                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2105                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2106
2107             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2108                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2109                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2110                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2111                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2112
2113             leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
2114             final String leaderPath = waitUntilLeader(followerShard);
2115             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2116
2117             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2118             final MockDataChangeListener listener = new MockDataChangeListener(1);
2119             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener, path),
2120                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2121
2122             followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2123             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2124                 RegisterChangeListenerReply.class);
2125             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2126
2127             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2128
2129             listener.waitForChangeEvents();
2130         }};
2131     }
2132
2133     @Test
2134     public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
2135         new ShardTestKit(getSystem()) {{
2136             final String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
2137             dataStoreContextBuilder.shardElectionTimeoutFactor(1000).
2138                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2139
2140             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2141             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
2142                     TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2143
2144             setupInMemorySnapshotStore();
2145
2146             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2147                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2148                     actorFactory.generateActorId(testName + "-shard"));
2149
2150             waitUntilNoLeader(shard);
2151
2152             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2153             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2154                     RegisterDataTreeChangeListenerReply.class);
2155             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2156
2157             shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
2158                     customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2159
2160             listener.waitForChangeEvents();
2161         }};
2162     }
2163
2164     @Test
2165     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2166         new ShardTestKit(getSystem()) {{
2167             final String testName = "testClusteredDataTreeChangeListenerRegistration";
2168             final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
2169                     MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
2170
2171             final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
2172                     MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
2173
2174             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2175                     Shard.builder().id(followerShardID).
2176                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2177                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2178                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2179                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2180
2181             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2182                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2183                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2184                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2185                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2186
2187             leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
2188             final String leaderPath = waitUntilLeader(followerShard);
2189             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2190
2191             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2192             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2193             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
2194                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2195
2196             followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2197             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2198                     RegisterDataTreeChangeListenerReply.class);
2199             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2200
2201             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2202
2203             listener.waitForChangeEvents();
2204         }};
2205     }
2206
2207     @Test
2208     public void testServerRemoved() throws Exception {
2209         final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
2210
2211         final ActorRef shard = parent.underlyingActor().context().actorOf(
2212                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2213                 "testServerRemoved");
2214
2215         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2216
2217         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2218     }
2219 }