Deprecate 3PC protobuf messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.Props;
26 import akka.actor.Status.Failure;
27 import akka.dispatch.Dispatchers;
28 import akka.dispatch.OnComplete;
29 import akka.japi.Creator;
30 import akka.pattern.Patterns;
31 import akka.persistence.SaveSnapshotSuccess;
32 import akka.testkit.TestActorRef;
33 import akka.util.Timeout;
34 import com.google.common.base.Function;
35 import com.google.common.util.concurrent.Futures;
36 import com.google.common.util.concurrent.ListenableFuture;
37 import com.google.common.util.concurrent.Uninterruptibles;
38 import java.util.Collections;
39 import java.util.HashSet;
40 import java.util.Set;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicReference;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.opendaylight.controller.cluster.DataPersistenceProvider;
48 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
50 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
51 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
52 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
56 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
63 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
67 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
68 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
69 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
71 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
72 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
73 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
74 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
75 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
76 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
77 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
78 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
79 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
80 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
81 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
82 import org.opendaylight.controller.cluster.raft.RaftActorContext;
83 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
84 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
85 import org.opendaylight.controller.cluster.raft.Snapshot;
86 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
87 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
88 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
89 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
90 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
91 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
92 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
93 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
94 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
95 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
96 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
97 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
98 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
99 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
100 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
101 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
102 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
103 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
112 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
113 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
114 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
115 import scala.concurrent.Await;
116 import scala.concurrent.Future;
117 import scala.concurrent.duration.FiniteDuration;
118
119 public class ShardTest extends AbstractShardTest {
120     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
121
122     @Test
123     public void testRegisterChangeListener() throws Exception {
124         new ShardTestKit(getSystem()) {{
125             final TestActorRef<Shard> shard = actorFactory.createTestActor(
126                     newShardProps(),  "testRegisterChangeListener");
127
128             waitUntilLeader(shard);
129
130             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
131
132             final MockDataChangeListener listener = new MockDataChangeListener(1);
133             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
134                     "testRegisterChangeListener-DataChangeListener");
135
136             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
137                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
138
139             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
140                     RegisterChangeListenerReply.class);
141             final String replyPath = reply.getListenerRegistrationPath().toString();
142             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
143                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
144
145             final YangInstanceIdentifier path = TestModel.TEST_PATH;
146             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
147
148             listener.waitForChangeEvents(path);
149         }};
150     }
151
152     @SuppressWarnings("serial")
153     @Test
154     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
155         // This test tests the timing window in which a change listener is registered before the
156         // shard becomes the leader. We verify that the listener is registered and notified of the
157         // existing data when the shard becomes the leader.
158         new ShardTestKit(getSystem()) {{
159             // For this test, we want to send the RegisterChangeListener message after the shard
160             // has recovered from persistence and before it becomes the leader. So we subclass
161             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
162             // we know that the shard has been initialized to a follower and has started the
163             // election process. The following 2 CountDownLatches are used to coordinate the
164             // ElectionTimeout with the sending of the RegisterChangeListener message.
165             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
166             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
167             final Creator<Shard> creator = new Creator<Shard>() {
168                 boolean firstElectionTimeout = true;
169
170                 @Override
171                 public Shard create() throws Exception {
172                     // Use a non persistent provider because this test actually invokes persist on the journal
173                     // this will cause all other messages to not be queued properly after that.
174                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
175                     // it does do a persist)
176                     return new Shard(newShardBuilder()) {
177                         @Override
178                         public void onReceiveCommand(final Object message) throws Exception {
179                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
180                                 // Got the first ElectionTimeout. We don't forward it to the
181                                 // base Shard yet until we've sent the RegisterChangeListener
182                                 // message. So we signal the onFirstElectionTimeout latch to tell
183                                 // the main thread to send the RegisterChangeListener message and
184                                 // start a thread to wait on the onChangeListenerRegistered latch,
185                                 // which the main thread signals after it has sent the message.
186                                 // After the onChangeListenerRegistered is triggered, we send the
187                                 // original ElectionTimeout message to proceed with the election.
188                                 firstElectionTimeout = false;
189                                 final ActorRef self = getSelf();
190                                 new Thread() {
191                                     @Override
192                                     public void run() {
193                                         Uninterruptibles.awaitUninterruptibly(
194                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
195                                         self.tell(message, self);
196                                     }
197                                 }.start();
198
199                                 onFirstElectionTimeout.countDown();
200                             } else {
201                                 super.onReceiveCommand(message);
202                             }
203                         }
204                     };
205                 }
206             };
207
208             final MockDataChangeListener listener = new MockDataChangeListener(1);
209             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
210                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
211
212             final TestActorRef<Shard> shard = actorFactory.createTestActor(
213                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
214                     "testRegisterChangeListenerWhenNotLeaderInitially");
215
216             // Write initial data into the in-memory store.
217             final YangInstanceIdentifier path = TestModel.TEST_PATH;
218             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
219
220             // Wait until the shard receives the first ElectionTimeout message.
221             assertEquals("Got first ElectionTimeout", true,
222                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
223
224             // Now send the RegisterChangeListener and wait for the reply.
225             shard.tell(new RegisterChangeListener(path, dclActor,
226                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
227
228             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
229                     RegisterChangeListenerReply.class);
230             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
231
232             // Sanity check - verify the shard is not the leader yet.
233             shard.tell(new FindLeader(), getRef());
234             final FindLeaderReply findLeadeReply =
235                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
236             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
237
238             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
239             // with the election process.
240             onChangeListenerRegistered.countDown();
241
242             // Wait for the shard to become the leader and notify our listener with the existing
243             // data in the store.
244             listener.waitForChangeEvents(path);
245         }};
246     }
247
248     @Test
249     public void testRegisterDataTreeChangeListener() throws Exception {
250         new ShardTestKit(getSystem()) {{
251             final TestActorRef<Shard> shard = actorFactory.createTestActor(
252                     newShardProps(), "testRegisterDataTreeChangeListener");
253
254             waitUntilLeader(shard);
255
256             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
257
258             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
259             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
260                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
261
262             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
263
264             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
265                     RegisterDataTreeChangeListenerReply.class);
266             final String replyPath = reply.getListenerRegistrationPath().toString();
267             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
268                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
269
270             final YangInstanceIdentifier path = TestModel.TEST_PATH;
271             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
272
273             listener.waitForChangeEvents();
274         }};
275     }
276
277     @SuppressWarnings("serial")
278     @Test
279     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
280         new ShardTestKit(getSystem()) {{
281             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
282             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
283             final Creator<Shard> creator = new Creator<Shard>() {
284                 boolean firstElectionTimeout = true;
285
286                 @Override
287                 public Shard create() throws Exception {
288                     return new Shard(Shard.builder().id(shardID).datastoreContext(
289                             dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
290                         @Override
291                         public void onReceiveCommand(final Object message) throws Exception {
292                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
293                                 firstElectionTimeout = false;
294                                 final ActorRef self = getSelf();
295                                 new Thread() {
296                                     @Override
297                                     public void run() {
298                                         Uninterruptibles.awaitUninterruptibly(
299                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
300                                         self.tell(message, self);
301                                     }
302                                 }.start();
303
304                                 onFirstElectionTimeout.countDown();
305                             } else {
306                                 super.onReceiveCommand(message);
307                             }
308                         }
309                     };
310                 }
311             };
312
313             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
314             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
315                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
316
317             final TestActorRef<Shard> shard = actorFactory.createTestActor(
318                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
319                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
320
321             final YangInstanceIdentifier path = TestModel.TEST_PATH;
322             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
323
324             assertEquals("Got first ElectionTimeout", true,
325                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
326
327             shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
328             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
329                 RegisterDataTreeChangeListenerReply.class);
330             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
331
332             shard.tell(new FindLeader(), getRef());
333             final FindLeaderReply findLeadeReply =
334                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
335             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
336
337             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
338
339             onChangeListenerRegistered.countDown();
340
341             // TODO: investigate why we do not receive data chage events
342             listener.waitForChangeEvents();
343         }};
344     }
345
346     @Test
347     public void testCreateTransaction(){
348         new ShardTestKit(getSystem()) {{
349             final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
350
351             waitUntilLeader(shard);
352
353             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
354
355             shard.tell(new CreateTransaction("txn-1", TransactionType.READ_ONLY.ordinal(), null,
356                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
357
358             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
359                     CreateTransactionReply.class);
360
361             final String path = reply.getTransactionPath().toString();
362             assertTrue("Unexpected transaction path " + path,
363                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
364         }};
365     }
366
367     @Test
368     public void testCreateTransactionOnChain(){
369         new ShardTestKit(getSystem()) {{
370             final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
371
372             waitUntilLeader(shard);
373
374             shard.tell(new CreateTransaction("txn-1",TransactionType.READ_ONLY.ordinal(), "foobar",
375                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
376
377             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
378                     CreateTransactionReply.class);
379
380             final String path = reply.getTransactionPath().toString();
381             assertTrue("Unexpected transaction path " + path,
382                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
383         }};
384     }
385
386     @SuppressWarnings("serial")
387     @Test
388     public void testPeerAddressResolved() throws Exception {
389         new ShardTestKit(getSystem()) {{
390             final CountDownLatch recoveryComplete = new CountDownLatch(1);
391             class TestShard extends Shard {
392                 TestShard() {
393                     super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
394                             peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
395                             schemaContext(SCHEMA_CONTEXT));
396                 }
397
398                 String getPeerAddress(String id) {
399                     return getRaftActorContext().getPeerAddress(id);
400                 }
401
402                 @Override
403                 protected void onRecoveryComplete() {
404                     try {
405                         super.onRecoveryComplete();
406                     } finally {
407                         recoveryComplete.countDown();
408                     }
409                 }
410             }
411
412             final TestActorRef<Shard> shard = actorFactory.createTestActor(
413                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
414                         @Override
415                         public TestShard create() throws Exception {
416                             return new TestShard();
417                         }
418                     })), "testPeerAddressResolved");
419
420             assertEquals("Recovery complete", true,
421                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
422
423             final String address = "akka://foobar";
424             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
425
426             assertEquals("getPeerAddress", address,
427                 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
428         }};
429     }
430
431     @Test
432     public void testApplySnapshot() throws Exception {
433
434         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot");
435
436         ShardTestKit.waitUntilLeader(shard);
437
438         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
439         store.setSchemaContext(SCHEMA_CONTEXT);
440
441         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
442                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
443                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
444                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
445
446         writeToStore(store, TestModel.TEST_PATH, container);
447
448         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
449         final NormalizedNode<?,?> expected = readStore(store, root);
450
451         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
452                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
453
454         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
455
456         final NormalizedNode<?,?> actual = readStore(shard, root);
457
458         assertEquals("Root node", expected, actual);
459     }
460
461     @Test
462     public void testApplyState() throws Exception {
463         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplyState");
464
465         ShardTestKit.waitUntilLeader(shard);
466
467         final DataTree source = setupInMemorySnapshotStore();
468         final DataTreeModification writeMod = source.takeSnapshot().newModification();
469         ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
470         writeMod.write(TestModel.TEST_PATH, node);
471         writeMod.ready();
472
473         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
474                 payloadForModification(source, writeMod)));
475
476         shard.underlyingActor().onReceiveCommand(applyState);
477
478         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
479         assertEquals("Applied state", node, actual);
480     }
481
482     @Test
483     public void testDataTreeCandidateRecovery() throws Exception {
484         // Set up the InMemorySnapshotStore.
485         final DataTree source = setupInMemorySnapshotStore();
486
487         final DataTreeModification writeMod = source.takeSnapshot().newModification();
488         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
489         writeMod.ready();
490         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
491
492         // Set up the InMemoryJournal.
493         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
494
495         final int nListEntries = 16;
496         final Set<Integer> listEntryKeys = new HashSet<>();
497
498         // Add some ModificationPayload entries
499         for (int i = 1; i <= nListEntries; i++) {
500             listEntryKeys.add(Integer.valueOf(i));
501
502             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
503                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
504
505             final DataTreeModification mod = source.takeSnapshot().newModification();
506             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
507             mod.ready();
508             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
509                 payloadForModification(source, mod)));
510         }
511
512         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
513             new ApplyJournalEntries(nListEntries));
514
515         testRecovery(listEntryKeys);
516     }
517
518     @Test
519     public void testConcurrentThreePhaseCommits() throws Throwable {
520         new ShardTestKit(getSystem()) {{
521             final TestActorRef<Shard> shard = actorFactory.createTestActor(
522                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
523                     "testConcurrentThreePhaseCommits");
524
525             waitUntilLeader(shard);
526
527             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
528
529             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
530
531             final String transactionID1 = "tx1";
532             final MutableCompositeModification modification1 = new MutableCompositeModification();
533             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
534                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
535
536             final String transactionID2 = "tx2";
537             final MutableCompositeModification modification2 = new MutableCompositeModification();
538             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
539                     TestModel.OUTER_LIST_PATH,
540                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
541                     modification2);
542
543             final String transactionID3 = "tx3";
544             final MutableCompositeModification modification3 = new MutableCompositeModification();
545             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
546                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
547                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
548                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
549                     modification3);
550
551             final long timeoutSec = 5;
552             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
553             final Timeout timeout = new Timeout(duration);
554
555             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
556             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
557                     expectMsgClass(duration, ReadyTransactionReply.class));
558             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
559
560             // Send the CanCommitTransaction message for the first Tx.
561
562             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
563             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
564                     expectMsgClass(duration, CanCommitTransactionReply.class));
565             assertEquals("Can commit", true, canCommitReply.getCanCommit());
566
567             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
568             expectMsgClass(duration, ReadyTransactionReply.class);
569
570             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
571             expectMsgClass(duration, ReadyTransactionReply.class);
572
573             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
574             // processed after the first Tx completes.
575
576             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
577                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
578
579             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
580                     new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
581
582             // Send the CommitTransaction message for the first Tx. After it completes, it should
583             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
584
585             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
586             expectMsgClass(duration, CommitTransactionReply.class);
587
588             // Wait for the next 2 Tx's to complete.
589
590             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
591             final CountDownLatch commitLatch = new CountDownLatch(2);
592
593             class OnFutureComplete extends OnComplete<Object> {
594                 private final Class<?> expRespType;
595
596                 OnFutureComplete(final Class<?> expRespType) {
597                     this.expRespType = expRespType;
598                 }
599
600                 @Override
601                 public void onComplete(final Throwable error, final Object resp) {
602                     if(error != null) {
603                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
604                     } else {
605                         try {
606                             assertEquals("Commit response type", expRespType, resp.getClass());
607                             onSuccess(resp);
608                         } catch (final Exception e) {
609                             caughtEx.set(e);
610                         }
611                     }
612                 }
613
614                 void onSuccess(final Object resp) throws Exception {
615                 }
616             }
617
618             class OnCommitFutureComplete extends OnFutureComplete {
619                 OnCommitFutureComplete() {
620                     super(CommitTransactionReply.class);
621                 }
622
623                 @Override
624                 public void onComplete(final Throwable error, final Object resp) {
625                     super.onComplete(error, resp);
626                     commitLatch.countDown();
627                 }
628             }
629
630             class OnCanCommitFutureComplete extends OnFutureComplete {
631                 private final String transactionID;
632
633                 OnCanCommitFutureComplete(final String transactionID) {
634                     super(CanCommitTransactionReply.class);
635                     this.transactionID = transactionID;
636                 }
637
638                 @Override
639                 void onSuccess(final Object resp) throws Exception {
640                     final CanCommitTransactionReply canCommitReply =
641                             CanCommitTransactionReply.fromSerializable(resp);
642                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
643
644                     final Future<Object> commitFuture = Patterns.ask(shard,
645                             new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
646                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
647                 }
648             }
649
650             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
651                     getSystem().dispatcher());
652
653             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
654                     getSystem().dispatcher());
655
656             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
657
658             if(caughtEx.get() != null) {
659                 throw caughtEx.get();
660             }
661
662             assertEquals("Commits complete", true, done);
663
664             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
665             inOrder.verify(cohort1).canCommit();
666             inOrder.verify(cohort1).preCommit();
667             inOrder.verify(cohort1).commit();
668             inOrder.verify(cohort2).canCommit();
669             inOrder.verify(cohort2).preCommit();
670             inOrder.verify(cohort2).commit();
671             inOrder.verify(cohort3).canCommit();
672             inOrder.verify(cohort3).preCommit();
673             inOrder.verify(cohort3).commit();
674
675             // Verify data in the data store.
676
677             verifyOuterListEntry(shard, 1);
678
679             verifyLastApplied(shard, 2);
680         }};
681     }
682
683     @Test
684     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
685         new ShardTestKit(getSystem()) {{
686             final TestActorRef<Shard> shard = actorFactory.createTestActor(
687                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
688                     "testBatchedModificationsWithNoCommitOnReady");
689
690             waitUntilLeader(shard);
691
692             final String transactionID = "tx";
693             final FiniteDuration duration = duration("5 seconds");
694
695             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
696             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
697                 @Override
698                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
699                     if(mockCohort.get() == null) {
700                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
701                     }
702
703                     return mockCohort.get();
704                 }
705             };
706
707             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
708
709             // Send a BatchedModifications to start a transaction.
710
711             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
712                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
713             expectMsgClass(duration, BatchedModificationsReply.class);
714
715             // Send a couple more BatchedModifications.
716
717             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
718                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
719             expectMsgClass(duration, BatchedModificationsReply.class);
720
721             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
722                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
723                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
724             expectMsgClass(duration, ReadyTransactionReply.class);
725
726             // Send the CanCommitTransaction message.
727
728             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
729             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
730                     expectMsgClass(duration, CanCommitTransactionReply.class));
731             assertEquals("Can commit", true, canCommitReply.getCanCommit());
732
733             // Send the CanCommitTransaction message.
734
735             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
736             expectMsgClass(duration, CommitTransactionReply.class);
737
738             final InOrder inOrder = inOrder(mockCohort.get());
739             inOrder.verify(mockCohort.get()).canCommit();
740             inOrder.verify(mockCohort.get()).preCommit();
741             inOrder.verify(mockCohort.get()).commit();
742
743             // Verify data in the data store.
744
745             verifyOuterListEntry(shard, 1);
746         }};
747     }
748
749     @Test
750     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
751         new ShardTestKit(getSystem()) {{
752             final TestActorRef<Shard> shard = actorFactory.createTestActor(
753                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
754                     "testBatchedModificationsWithCommitOnReady");
755
756             waitUntilLeader(shard);
757
758             final String transactionID = "tx";
759             final FiniteDuration duration = duration("5 seconds");
760
761             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
762             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
763                 @Override
764                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
765                     if(mockCohort.get() == null) {
766                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
767                     }
768
769                     return mockCohort.get();
770                 }
771             };
772
773             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
774
775             // Send a BatchedModifications to start a transaction.
776
777             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
778                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
779             expectMsgClass(duration, BatchedModificationsReply.class);
780
781             // Send a couple more BatchedModifications.
782
783             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
784                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
785             expectMsgClass(duration, BatchedModificationsReply.class);
786
787             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
788                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
789                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
790
791             expectMsgClass(duration, CommitTransactionReply.class);
792
793             final InOrder inOrder = inOrder(mockCohort.get());
794             inOrder.verify(mockCohort.get()).canCommit();
795             inOrder.verify(mockCohort.get()).preCommit();
796             inOrder.verify(mockCohort.get()).commit();
797
798             // Verify data in the data store.
799
800             verifyOuterListEntry(shard, 1);
801         }};
802     }
803
804     @Test(expected=IllegalStateException.class)
805     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
806         new ShardTestKit(getSystem()) {{
807             final TestActorRef<Shard> shard = actorFactory.createTestActor(
808                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
809                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
810
811             waitUntilLeader(shard);
812
813             final String transactionID = "tx1";
814             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
815             batched.setReady(true);
816             batched.setTotalMessagesSent(2);
817
818             shard.tell(batched, getRef());
819
820             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
821
822             if(failure != null) {
823                 throw failure.cause();
824             }
825         }};
826     }
827
828     @Test
829     public void testBatchedModificationsWithOperationFailure() throws Throwable {
830         new ShardTestKit(getSystem()) {{
831             final TestActorRef<Shard> shard = actorFactory.createTestActor(
832                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
833                     "testBatchedModificationsWithOperationFailure");
834
835             waitUntilLeader(shard);
836
837             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
838             // write will not validate the children for performance reasons.
839
840             String transactionID = "tx1";
841
842             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
843                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
844                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
845
846             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
847             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
848             shard.tell(batched, getRef());
849             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
850
851             Throwable cause = failure.cause();
852
853             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
854             batched.setReady(true);
855             batched.setTotalMessagesSent(2);
856
857             shard.tell(batched, getRef());
858
859             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
860             assertEquals("Failure cause", cause, failure.cause());
861         }};
862     }
863
864     @Test
865     public void testBatchedModificationsOnTransactionChain() throws Throwable {
866         new ShardTestKit(getSystem()) {{
867             final TestActorRef<Shard> shard = actorFactory.createTestActor(
868                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
869                     "testBatchedModificationsOnTransactionChain");
870
871             waitUntilLeader(shard);
872
873             final String transactionChainID = "txChain";
874             final String transactionID1 = "tx1";
875             final String transactionID2 = "tx2";
876
877             final FiniteDuration duration = duration("5 seconds");
878
879             // Send a BatchedModifications to start a chained write transaction and ready it.
880
881             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
882             final YangInstanceIdentifier path = TestModel.TEST_PATH;
883             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
884                     containerNode, true, false, 1), getRef());
885             expectMsgClass(duration, ReadyTransactionReply.class);
886
887             // Create a read Tx on the same chain.
888
889             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
890                     transactionChainID, DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
891
892             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
893
894             getSystem().actorSelection(createReply.getTransactionPath()).tell(
895                     new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
896             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
897             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
898
899             // Commit the write transaction.
900
901             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
902             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
903                     expectMsgClass(duration, CanCommitTransactionReply.class));
904             assertEquals("Can commit", true, canCommitReply.getCanCommit());
905
906             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
907             expectMsgClass(duration, CommitTransactionReply.class);
908
909             // Verify data in the data store.
910
911             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
912             assertEquals("Stored node", containerNode, actualNode);
913         }};
914     }
915
916     @Test
917     public void testOnBatchedModificationsWhenNotLeader() {
918         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
919         new ShardTestKit(getSystem()) {{
920             final Creator<Shard> creator = new Creator<Shard>() {
921                 private static final long serialVersionUID = 1L;
922
923                 @Override
924                 public Shard create() throws Exception {
925                     return new Shard(newShardBuilder()) {
926                         @Override
927                         protected boolean isLeader() {
928                             return overrideLeaderCalls.get() ? false : super.isLeader();
929                         }
930
931                         @Override
932                         protected ActorSelection getLeader() {
933                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
934                                 super.getLeader();
935                         }
936                     };
937                 }
938             };
939
940             final TestActorRef<Shard> shard = actorFactory.createTestActor(
941                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
942
943             waitUntilLeader(shard);
944
945             overrideLeaderCalls.set(true);
946
947             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
948
949             shard.tell(batched, ActorRef.noSender());
950
951             expectMsgEquals(batched);
952         }};
953     }
954
955     @Test
956     public void testTransactionMessagesWithNoLeader() {
957         new ShardTestKit(getSystem()) {{
958             dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
959                 shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
960             final TestActorRef<Shard> shard = actorFactory.createTestActor(
961                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
962                     "testTransactionMessagesWithNoLeader");
963
964             waitUntilNoLeader(shard);
965
966             shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef());
967             Failure failure = expectMsgClass(Failure.class);
968             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
969
970             shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx",
971                     DataStoreVersions.CURRENT_VERSION, true), getRef());
972             failure = expectMsgClass(Failure.class);
973             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
974
975             shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef());
976             failure = expectMsgClass(Failure.class);
977             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
978         }};
979     }
980
981     @Test
982     public void testReadyWithImmediateCommit() throws Exception{
983         testReadyWithImmediateCommit(true);
984         testReadyWithImmediateCommit(false);
985     }
986
987     private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
988         new ShardTestKit(getSystem()) {{
989             final TestActorRef<Shard> shard = actorFactory.createTestActor(
990                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
991                     "testReadyWithImmediateCommit-" + readWrite);
992
993             waitUntilLeader(shard);
994
995             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
996
997             final String transactionID = "tx1";
998             final MutableCompositeModification modification = new MutableCompositeModification();
999             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1000             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1001                     TestModel.TEST_PATH, containerNode, modification);
1002
1003             final FiniteDuration duration = duration("5 seconds");
1004
1005             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1006
1007             expectMsgClass(duration, CommitTransactionReply.class);
1008
1009             final InOrder inOrder = inOrder(cohort);
1010             inOrder.verify(cohort).canCommit();
1011             inOrder.verify(cohort).preCommit();
1012             inOrder.verify(cohort).commit();
1013
1014             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1015             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1016         }};
1017     }
1018
1019     @Test
1020     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1021         new ShardTestKit(getSystem()) {{
1022             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1023                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1024                     "testReadyLocalTransactionWithImmediateCommit");
1025
1026             waitUntilLeader(shard);
1027
1028             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1029
1030             final DataTreeModification modification = dataStore.newModification();
1031
1032             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1033             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1034             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1035             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1036
1037             final String txId = "tx1";
1038             modification.ready();
1039             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1040
1041             shard.tell(readyMessage, getRef());
1042
1043             expectMsgClass(CommitTransactionReply.class);
1044
1045             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1046             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1047         }};
1048     }
1049
1050     @Test
1051     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1052         new ShardTestKit(getSystem()) {{
1053             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1054                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1055                     "testReadyLocalTransactionWithThreePhaseCommit");
1056
1057             waitUntilLeader(shard);
1058
1059             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1060
1061             final DataTreeModification modification = dataStore.newModification();
1062
1063             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1064             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1065             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1066             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1067
1068             final String txId = "tx1";
1069                 modification.ready();
1070             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1071
1072             shard.tell(readyMessage, getRef());
1073
1074             expectMsgClass(ReadyTransactionReply.class);
1075
1076             // Send the CanCommitTransaction message.
1077
1078             shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1079             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1080                     expectMsgClass(CanCommitTransactionReply.class));
1081             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1082
1083             // Send the CanCommitTransaction message.
1084
1085             shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
1086             expectMsgClass(CommitTransactionReply.class);
1087
1088             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1089             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1090         }};
1091     }
1092
1093     @Test
1094     public void testCommitWithPersistenceDisabled() throws Throwable {
1095         testCommitWithPersistenceDisabled(true);
1096         testCommitWithPersistenceDisabled(false);
1097     }
1098
1099     private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
1100         dataStoreContextBuilder.persistent(false);
1101         new ShardTestKit(getSystem()) {{
1102             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1103                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1104                     "testCommitWithPersistenceDisabled-" + readWrite);
1105
1106             waitUntilLeader(shard);
1107
1108             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1109
1110             // Setup a simulated transactions with a mock cohort.
1111
1112             final String transactionID = "tx";
1113             final MutableCompositeModification modification = new MutableCompositeModification();
1114             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1115             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1116                 TestModel.TEST_PATH, containerNode, modification);
1117
1118             final FiniteDuration duration = duration("5 seconds");
1119
1120             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1121             expectMsgClass(duration, ReadyTransactionReply.class);
1122
1123             // Send the CanCommitTransaction message.
1124
1125             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1126             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1127                     expectMsgClass(duration, CanCommitTransactionReply.class));
1128             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1129
1130             // Send the CanCommitTransaction message.
1131
1132             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1133             expectMsgClass(duration, CommitTransactionReply.class);
1134
1135             final InOrder inOrder = inOrder(cohort);
1136             inOrder.verify(cohort).canCommit();
1137             inOrder.verify(cohort).preCommit();
1138             inOrder.verify(cohort).commit();
1139
1140             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1141             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1142         }};
1143     }
1144
1145     @Test
1146     public void testCommitWhenTransactionHasNoModifications() {
1147         testCommitWhenTransactionHasNoModifications(true);
1148         testCommitWhenTransactionHasNoModifications(false);
1149     }
1150
1151     private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
1152         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1153         // but here that need not happen
1154         new ShardTestKit(getSystem()) {
1155             {
1156                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1157                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1158                         "testCommitWhenTransactionHasNoModifications-" + readWrite);
1159
1160                 waitUntilLeader(shard);
1161
1162                 final String transactionID = "tx1";
1163                 final MutableCompositeModification modification = new MutableCompositeModification();
1164                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1165                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1166                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1167                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1168                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1169
1170                 final FiniteDuration duration = duration("5 seconds");
1171
1172                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1173                 expectMsgClass(duration, ReadyTransactionReply.class);
1174
1175                 // Send the CanCommitTransaction message.
1176
1177                 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1178                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1179                         expectMsgClass(duration, CanCommitTransactionReply.class));
1180                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1181
1182                 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1183                 expectMsgClass(duration, CommitTransactionReply.class);
1184
1185                 final InOrder inOrder = inOrder(cohort);
1186                 inOrder.verify(cohort).canCommit();
1187                 inOrder.verify(cohort).preCommit();
1188                 inOrder.verify(cohort).commit();
1189
1190                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1191                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1192
1193                 // Use MBean for verification
1194                 // Committed transaction count should increase as usual
1195                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1196
1197                 // Commit index should not advance because this does not go into the journal
1198                 assertEquals(-1, shardStats.getCommitIndex());
1199             }
1200         };
1201     }
1202
1203     @Test
1204     public void testCommitWhenTransactionHasModifications() {
1205         testCommitWhenTransactionHasModifications(true);
1206         testCommitWhenTransactionHasModifications(false);
1207     }
1208
1209     private void testCommitWhenTransactionHasModifications(final boolean readWrite){
1210         new ShardTestKit(getSystem()) {
1211             {
1212                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1213                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1214                         "testCommitWhenTransactionHasModifications-" + readWrite);
1215
1216                 waitUntilLeader(shard);
1217
1218                 final String transactionID = "tx1";
1219                 final MutableCompositeModification modification = new MutableCompositeModification();
1220                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1221                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1222                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1223                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1224                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1225                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1226
1227                 final FiniteDuration duration = duration("5 seconds");
1228
1229                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1230                 expectMsgClass(duration, ReadyTransactionReply.class);
1231
1232                 // Send the CanCommitTransaction message.
1233
1234                 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1235                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1236                         expectMsgClass(duration, CanCommitTransactionReply.class));
1237                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1238
1239                 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1240                 expectMsgClass(duration, CommitTransactionReply.class);
1241
1242                 final InOrder inOrder = inOrder(cohort);
1243                 inOrder.verify(cohort).canCommit();
1244                 inOrder.verify(cohort).preCommit();
1245                 inOrder.verify(cohort).commit();
1246
1247                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1248                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1249
1250                 // Use MBean for verification
1251                 // Committed transaction count should increase as usual
1252                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1253
1254                 // Commit index should advance as we do not have an empty modification
1255                 assertEquals(0, shardStats.getCommitIndex());
1256             }
1257         };
1258     }
1259
1260     @Test
1261     public void testCommitPhaseFailure() throws Throwable {
1262         testCommitPhaseFailure(true);
1263         testCommitPhaseFailure(false);
1264     }
1265
1266     private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
1267         new ShardTestKit(getSystem()) {{
1268             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1269                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1270                     "testCommitPhaseFailure-" + readWrite);
1271
1272             waitUntilLeader(shard);
1273
1274             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1275             // commit phase.
1276
1277             final String transactionID1 = "tx1";
1278             final MutableCompositeModification modification1 = new MutableCompositeModification();
1279             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1280             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1281             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1282             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1283             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1284
1285             final String transactionID2 = "tx2";
1286             final MutableCompositeModification modification2 = new MutableCompositeModification();
1287             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1288             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1289
1290             final FiniteDuration duration = duration("5 seconds");
1291             final Timeout timeout = new Timeout(duration);
1292
1293             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1294             expectMsgClass(duration, ReadyTransactionReply.class);
1295
1296             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1297             expectMsgClass(duration, ReadyTransactionReply.class);
1298
1299             // Send the CanCommitTransaction message for the first Tx.
1300
1301             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1302             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1303                     expectMsgClass(duration, CanCommitTransactionReply.class));
1304             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1305
1306             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1307             // processed after the first Tx completes.
1308
1309             final Future<Object> canCommitFuture = Patterns.ask(shard,
1310                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1311
1312             // Send the CommitTransaction message for the first Tx. This should send back an error
1313             // and trigger the 2nd Tx to proceed.
1314
1315             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1316             expectMsgClass(duration, akka.actor.Status.Failure.class);
1317
1318             // Wait for the 2nd Tx to complete the canCommit phase.
1319
1320             final CountDownLatch latch = new CountDownLatch(1);
1321             canCommitFuture.onComplete(new OnComplete<Object>() {
1322                 @Override
1323                 public void onComplete(final Throwable t, final Object resp) {
1324                     latch.countDown();
1325                 }
1326             }, getSystem().dispatcher());
1327
1328             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1329
1330             final InOrder inOrder = inOrder(cohort1, cohort2);
1331             inOrder.verify(cohort1).canCommit();
1332             inOrder.verify(cohort1).preCommit();
1333             inOrder.verify(cohort1).commit();
1334             inOrder.verify(cohort2).canCommit();
1335         }};
1336     }
1337
1338     @Test
1339     public void testPreCommitPhaseFailure() throws Throwable {
1340         testPreCommitPhaseFailure(true);
1341         testPreCommitPhaseFailure(false);
1342     }
1343
1344     private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
1345         new ShardTestKit(getSystem()) {{
1346             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1347                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1348                     "testPreCommitPhaseFailure-" + readWrite);
1349
1350             waitUntilLeader(shard);
1351
1352             final String transactionID1 = "tx1";
1353             final MutableCompositeModification modification1 = new MutableCompositeModification();
1354             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1355             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1356             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1357
1358             final String transactionID2 = "tx2";
1359             final MutableCompositeModification modification2 = new MutableCompositeModification();
1360             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1361             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1362
1363             final FiniteDuration duration = duration("5 seconds");
1364             final Timeout timeout = new Timeout(duration);
1365
1366             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1367             expectMsgClass(duration, ReadyTransactionReply.class);
1368
1369             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1370             expectMsgClass(duration, ReadyTransactionReply.class);
1371
1372             // Send the CanCommitTransaction message for the first Tx.
1373
1374             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1375             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1376                 expectMsgClass(duration, CanCommitTransactionReply.class));
1377             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1378
1379             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1380             // processed after the first Tx completes.
1381
1382             final Future<Object> canCommitFuture = Patterns.ask(shard,
1383                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1384
1385             // Send the CommitTransaction message for the first Tx. This should send back an error
1386             // and trigger the 2nd Tx to proceed.
1387
1388             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1389             expectMsgClass(duration, akka.actor.Status.Failure.class);
1390
1391             // Wait for the 2nd Tx to complete the canCommit phase.
1392
1393             final CountDownLatch latch = new CountDownLatch(1);
1394             canCommitFuture.onComplete(new OnComplete<Object>() {
1395                 @Override
1396                 public void onComplete(final Throwable t, final Object resp) {
1397                     latch.countDown();
1398                 }
1399             }, getSystem().dispatcher());
1400
1401             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1402
1403             final InOrder inOrder = inOrder(cohort1, cohort2);
1404             inOrder.verify(cohort1).canCommit();
1405             inOrder.verify(cohort1).preCommit();
1406             inOrder.verify(cohort2).canCommit();
1407         }};
1408     }
1409
1410     @Test
1411     public void testCanCommitPhaseFailure() throws Throwable {
1412         testCanCommitPhaseFailure(true);
1413         testCanCommitPhaseFailure(false);
1414     }
1415
1416     private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1417         new ShardTestKit(getSystem()) {{
1418             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1419                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1420                     "testCanCommitPhaseFailure-" + readWrite);
1421
1422             waitUntilLeader(shard);
1423
1424             final FiniteDuration duration = duration("5 seconds");
1425
1426             final String transactionID1 = "tx1";
1427             final MutableCompositeModification modification = new MutableCompositeModification();
1428             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1429             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1430
1431             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1432             expectMsgClass(duration, ReadyTransactionReply.class);
1433
1434             // Send the CanCommitTransaction message.
1435
1436             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1437             expectMsgClass(duration, akka.actor.Status.Failure.class);
1438
1439             // Send another can commit to ensure the failed one got cleaned up.
1440
1441             reset(cohort);
1442
1443             final String transactionID2 = "tx2";
1444             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1445
1446             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1447             expectMsgClass(duration, ReadyTransactionReply.class);
1448
1449             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1450             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1451                     expectMsgClass(CanCommitTransactionReply.class));
1452             assertEquals("getCanCommit", true, reply.getCanCommit());
1453         }};
1454     }
1455
1456     @Test
1457     public void testCanCommitPhaseFalseResponse() throws Throwable {
1458         testCanCommitPhaseFalseResponse(true);
1459         testCanCommitPhaseFalseResponse(false);
1460     }
1461
1462     private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1463         new ShardTestKit(getSystem()) {{
1464             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1465                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1466                     "testCanCommitPhaseFalseResponse-" + readWrite);
1467
1468             waitUntilLeader(shard);
1469
1470             final FiniteDuration duration = duration("5 seconds");
1471
1472             final String transactionID1 = "tx1";
1473             final MutableCompositeModification modification = new MutableCompositeModification();
1474             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1475             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1476
1477             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1478             expectMsgClass(duration, ReadyTransactionReply.class);
1479
1480             // Send the CanCommitTransaction message.
1481
1482             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1483             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1484                     expectMsgClass(CanCommitTransactionReply.class));
1485             assertEquals("getCanCommit", false, reply.getCanCommit());
1486
1487             // Send another can commit to ensure the failed one got cleaned up.
1488
1489             reset(cohort);
1490
1491             final String transactionID2 = "tx2";
1492             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1493
1494             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1495             expectMsgClass(duration, ReadyTransactionReply.class);
1496
1497             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1498             reply = CanCommitTransactionReply.fromSerializable(
1499                     expectMsgClass(CanCommitTransactionReply.class));
1500             assertEquals("getCanCommit", true, reply.getCanCommit());
1501         }};
1502     }
1503
1504     @Test
1505     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1506         testImmediateCommitWithCanCommitPhaseFailure(true);
1507         testImmediateCommitWithCanCommitPhaseFailure(false);
1508     }
1509
1510     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1511         new ShardTestKit(getSystem()) {{
1512             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1513                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1514                     "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1515
1516             waitUntilLeader(shard);
1517
1518             final FiniteDuration duration = duration("5 seconds");
1519
1520             final String transactionID1 = "tx1";
1521             final MutableCompositeModification modification = new MutableCompositeModification();
1522             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1523             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1524
1525             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
1526
1527             expectMsgClass(duration, akka.actor.Status.Failure.class);
1528
1529             // Send another can commit to ensure the failed one got cleaned up.
1530
1531             reset(cohort);
1532
1533             final String transactionID2 = "tx2";
1534             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1535             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1536             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1537             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1538             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1539             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1540             doReturn(candidateRoot).when(candidate).getRootNode();
1541             doReturn(candidate).when(cohort).getCandidate();
1542
1543             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1544
1545             expectMsgClass(duration, CommitTransactionReply.class);
1546         }};
1547     }
1548
1549     @Test
1550     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1551         testImmediateCommitWithCanCommitPhaseFalseResponse(true);
1552         testImmediateCommitWithCanCommitPhaseFalseResponse(false);
1553     }
1554
1555     private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1556         new ShardTestKit(getSystem()) {{
1557             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1558                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1559                     "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
1560
1561             waitUntilLeader(shard);
1562
1563             final FiniteDuration duration = duration("5 seconds");
1564
1565             final String transactionID = "tx1";
1566             final MutableCompositeModification modification = new MutableCompositeModification();
1567             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1568             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1569
1570             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1571
1572             expectMsgClass(duration, akka.actor.Status.Failure.class);
1573
1574             // Send another can commit to ensure the failed one got cleaned up.
1575
1576             reset(cohort);
1577
1578             final String transactionID2 = "tx2";
1579             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1580             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1581             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1582             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1583             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1584             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1585             doReturn(candidateRoot).when(candidate).getRootNode();
1586             doReturn(candidate).when(cohort).getCandidate();
1587
1588             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1589
1590             expectMsgClass(duration, CommitTransactionReply.class);
1591         }};
1592     }
1593
1594     @Test
1595     public void testAbortBeforeFinishCommit() throws Throwable {
1596         testAbortBeforeFinishCommit(true);
1597         testAbortBeforeFinishCommit(false);
1598     }
1599
1600     private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
1601         new ShardTestKit(getSystem()) {{
1602             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1603                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1604                     "testAbortBeforeFinishCommit-" + readWrite);
1605
1606             waitUntilLeader(shard);
1607
1608             final FiniteDuration duration = duration("5 seconds");
1609             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1610
1611             final String transactionID = "tx1";
1612             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1613                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1614                 @Override
1615                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1616                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1617
1618                     // Simulate an AbortTransaction message occurring during replication, after
1619                     // persisting and before finishing the commit to the in-memory store.
1620                     // We have no followers so due to optimizations in the RaftActor, it does not
1621                     // attempt replication and thus we can't send an AbortTransaction message b/c
1622                     // it would be processed too late after CommitTransaction completes. So we'll
1623                     // simulate an AbortTransaction message occurring during replication by calling
1624                     // the shard directly.
1625                     //
1626                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1627
1628                     return preCommitFuture;
1629                 }
1630             };
1631
1632             final MutableCompositeModification modification = new MutableCompositeModification();
1633             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1634                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1635                     modification, preCommit);
1636
1637             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1638             expectMsgClass(duration, ReadyTransactionReply.class);
1639
1640             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1641             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1642                 expectMsgClass(duration, CanCommitTransactionReply.class));
1643             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1644
1645             shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1646             expectMsgClass(duration, CommitTransactionReply.class);
1647
1648             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1649
1650             // Since we're simulating an abort occurring during replication and before finish commit,
1651             // the data should still get written to the in-memory store since we've gotten past
1652             // canCommit and preCommit and persisted the data.
1653             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1654         }};
1655     }
1656
1657     @Test
1658     public void testTransactionCommitTimeout() throws Throwable {
1659         testTransactionCommitTimeout(true);
1660         testTransactionCommitTimeout(false);
1661     }
1662
1663     private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
1664         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1665
1666         new ShardTestKit(getSystem()) {{
1667             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1668                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1669                     "testTransactionCommitTimeout-" + readWrite);
1670
1671             waitUntilLeader(shard);
1672
1673             final FiniteDuration duration = duration("5 seconds");
1674
1675             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1676
1677             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1678             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1679                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1680
1681             // Create 1st Tx - will timeout
1682
1683             final String transactionID1 = "tx1";
1684             final MutableCompositeModification modification1 = new MutableCompositeModification();
1685             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1686                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1687                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1688                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1689                     modification1);
1690
1691             // Create 2nd Tx
1692
1693             final String transactionID2 = "tx3";
1694             final MutableCompositeModification modification2 = new MutableCompositeModification();
1695             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1696                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1697             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1698                     listNodePath,
1699                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1700                     modification2);
1701
1702             // Ready the Tx's
1703
1704             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1705             expectMsgClass(duration, ReadyTransactionReply.class);
1706
1707             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1708             expectMsgClass(duration, ReadyTransactionReply.class);
1709
1710             // canCommit 1st Tx. We don't send the commit so it should timeout.
1711
1712             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1713             expectMsgClass(duration, CanCommitTransactionReply.class);
1714
1715             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1716
1717             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1718             expectMsgClass(duration, CanCommitTransactionReply.class);
1719
1720             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1721
1722             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1723             expectMsgClass(duration, akka.actor.Status.Failure.class);
1724
1725             // Commit the 2nd Tx.
1726
1727             shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1728             expectMsgClass(duration, CommitTransactionReply.class);
1729
1730             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1731             assertNotNull(listNodePath + " not found", node);
1732         }};
1733     }
1734
1735     @Test
1736     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1737         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1738
1739         new ShardTestKit(getSystem()) {{
1740             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1741                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1742                     "testTransactionCommitQueueCapacityExceeded");
1743
1744             waitUntilLeader(shard);
1745
1746             final FiniteDuration duration = duration("5 seconds");
1747
1748             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1749
1750             final String transactionID1 = "tx1";
1751             final MutableCompositeModification modification1 = new MutableCompositeModification();
1752             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1753                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1754
1755             final String transactionID2 = "tx2";
1756             final MutableCompositeModification modification2 = new MutableCompositeModification();
1757             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1758                     TestModel.OUTER_LIST_PATH,
1759                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1760                     modification2);
1761
1762             final String transactionID3 = "tx3";
1763             final MutableCompositeModification modification3 = new MutableCompositeModification();
1764             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1765                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1766
1767             // Ready the Tx's
1768
1769             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1770             expectMsgClass(duration, ReadyTransactionReply.class);
1771
1772             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1773             expectMsgClass(duration, ReadyTransactionReply.class);
1774
1775             // The 3rd Tx should exceed queue capacity and fail.
1776
1777             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1778             expectMsgClass(duration, akka.actor.Status.Failure.class);
1779
1780             // canCommit 1st Tx.
1781
1782             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1783             expectMsgClass(duration, CanCommitTransactionReply.class);
1784
1785             // canCommit the 2nd Tx - it should get queued.
1786
1787             shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1788
1789             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1790
1791             shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1792             expectMsgClass(duration, akka.actor.Status.Failure.class);
1793         }};
1794     }
1795
1796     @Test
1797     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1798         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1799
1800         new ShardTestKit(getSystem()) {{
1801             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1802                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1803                     "testTransactionCommitWithPriorExpiredCohortEntries");
1804
1805             waitUntilLeader(shard);
1806
1807             final FiniteDuration duration = duration("5 seconds");
1808
1809             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1810
1811             final String transactionID1 = "tx1";
1812             final MutableCompositeModification modification1 = new MutableCompositeModification();
1813             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1814                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1815
1816             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1817             expectMsgClass(duration, ReadyTransactionReply.class);
1818
1819             final String transactionID2 = "tx2";
1820             final MutableCompositeModification modification2 = new MutableCompositeModification();
1821             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1822                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1823
1824             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1825             expectMsgClass(duration, ReadyTransactionReply.class);
1826
1827             final String transactionID3 = "tx3";
1828             final MutableCompositeModification modification3 = new MutableCompositeModification();
1829             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1830                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1831
1832             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1833             expectMsgClass(duration, ReadyTransactionReply.class);
1834
1835             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1836             // should expire from the queue and the last one should be processed.
1837
1838             shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1839             expectMsgClass(duration, CanCommitTransactionReply.class);
1840         }};
1841     }
1842
1843     @Test
1844     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
1845         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1846
1847         new ShardTestKit(getSystem()) {{
1848             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1849                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1850                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
1851
1852             waitUntilLeader(shard);
1853
1854             final FiniteDuration duration = duration("5 seconds");
1855
1856             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1857
1858             final String transactionID1 = "tx1";
1859             final MutableCompositeModification modification1 = new MutableCompositeModification();
1860             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1861                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1862
1863             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1864             expectMsgClass(duration, ReadyTransactionReply.class);
1865
1866             // CanCommit the first one so it's the current in-progress CohortEntry.
1867
1868             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1869             expectMsgClass(duration, CanCommitTransactionReply.class);
1870
1871             // Ready the second Tx.
1872
1873             final String transactionID2 = "tx2";
1874             final MutableCompositeModification modification2 = new MutableCompositeModification();
1875             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1876                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1877
1878             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1879             expectMsgClass(duration, ReadyTransactionReply.class);
1880
1881             // Ready the third Tx.
1882
1883             final String transactionID3 = "tx3";
1884             final DataTreeModification modification3 = dataStore.newModification();
1885             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1886                     .apply(modification3);
1887                 modification3.ready();
1888             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
1889
1890             shard.tell(readyMessage, getRef());
1891
1892             // Commit the first Tx. After completing, the second should expire from the queue and the third
1893             // Tx committed.
1894
1895             shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1896             expectMsgClass(duration, CommitTransactionReply.class);
1897
1898             // Expect commit reply from the third Tx.
1899
1900             expectMsgClass(duration, CommitTransactionReply.class);
1901
1902             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1903             assertNotNull(TestModel.TEST2_PATH + " not found", node);
1904         }};
1905     }
1906
1907     @Test
1908     public void testCanCommitBeforeReadyFailure() throws Throwable {
1909         new ShardTestKit(getSystem()) {{
1910             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1911                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1912                     "testCanCommitBeforeReadyFailure");
1913
1914             shard.tell(new CanCommitTransaction("tx", CURRENT_VERSION).toSerializable(), getRef());
1915             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1916         }};
1917     }
1918
1919     @Test
1920     public void testAbortCurrentTransaction() throws Throwable {
1921         testAbortCurrentTransaction(true);
1922         testAbortCurrentTransaction(false);
1923     }
1924
1925     private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
1926         new ShardTestKit(getSystem()) {{
1927             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1928                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1929                     "testAbortCurrentTransaction-" + readWrite);
1930
1931             waitUntilLeader(shard);
1932
1933             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1934
1935             final String transactionID1 = "tx1";
1936             final MutableCompositeModification modification1 = new MutableCompositeModification();
1937             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1938             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1939             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1940
1941             final String transactionID2 = "tx2";
1942             final MutableCompositeModification modification2 = new MutableCompositeModification();
1943             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1944             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1945
1946             final FiniteDuration duration = duration("5 seconds");
1947             final Timeout timeout = new Timeout(duration);
1948
1949             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1950             expectMsgClass(duration, ReadyTransactionReply.class);
1951
1952             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1953             expectMsgClass(duration, ReadyTransactionReply.class);
1954
1955             // Send the CanCommitTransaction message for the first Tx.
1956
1957             shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1958             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1959                     expectMsgClass(duration, CanCommitTransactionReply.class));
1960             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1961
1962             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1963             // processed after the first Tx completes.
1964
1965             final Future<Object> canCommitFuture = Patterns.ask(shard,
1966                     new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1967
1968             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1969             // Tx to proceed.
1970
1971             shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1972             expectMsgClass(duration, AbortTransactionReply.class);
1973
1974             // Wait for the 2nd Tx to complete the canCommit phase.
1975
1976             Await.ready(canCommitFuture, duration);
1977
1978             final InOrder inOrder = inOrder(cohort1, cohort2);
1979             inOrder.verify(cohort1).canCommit();
1980             inOrder.verify(cohort2).canCommit();
1981         }};
1982     }
1983
1984     @Test
1985     public void testAbortQueuedTransaction() throws Throwable {
1986         testAbortQueuedTransaction(true);
1987         testAbortQueuedTransaction(false);
1988     }
1989
1990     private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
1991         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1992         new ShardTestKit(getSystem()) {{
1993             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
1994             @SuppressWarnings("serial")
1995             final Creator<Shard> creator = new Creator<Shard>() {
1996                 @Override
1997                 public Shard create() throws Exception {
1998                     return new Shard(newShardBuilder()) {
1999                         @Override
2000                         public void onReceiveCommand(final Object message) throws Exception {
2001                             super.onReceiveCommand(message);
2002                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2003                                 if(cleaupCheckLatch.get() != null) {
2004                                     cleaupCheckLatch.get().countDown();
2005                                 }
2006                             }
2007                         }
2008                     };
2009                 }
2010             };
2011
2012             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2013                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2014                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
2015
2016             waitUntilLeader(shard);
2017
2018             final String transactionID = "tx1";
2019
2020             final MutableCompositeModification modification = new MutableCompositeModification();
2021             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2022             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2023
2024             final FiniteDuration duration = duration("5 seconds");
2025
2026             // Ready the tx.
2027
2028             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
2029             expectMsgClass(duration, ReadyTransactionReply.class);
2030
2031             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2032
2033             // Send the AbortTransaction message.
2034
2035             shard.tell(new AbortTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
2036             expectMsgClass(duration, AbortTransactionReply.class);
2037
2038             verify(cohort).abort();
2039
2040             // Verify the tx cohort is removed from queue at the cleanup check interval.
2041
2042             cleaupCheckLatch.set(new CountDownLatch(1));
2043             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2044                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2045
2046             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2047
2048             // Now send CanCommitTransaction - should fail.
2049
2050             shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
2051
2052             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2053             assertTrue("Failure type", failure instanceof IllegalStateException);
2054         }};
2055     }
2056
2057     @Test
2058     public void testCreateSnapshot() throws Exception {
2059         testCreateSnapshot(true, "testCreateSnapshot");
2060     }
2061
2062     @Test
2063     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2064         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2065     }
2066
2067     @SuppressWarnings("serial")
2068     private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2069
2070         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2071
2072         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2073         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2074             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2075                 super(delegate);
2076             }
2077
2078             @Override
2079             public void saveSnapshot(final Object o) {
2080                 savedSnapshot.set(o);
2081                 super.saveSnapshot(o);
2082             }
2083         }
2084
2085         dataStoreContextBuilder.persistent(persistent);
2086
2087         new ShardTestKit(getSystem()) {{
2088             class TestShard extends Shard {
2089
2090                 protected TestShard(AbstractBuilder<?, ?> builder) {
2091                     super(builder);
2092                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2093                 }
2094
2095                 @Override
2096                 public void handleCommand(final Object message) {
2097                     super.handleCommand(message);
2098
2099                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2100                         latch.get().countDown();
2101                     }
2102                 }
2103
2104                 @Override
2105                 public RaftActorContext getRaftActorContext() {
2106                     return super.getRaftActorContext();
2107                 }
2108             }
2109
2110             final Creator<Shard> creator = new Creator<Shard>() {
2111                 @Override
2112                 public Shard create() throws Exception {
2113                     return new TestShard(newShardBuilder());
2114                 }
2115             };
2116
2117             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2118                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2119
2120             waitUntilLeader(shard);
2121             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2122
2123             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2124
2125             // Trigger creation of a snapshot by ensuring
2126             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2127             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2128             awaitAndValidateSnapshot(expectedRoot);
2129
2130             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2131             awaitAndValidateSnapshot(expectedRoot);
2132         }
2133
2134         private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException {
2135             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2136
2137             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2138                     savedSnapshot.get() instanceof Snapshot);
2139
2140             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2141
2142             latch.set(new CountDownLatch(1));
2143             savedSnapshot.set(null);
2144         }
2145
2146         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2147
2148             final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2149             assertEquals("Root node", expectedRoot, actual);
2150
2151         }};
2152     }
2153
2154     /**
2155      * This test simply verifies that the applySnapShot logic will work
2156      * @throws ReadFailedException
2157      * @throws DataValidationFailedException
2158      */
2159     @Test
2160     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2161         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
2162         store.setSchemaContext(SCHEMA_CONTEXT);
2163
2164         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2165         putTransaction.write(TestModel.TEST_PATH,
2166             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2167         commitTransaction(store, putTransaction);
2168
2169
2170         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2171
2172         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2173
2174         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2175         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2176
2177         commitTransaction(store, writeTransaction);
2178
2179         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2180
2181         assertEquals(expected, actual);
2182     }
2183
2184     @Test
2185     public void testRecoveryApplicable(){
2186
2187         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2188                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2189
2190         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2191                 schemaContext(SCHEMA_CONTEXT).props();
2192
2193         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2194                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2195
2196         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2197                 schemaContext(SCHEMA_CONTEXT).props();
2198
2199         new ShardTestKit(getSystem()) {{
2200             final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
2201
2202             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2203
2204             final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
2205
2206             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2207         }};
2208     }
2209
2210     @Test
2211     public void testOnDatastoreContext() {
2212         new ShardTestKit(getSystem()) {{
2213             dataStoreContextBuilder.persistent(true);
2214
2215             final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
2216
2217             assertEquals("isRecoveryApplicable", true,
2218                     shard.underlyingActor().persistence().isRecoveryApplicable());
2219
2220             waitUntilLeader(shard);
2221
2222             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2223
2224             assertEquals("isRecoveryApplicable", false,
2225                 shard.underlyingActor().persistence().isRecoveryApplicable());
2226
2227             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2228
2229             assertEquals("isRecoveryApplicable", true,
2230                 shard.underlyingActor().persistence().isRecoveryApplicable());
2231         }};
2232     }
2233
2234     @Test
2235     public void testRegisterRoleChangeListener() throws Exception {
2236         new ShardTestKit(getSystem()) {
2237             {
2238                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2239                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2240                         "testRegisterRoleChangeListener");
2241
2242                 waitUntilLeader(shard);
2243
2244                 final TestActorRef<MessageCollectorActor> listener =
2245                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2246
2247                 shard.tell(new RegisterRoleChangeListener(), listener);
2248
2249                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2250
2251                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2252                     ShardLeaderStateChanged.class);
2253                 assertEquals("getLocalShardDataTree present", true,
2254                         leaderStateChanged.getLocalShardDataTree().isPresent());
2255                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2256                     leaderStateChanged.getLocalShardDataTree().get());
2257
2258                 MessageCollectorActor.clearMessages(listener);
2259
2260                 // Force a leader change
2261
2262                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2263
2264                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2265                         ShardLeaderStateChanged.class);
2266                 assertEquals("getLocalShardDataTree present", false,
2267                         leaderStateChanged.getLocalShardDataTree().isPresent());
2268             }
2269         };
2270     }
2271
2272     @Test
2273     public void testFollowerInitialSyncStatus() throws Exception {
2274         final TestActorRef<Shard> shard = actorFactory.createTestActor(
2275                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2276                 "testFollowerInitialSyncStatus");
2277
2278         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2279
2280         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2281
2282         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2283
2284         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2285     }
2286
2287     @Test
2288     public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
2289         new ShardTestKit(getSystem()) {{
2290             String testName = "testClusteredDataChangeListenerDelayedRegistration";
2291             dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
2292
2293             final MockDataChangeListener listener = new MockDataChangeListener(1);
2294             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2295                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2296
2297             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2298                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2299                     actorFactory.generateActorId(testName + "-shard"));
2300
2301             waitUntilNoLeader(shard);
2302
2303             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2304
2305             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2306             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2307                 RegisterChangeListenerReply.class);
2308             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2309
2310             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2311
2312             shard.tell(new ElectionTimeout(), ActorRef.noSender());
2313
2314             listener.waitForChangeEvents();
2315         }};
2316     }
2317
2318     @Test
2319     public void testClusteredDataChangeListenerRegistration() throws Exception {
2320         new ShardTestKit(getSystem()) {{
2321             String testName = "testClusteredDataChangeListenerRegistration";
2322             final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2323                     actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2324
2325             final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2326                     actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2327
2328             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2329                     Shard.builder().id(followerShardID).
2330                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2331                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2332                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2333                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2334
2335             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2336                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2337                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2338                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2339                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2340
2341             leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
2342             String leaderPath = waitUntilLeader(followerShard);
2343             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2344
2345             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2346             final MockDataChangeListener listener = new MockDataChangeListener(1);
2347             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2348                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2349
2350             followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2351             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2352                 RegisterChangeListenerReply.class);
2353             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2354
2355             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2356
2357             listener.waitForChangeEvents();
2358         }};
2359     }
2360
2361     @Test
2362     public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
2363         new ShardTestKit(getSystem()) {{
2364             String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
2365             dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
2366
2367             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2368             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2369                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2370
2371             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2372                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2373                     actorFactory.generateActorId(testName + "-shard"));
2374
2375             waitUntilNoLeader(shard);
2376
2377             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2378
2379             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2380             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2381                     RegisterDataTreeChangeListenerReply.class);
2382             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2383
2384             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2385
2386             shard.tell(new ElectionTimeout(), ActorRef.noSender());
2387
2388             listener.waitForChangeEvents();
2389         }};
2390     }
2391
2392     @Test
2393     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2394         new ShardTestKit(getSystem()) {{
2395             String testName = "testClusteredDataTreeChangeListenerRegistration";
2396             final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2397                     actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2398
2399             final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2400                     actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2401
2402             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2403                     Shard.builder().id(followerShardID).
2404                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2405                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2406                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2407                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2408
2409             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2410                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2411                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2412                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2413                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2414
2415             leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
2416             String leaderPath = waitUntilLeader(followerShard);
2417             assertEquals("Shard leader path",&