Elide front-end 3PC for single-shard Tx
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.inOrder;
10 import static org.mockito.Mockito.mock;
11 import static org.mockito.Mockito.reset;
12 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.persistence.SaveSnapshotSuccess;
22 import akka.testkit.TestActorRef;
23 import akka.util.Timeout;
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.Test;
42 import org.mockito.InOrder;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
45 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
49 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
62 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
63 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
64 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
65 import org.opendaylight.controller.cluster.datastore.modification.Modification;
66 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
67 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
68 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
69 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
70 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
71 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
72 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
73 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
74 import org.opendaylight.controller.cluster.raft.RaftActorContext;
75 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
76 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
77 import org.opendaylight.controller.cluster.raft.Snapshot;
78 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
79 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
80 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
81 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
82 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
83 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
84 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
85 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
86 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
87 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
88 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
89 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
90 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
91 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
92 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
93 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
94 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
95 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
96 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
97 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
98 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
99 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
100 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
101 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
102 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
103 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
104 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
105 import scala.concurrent.Await;
106 import scala.concurrent.Future;
107 import scala.concurrent.duration.FiniteDuration;
108
109 public class ShardTest extends AbstractShardTest {
110
111     @Test
112     public void testRegisterChangeListener() throws Exception {
113         new ShardTestKit(getSystem()) {{
114             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
115                     newShardProps(),  "testRegisterChangeListener");
116
117             waitUntilLeader(shard);
118
119             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
120
121             MockDataChangeListener listener = new MockDataChangeListener(1);
122             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
123                     "testRegisterChangeListener-DataChangeListener");
124
125             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
126                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
127
128             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
129                     RegisterChangeListenerReply.class);
130             String replyPath = reply.getListenerRegistrationPath().toString();
131             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
132                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
133
134             YangInstanceIdentifier path = TestModel.TEST_PATH;
135             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
136
137             listener.waitForChangeEvents(path);
138
139             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
140             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
141         }};
142     }
143
144     @SuppressWarnings("serial")
145     @Test
146     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
147         // This test tests the timing window in which a change listener is registered before the
148         // shard becomes the leader. We verify that the listener is registered and notified of the
149         // existing data when the shard becomes the leader.
150         new ShardTestKit(getSystem()) {{
151             // For this test, we want to send the RegisterChangeListener message after the shard
152             // has recovered from persistence and before it becomes the leader. So we subclass
153             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
154             // we know that the shard has been initialized to a follower and has started the
155             // election process. The following 2 CountDownLatches are used to coordinate the
156             // ElectionTimeout with the sending of the RegisterChangeListener message.
157             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
158             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
159             Creator<Shard> creator = new Creator<Shard>() {
160                 boolean firstElectionTimeout = true;
161
162                 @Override
163                 public Shard create() throws Exception {
164                     // Use a non persistent provider because this test actually invokes persist on the journal
165                     // this will cause all other messages to not be queued properly after that.
166                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
167                     // it does do a persist)
168                     return new Shard(shardID, Collections.<String,String>emptyMap(),
169                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
170                         @Override
171                         public void onReceiveCommand(final Object message) throws Exception {
172                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
173                                 // Got the first ElectionTimeout. We don't forward it to the
174                                 // base Shard yet until we've sent the RegisterChangeListener
175                                 // message. So we signal the onFirstElectionTimeout latch to tell
176                                 // the main thread to send the RegisterChangeListener message and
177                                 // start a thread to wait on the onChangeListenerRegistered latch,
178                                 // which the main thread signals after it has sent the message.
179                                 // After the onChangeListenerRegistered is triggered, we send the
180                                 // original ElectionTimeout message to proceed with the election.
181                                 firstElectionTimeout = false;
182                                 final ActorRef self = getSelf();
183                                 new Thread() {
184                                     @Override
185                                     public void run() {
186                                         Uninterruptibles.awaitUninterruptibly(
187                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
188                                         self.tell(message, self);
189                                     }
190                                 }.start();
191
192                                 onFirstElectionTimeout.countDown();
193                             } else {
194                                 super.onReceiveCommand(message);
195                             }
196                         }
197                     };
198                 }
199             };
200
201             MockDataChangeListener listener = new MockDataChangeListener(1);
202             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
203                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
204
205             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
206                     Props.create(new DelegatingShardCreator(creator)),
207                     "testRegisterChangeListenerWhenNotLeaderInitially");
208
209             // Write initial data into the in-memory store.
210             YangInstanceIdentifier path = TestModel.TEST_PATH;
211             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
212
213             // Wait until the shard receives the first ElectionTimeout message.
214             assertEquals("Got first ElectionTimeout", true,
215                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
216
217             // Now send the RegisterChangeListener and wait for the reply.
218             shard.tell(new RegisterChangeListener(path, dclActor,
219                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
220
221             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
222                     RegisterChangeListenerReply.class);
223             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
224
225             // Sanity check - verify the shard is not the leader yet.
226             shard.tell(new FindLeader(), getRef());
227             FindLeaderReply findLeadeReply =
228                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
229             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
230
231             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
232             // with the election process.
233             onChangeListenerRegistered.countDown();
234
235             // Wait for the shard to become the leader and notify our listener with the existing
236             // data in the store.
237             listener.waitForChangeEvents(path);
238
239             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
240             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
241         }};
242     }
243
244     @Test
245     public void testCreateTransaction(){
246         new ShardTestKit(getSystem()) {{
247             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
248
249             waitUntilLeader(shard);
250
251             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
252
253             shard.tell(new CreateTransaction("txn-1",
254                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
255
256             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
257                     CreateTransactionReply.class);
258
259             String path = reply.getTransactionActorPath().toString();
260             assertTrue("Unexpected transaction path " + path,
261                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
262
263             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
264         }};
265     }
266
267     @Test
268     public void testCreateTransactionOnChain(){
269         new ShardTestKit(getSystem()) {{
270             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
271
272             waitUntilLeader(shard);
273
274             shard.tell(new CreateTransaction("txn-1",
275                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
276                     getRef());
277
278             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
279                     CreateTransactionReply.class);
280
281             String path = reply.getTransactionActorPath().toString();
282             assertTrue("Unexpected transaction path " + path,
283                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
284
285             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
286         }};
287     }
288
289     @SuppressWarnings("serial")
290     @Test
291     public void testPeerAddressResolved() throws Exception {
292         new ShardTestKit(getSystem()) {{
293             final CountDownLatch recoveryComplete = new CountDownLatch(1);
294             class TestShard extends Shard {
295                 TestShard() {
296                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
297                             newDatastoreContext(), SCHEMA_CONTEXT);
298                 }
299
300                 Map<String, String> getPeerAddresses() {
301                     return getRaftActorContext().getPeerAddresses();
302                 }
303
304                 @Override
305                 protected void onRecoveryComplete() {
306                     try {
307                         super.onRecoveryComplete();
308                     } finally {
309                         recoveryComplete.countDown();
310                     }
311                 }
312             }
313
314             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
315                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
316                         @Override
317                         public TestShard create() throws Exception {
318                             return new TestShard();
319                         }
320                     })), "testPeerAddressResolved");
321
322             //waitUntilLeader(shard);
323             assertEquals("Recovery complete", true,
324                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
325
326             String address = "akka://foobar";
327             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
328
329             assertEquals("getPeerAddresses", address,
330                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
331
332             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
333         }};
334     }
335
336     @Test
337     public void testApplySnapshot() throws Exception {
338         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
339                 "testApplySnapshot");
340
341         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
342         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
343
344         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
345
346         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
347         NormalizedNode<?,?> expected = readStore(store, root);
348
349         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
350                 SerializationUtils.serializeNormalizedNode(expected),
351                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
352
353         shard.underlyingActor().onReceiveCommand(applySnapshot);
354
355         NormalizedNode<?,?> actual = readStore(shard, root);
356
357         assertEquals("Root node", expected, actual);
358
359         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
360     }
361
362     @Test
363     public void testApplyState() throws Exception {
364
365         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
366
367         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
368
369         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
370                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
371
372         shard.underlyingActor().onReceiveCommand(applyState);
373
374         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
375         assertEquals("Applied state", node, actual);
376
377         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
378     }
379
380     @Test
381     public void testRecovery() throws Exception {
382
383         // Set up the InMemorySnapshotStore.
384
385         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
386         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
387
388         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
389
390         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
391
392         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
393                 SerializationUtils.serializeNormalizedNode(root),
394                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
395
396         // Set up the InMemoryJournal.
397
398         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
399                   new WriteModification(TestModel.OUTER_LIST_PATH,
400                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
401
402         int nListEntries = 16;
403         Set<Integer> listEntryKeys = new HashSet<>();
404
405         // Add some ModificationPayload entries
406         for(int i = 1; i <= nListEntries; i++) {
407             listEntryKeys.add(Integer.valueOf(i));
408             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
409                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
410             Modification mod = new MergeModification(path,
411                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
412             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
413                     newModificationPayload(mod)));
414         }
415
416         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
417                 new ApplyJournalEntries(nListEntries));
418
419         testRecovery(listEntryKeys);
420     }
421
422     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
423         MutableCompositeModification compMod = new MutableCompositeModification();
424         for(Modification mod: mods) {
425             compMod.addModification(mod);
426         }
427
428         return new ModificationPayload(compMod);
429     }
430
431     @SuppressWarnings({ "unchecked" })
432     @Test
433     public void testConcurrentThreePhaseCommits() throws Throwable {
434         new ShardTestKit(getSystem()) {{
435             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
436                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
437                     "testConcurrentThreePhaseCommits");
438
439             waitUntilLeader(shard);
440
441          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
442
443             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
444
445             String transactionID1 = "tx1";
446             MutableCompositeModification modification1 = new MutableCompositeModification();
447             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
448                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
449
450             String transactionID2 = "tx2";
451             MutableCompositeModification modification2 = new MutableCompositeModification();
452             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
453                     TestModel.OUTER_LIST_PATH,
454                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
455                     modification2);
456
457             String transactionID3 = "tx3";
458             MutableCompositeModification modification3 = new MutableCompositeModification();
459             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
460                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
461                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
462                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
463                     modification3);
464
465             long timeoutSec = 5;
466             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
467             final Timeout timeout = new Timeout(duration);
468
469             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
470             // by the ShardTransaction.
471
472             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
473                     cohort1, modification1, true, false), getRef());
474             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
475                     expectMsgClass(duration, ReadyTransactionReply.class));
476             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
477
478             // Send the CanCommitTransaction message for the first Tx.
479
480             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
481             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
482                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
483             assertEquals("Can commit", true, canCommitReply.getCanCommit());
484
485             // Send the ForwardedReadyTransaction for the next 2 Tx's.
486
487             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
488                     cohort2, modification2, true, false), getRef());
489             expectMsgClass(duration, ReadyTransactionReply.class);
490
491             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
492                     cohort3, modification3, true, false), getRef());
493             expectMsgClass(duration, ReadyTransactionReply.class);
494
495             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
496             // processed after the first Tx completes.
497
498             Future<Object> canCommitFuture1 = Patterns.ask(shard,
499                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
500
501             Future<Object> canCommitFuture2 = Patterns.ask(shard,
502                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
503
504             // Send the CommitTransaction message for the first Tx. After it completes, it should
505             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
506
507             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
508             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
509
510             // Wait for the next 2 Tx's to complete.
511
512             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
513             final CountDownLatch commitLatch = new CountDownLatch(2);
514
515             class OnFutureComplete extends OnComplete<Object> {
516                 private final Class<?> expRespType;
517
518                 OnFutureComplete(final Class<?> expRespType) {
519                     this.expRespType = expRespType;
520                 }
521
522                 @Override
523                 public void onComplete(final Throwable error, final Object resp) {
524                     if(error != null) {
525                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
526                     } else {
527                         try {
528                             assertEquals("Commit response type", expRespType, resp.getClass());
529                             onSuccess(resp);
530                         } catch (Exception e) {
531                             caughtEx.set(e);
532                         }
533                     }
534                 }
535
536                 void onSuccess(final Object resp) throws Exception {
537                 }
538             }
539
540             class OnCommitFutureComplete extends OnFutureComplete {
541                 OnCommitFutureComplete() {
542                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
543                 }
544
545                 @Override
546                 public void onComplete(final Throwable error, final Object resp) {
547                     super.onComplete(error, resp);
548                     commitLatch.countDown();
549                 }
550             }
551
552             class OnCanCommitFutureComplete extends OnFutureComplete {
553                 private final String transactionID;
554
555                 OnCanCommitFutureComplete(final String transactionID) {
556                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
557                     this.transactionID = transactionID;
558                 }
559
560                 @Override
561                 void onSuccess(final Object resp) throws Exception {
562                     CanCommitTransactionReply canCommitReply =
563                             CanCommitTransactionReply.fromSerializable(resp);
564                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
565
566                     Future<Object> commitFuture = Patterns.ask(shard,
567                             new CommitTransaction(transactionID).toSerializable(), timeout);
568                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
569                 }
570             }
571
572             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
573                     getSystem().dispatcher());
574
575             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
576                     getSystem().dispatcher());
577
578             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
579
580             if(caughtEx.get() != null) {
581                 throw caughtEx.get();
582             }
583
584             assertEquals("Commits complete", true, done);
585
586             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
587             inOrder.verify(cohort1).canCommit();
588             inOrder.verify(cohort1).preCommit();
589             inOrder.verify(cohort1).commit();
590             inOrder.verify(cohort2).canCommit();
591             inOrder.verify(cohort2).preCommit();
592             inOrder.verify(cohort2).commit();
593             inOrder.verify(cohort3).canCommit();
594             inOrder.verify(cohort3).preCommit();
595             inOrder.verify(cohort3).commit();
596
597             // Verify data in the data store.
598
599             verifyOuterListEntry(shard, 1);
600
601             verifyLastApplied(shard, 2);
602
603             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
604         }};
605     }
606
607     private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
608             NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
609         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
610     }
611
612     private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
613             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
614         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
615         batched.addModification(new WriteModification(path, data));
616         batched.setReady(ready);
617         batched.setDoCommitOnReady(doCommitOnReady);
618         return batched;
619     }
620
621     @Test
622     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
623         new ShardTestKit(getSystem()) {{
624             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
625                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
626                     "testBatchedModificationsWithNoCommitOnReady");
627
628             waitUntilLeader(shard);
629
630             final String transactionID = "tx";
631             FiniteDuration duration = duration("5 seconds");
632
633             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
634             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
635                 @Override
636                 public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
637                     if(mockCohort.get() == null) {
638                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
639                     }
640
641                     return mockCohort.get();
642                 }
643             };
644
645             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
646
647             // Send a BatchedModifications to start a transaction.
648
649             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
650                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
651             expectMsgClass(duration, BatchedModificationsReply.class);
652
653             // Send a couple more BatchedModifications.
654
655             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
656                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
657             expectMsgClass(duration, BatchedModificationsReply.class);
658
659             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
660                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
661                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
662             expectMsgClass(duration, ReadyTransactionReply.class);
663
664             // Send the CanCommitTransaction message.
665
666             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
667             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
668                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
669             assertEquals("Can commit", true, canCommitReply.getCanCommit());
670
671             // Send the CanCommitTransaction message.
672
673             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
674             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
675
676             InOrder inOrder = inOrder(mockCohort.get());
677             inOrder.verify(mockCohort.get()).canCommit();
678             inOrder.verify(mockCohort.get()).preCommit();
679             inOrder.verify(mockCohort.get()).commit();
680
681             // Verify data in the data store.
682
683             verifyOuterListEntry(shard, 1);
684
685             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
686         }};
687     }
688
689     @Test
690     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
691         new ShardTestKit(getSystem()) {{
692             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
693                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
694                     "testBatchedModificationsWithCommitOnReady");
695
696             waitUntilLeader(shard);
697
698             final String transactionID = "tx";
699             FiniteDuration duration = duration("5 seconds");
700
701             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
702             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
703                 @Override
704                 public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
705                     if(mockCohort.get() == null) {
706                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
707                     }
708
709                     return mockCohort.get();
710                 }
711             };
712
713             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
714
715             // Send a BatchedModifications to start a transaction.
716
717             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
718                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
719             expectMsgClass(duration, BatchedModificationsReply.class);
720
721             // Send a couple more BatchedModifications.
722
723             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
724                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
725             expectMsgClass(duration, BatchedModificationsReply.class);
726
727             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
728                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
729                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
730
731             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
732
733             InOrder inOrder = inOrder(mockCohort.get());
734             inOrder.verify(mockCohort.get()).canCommit();
735             inOrder.verify(mockCohort.get()).preCommit();
736             inOrder.verify(mockCohort.get()).commit();
737
738             // Verify data in the data store.
739
740             verifyOuterListEntry(shard, 1);
741
742             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
743         }};
744     }
745
746     @SuppressWarnings("unchecked")
747     private void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
748         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
749         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
750         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
751                 outerList.getValue() instanceof Iterable);
752         Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
753         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
754                 entry instanceof MapEntryNode);
755         MapEntryNode mapEntry = (MapEntryNode)entry;
756         Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
757                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
758         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
759         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
760     }
761
762     @Test
763     public void testBatchedModificationsOnTransactionChain() throws Throwable {
764         new ShardTestKit(getSystem()) {{
765             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
766                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
767                     "testBatchedModificationsOnTransactionChain");
768
769             waitUntilLeader(shard);
770
771             String transactionChainID = "txChain";
772             String transactionID1 = "tx1";
773             String transactionID2 = "tx2";
774
775             FiniteDuration duration = duration("5 seconds");
776
777             // Send a BatchedModifications to start a chained write transaction and ready it.
778
779             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
780             YangInstanceIdentifier path = TestModel.TEST_PATH;
781             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
782                     containerNode, true, false), getRef());
783             expectMsgClass(duration, ReadyTransactionReply.class);
784
785             // Create a read Tx on the same chain.
786
787             shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
788                     transactionChainID).toSerializable(), getRef());
789
790             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
791
792             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
793             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
794             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
795
796             // Commit the write transaction.
797
798             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
799             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
800                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
801             assertEquals("Can commit", true, canCommitReply.getCanCommit());
802
803             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
804             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
805
806             // Verify data in the data store.
807
808             NormalizedNode<?, ?> actualNode = readStore(shard, path);
809             assertEquals("Stored node", containerNode, actualNode);
810
811             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
812         }};
813     }
814
815     @Test
816     public void testOnBatchedModificationsWhenNotLeader() {
817         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
818         new ShardTestKit(getSystem()) {{
819             Creator<Shard> creator = new Creator<Shard>() {
820                 @Override
821                 public Shard create() throws Exception {
822                     return new Shard(shardID, Collections.<String,String>emptyMap(),
823                             newDatastoreContext(), SCHEMA_CONTEXT) {
824                         @Override
825                         protected boolean isLeader() {
826                             return overrideLeaderCalls.get() ? false : super.isLeader();
827                         }
828
829                         @Override
830                         protected ActorSelection getLeader() {
831                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
832                                 super.getLeader();
833                         }
834                     };
835                 }
836             };
837
838             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
839                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
840
841             waitUntilLeader(shard);
842
843             overrideLeaderCalls.set(true);
844
845             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
846
847             shard.tell(batched, ActorRef.noSender());
848
849             expectMsgEquals(batched);
850
851             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
852         }};
853     }
854
855     @Test
856     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
857         new ShardTestKit(getSystem()) {{
858             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
859                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
860                     "testForwardedReadyTransactionWithImmediateCommit");
861
862             waitUntilLeader(shard);
863
864             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
865
866             String transactionID = "tx1";
867             MutableCompositeModification modification = new MutableCompositeModification();
868             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
869             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
870                     TestModel.TEST_PATH, containerNode, modification);
871
872             FiniteDuration duration = duration("5 seconds");
873
874             // Simulate the ForwardedReadyTransaction messages that would be sent
875             // by the ShardTransaction.
876
877             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
878                     cohort, modification, true, true), getRef());
879
880             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
881
882             InOrder inOrder = inOrder(cohort);
883             inOrder.verify(cohort).canCommit();
884             inOrder.verify(cohort).preCommit();
885             inOrder.verify(cohort).commit();
886
887             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
888             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
889
890             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
891         }};
892     }
893
894     @Test
895     public void testCommitWithPersistenceDisabled() throws Throwable {
896         dataStoreContextBuilder.persistent(false);
897         new ShardTestKit(getSystem()) {{
898             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
899                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
900                     "testCommitWithPersistenceDisabled");
901
902             waitUntilLeader(shard);
903
904             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
905
906             // Setup a simulated transactions with a mock cohort.
907
908             String transactionID = "tx";
909             MutableCompositeModification modification = new MutableCompositeModification();
910             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
911             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
912                     TestModel.TEST_PATH, containerNode, modification);
913
914             FiniteDuration duration = duration("5 seconds");
915
916             // Simulate the ForwardedReadyTransaction messages that would be sent
917             // by the ShardTransaction.
918
919             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
920                     cohort, modification, true, false), getRef());
921             expectMsgClass(duration, ReadyTransactionReply.class);
922
923             // Send the CanCommitTransaction message.
924
925             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
926             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
927                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
928             assertEquals("Can commit", true, canCommitReply.getCanCommit());
929
930             // Send the CanCommitTransaction message.
931
932             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
933             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
934
935             InOrder inOrder = inOrder(cohort);
936             inOrder.verify(cohort).canCommit();
937             inOrder.verify(cohort).preCommit();
938             inOrder.verify(cohort).commit();
939
940             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
941             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
942
943             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
944         }};
945     }
946
947     @Test
948     public void testCommitWhenTransactionHasNoModifications(){
949         // Note that persistence is enabled which would normally result in the entry getting written to the journal
950         // but here that need not happen
951         new ShardTestKit(getSystem()) {
952             {
953                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
954                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
955                         "testCommitWhenTransactionHasNoModifications");
956
957                 waitUntilLeader(shard);
958
959                 String transactionID = "tx1";
960                 MutableCompositeModification modification = new MutableCompositeModification();
961                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
962                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
963                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
964                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
965
966                 FiniteDuration duration = duration("5 seconds");
967
968                 // Simulate the ForwardedReadyTransaction messages that would be sent
969                 // by the ShardTransaction.
970
971                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
972                         cohort, modification, true, false), getRef());
973                 expectMsgClass(duration, ReadyTransactionReply.class);
974
975                 // Send the CanCommitTransaction message.
976
977                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
978                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
979                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
980                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
981
982                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
983                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
984
985                 InOrder inOrder = inOrder(cohort);
986                 inOrder.verify(cohort).canCommit();
987                 inOrder.verify(cohort).preCommit();
988                 inOrder.verify(cohort).commit();
989
990                 // Use MBean for verification
991                 // Committed transaction count should increase as usual
992                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
993
994                 // Commit index should not advance because this does not go into the journal
995                 assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
996
997                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
998
999             }
1000         };
1001     }
1002
1003     @Test
1004     public void testCommitWhenTransactionHasModifications(){
1005         new ShardTestKit(getSystem()) {
1006             {
1007                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1008                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1009                         "testCommitWhenTransactionHasModifications");
1010
1011                 waitUntilLeader(shard);
1012
1013                 String transactionID = "tx1";
1014                 MutableCompositeModification modification = new MutableCompositeModification();
1015                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1016                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1017                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1018                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1019                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1020
1021                 FiniteDuration duration = duration("5 seconds");
1022
1023                 // Simulate the ForwardedReadyTransaction messages that would be sent
1024                 // by the ShardTransaction.
1025
1026                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1027                         cohort, modification, true, false), getRef());
1028                 expectMsgClass(duration, ReadyTransactionReply.class);
1029
1030                 // Send the CanCommitTransaction message.
1031
1032                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1033                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1034                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1035                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1036
1037                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1038                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1039
1040                 InOrder inOrder = inOrder(cohort);
1041                 inOrder.verify(cohort).canCommit();
1042                 inOrder.verify(cohort).preCommit();
1043                 inOrder.verify(cohort).commit();
1044
1045                 // Use MBean for verification
1046                 // Committed transaction count should increase as usual
1047                 assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
1048
1049                 // Commit index should advance as we do not have an empty modification
1050                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
1051
1052                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1053
1054             }
1055         };
1056     }
1057
1058     @Test
1059     public void testCommitPhaseFailure() throws Throwable {
1060         new ShardTestKit(getSystem()) {{
1061             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1062                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1063                     "testCommitPhaseFailure");
1064
1065             waitUntilLeader(shard);
1066
1067             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1068             // commit phase.
1069
1070             String transactionID1 = "tx1";
1071             MutableCompositeModification modification1 = new MutableCompositeModification();
1072             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1073             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1074             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1075             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1076
1077             String transactionID2 = "tx2";
1078             MutableCompositeModification modification2 = new MutableCompositeModification();
1079             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1080             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1081
1082             FiniteDuration duration = duration("5 seconds");
1083             final Timeout timeout = new Timeout(duration);
1084
1085             // Simulate the ForwardedReadyTransaction messages that would be sent
1086             // by the ShardTransaction.
1087
1088             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1089                     cohort1, modification1, true, false), getRef());
1090             expectMsgClass(duration, ReadyTransactionReply.class);
1091
1092             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1093                     cohort2, modification2, true, false), getRef());
1094             expectMsgClass(duration, ReadyTransactionReply.class);
1095
1096             // Send the CanCommitTransaction message for the first Tx.
1097
1098             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1099             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1100                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1101             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1102
1103             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1104             // processed after the first Tx completes.
1105
1106             Future<Object> canCommitFuture = Patterns.ask(shard,
1107                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1108
1109             // Send the CommitTransaction message for the first Tx. This should send back an error
1110             // and trigger the 2nd Tx to proceed.
1111
1112             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1113             expectMsgClass(duration, akka.actor.Status.Failure.class);
1114
1115             // Wait for the 2nd Tx to complete the canCommit phase.
1116
1117             final CountDownLatch latch = new CountDownLatch(1);
1118             canCommitFuture.onComplete(new OnComplete<Object>() {
1119                 @Override
1120                 public void onComplete(final Throwable t, final Object resp) {
1121                     latch.countDown();
1122                 }
1123             }, getSystem().dispatcher());
1124
1125             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1126
1127             InOrder inOrder = inOrder(cohort1, cohort2);
1128             inOrder.verify(cohort1).canCommit();
1129             inOrder.verify(cohort1).preCommit();
1130             inOrder.verify(cohort1).commit();
1131             inOrder.verify(cohort2).canCommit();
1132
1133             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1134         }};
1135     }
1136
1137     @Test
1138     public void testPreCommitPhaseFailure() throws Throwable {
1139         new ShardTestKit(getSystem()) {{
1140             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1141                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1142                     "testPreCommitPhaseFailure");
1143
1144             waitUntilLeader(shard);
1145
1146             String transactionID1 = "tx1";
1147             MutableCompositeModification modification1 = new MutableCompositeModification();
1148             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1149             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1150             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1151
1152             String transactionID2 = "tx2";
1153             MutableCompositeModification modification2 = new MutableCompositeModification();
1154             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1155             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1156
1157             FiniteDuration duration = duration("5 seconds");
1158             final Timeout timeout = new Timeout(duration);
1159
1160             // Simulate the ForwardedReadyTransaction messages that would be sent
1161             // by the ShardTransaction.
1162
1163             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1164                     cohort1, modification1, true, false), getRef());
1165             expectMsgClass(duration, ReadyTransactionReply.class);
1166
1167             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1168                     cohort2, modification2, true, false), getRef());
1169             expectMsgClass(duration, ReadyTransactionReply.class);
1170
1171             // Send the CanCommitTransaction message for the first Tx.
1172
1173             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1174             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1175                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1176             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1177
1178             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1179             // processed after the first Tx completes.
1180
1181             Future<Object> canCommitFuture = Patterns.ask(shard,
1182                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1183
1184             // Send the CommitTransaction message for the first Tx. This should send back an error
1185             // and trigger the 2nd Tx to proceed.
1186
1187             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1188             expectMsgClass(duration, akka.actor.Status.Failure.class);
1189
1190             // Wait for the 2nd Tx to complete the canCommit phase.
1191
1192             final CountDownLatch latch = new CountDownLatch(1);
1193             canCommitFuture.onComplete(new OnComplete<Object>() {
1194                 @Override
1195                 public void onComplete(final Throwable t, final Object resp) {
1196                     latch.countDown();
1197                 }
1198             }, getSystem().dispatcher());
1199
1200             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1201
1202             InOrder inOrder = inOrder(cohort1, cohort2);
1203             inOrder.verify(cohort1).canCommit();
1204             inOrder.verify(cohort1).preCommit();
1205             inOrder.verify(cohort2).canCommit();
1206
1207             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1208         }};
1209     }
1210
1211     @Test
1212     public void testCanCommitPhaseFailure() throws Throwable {
1213         new ShardTestKit(getSystem()) {{
1214             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1215                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1216                     "testCanCommitPhaseFailure");
1217
1218             waitUntilLeader(shard);
1219
1220             final FiniteDuration duration = duration("5 seconds");
1221
1222             String transactionID1 = "tx1";
1223             MutableCompositeModification modification = new MutableCompositeModification();
1224             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1225             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1226
1227             // Simulate the ForwardedReadyTransaction messages that would be sent
1228             // by the ShardTransaction.
1229
1230             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1231                     cohort, modification, true, false), getRef());
1232             expectMsgClass(duration, ReadyTransactionReply.class);
1233
1234             // Send the CanCommitTransaction message.
1235
1236             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1237             expectMsgClass(duration, akka.actor.Status.Failure.class);
1238
1239             // Send another can commit to ensure the failed one got cleaned up.
1240
1241             reset(cohort);
1242
1243             String transactionID2 = "tx2";
1244             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1245
1246             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1247                     cohort, modification, true, false), getRef());
1248             expectMsgClass(duration, ReadyTransactionReply.class);
1249
1250             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1251             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1252                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1253             assertEquals("getCanCommit", true, reply.getCanCommit());
1254
1255             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1256         }};
1257     }
1258
1259     @Test
1260     public void testCanCommitPhaseFalseResponse() throws Throwable {
1261         new ShardTestKit(getSystem()) {{
1262             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1263                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1264                     "testCanCommitPhaseFalseResponse");
1265
1266             waitUntilLeader(shard);
1267
1268             final FiniteDuration duration = duration("5 seconds");
1269
1270             String transactionID1 = "tx1";
1271             MutableCompositeModification modification = new MutableCompositeModification();
1272             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1273             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1274
1275             // Simulate the ForwardedReadyTransaction messages that would be sent
1276             // by the ShardTransaction.
1277
1278             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1279                     cohort, modification, true, false), getRef());
1280             expectMsgClass(duration, ReadyTransactionReply.class);
1281
1282             // Send the CanCommitTransaction message.
1283
1284             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1285             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1286                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1287             assertEquals("getCanCommit", false, reply.getCanCommit());
1288
1289             // Send another can commit to ensure the failed one got cleaned up.
1290
1291             reset(cohort);
1292
1293             String transactionID2 = "tx2";
1294             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1295
1296             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1297                     cohort, modification, true, false), getRef());
1298             expectMsgClass(duration, ReadyTransactionReply.class);
1299
1300             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1301             reply = CanCommitTransactionReply.fromSerializable(
1302                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1303             assertEquals("getCanCommit", true, reply.getCanCommit());
1304
1305             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1306         }};
1307     }
1308
1309     @Test
1310     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1311         new ShardTestKit(getSystem()) {{
1312             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1313                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1314                     "testImmediateCommitWithCanCommitPhaseFailure");
1315
1316             waitUntilLeader(shard);
1317
1318             final FiniteDuration duration = duration("5 seconds");
1319
1320             String transactionID1 = "tx1";
1321             MutableCompositeModification modification = new MutableCompositeModification();
1322             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1323             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1324
1325             // Simulate the ForwardedReadyTransaction messages that would be sent
1326             // by the ShardTransaction.
1327
1328             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1329                     cohort, modification, true, true), getRef());
1330
1331             expectMsgClass(duration, akka.actor.Status.Failure.class);
1332
1333             // Send another can commit to ensure the failed one got cleaned up.
1334
1335             reset(cohort);
1336
1337             String transactionID2 = "tx2";
1338             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1339             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1340             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1341
1342             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1343                     cohort, modification, true, true), getRef());
1344
1345             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1346
1347             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1348         }};
1349     }
1350
1351     @Test
1352     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1353         new ShardTestKit(getSystem()) {{
1354             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1355                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1356                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1357
1358             waitUntilLeader(shard);
1359
1360             final FiniteDuration duration = duration("5 seconds");
1361
1362             String transactionID = "tx1";
1363             MutableCompositeModification modification = new MutableCompositeModification();
1364             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1365             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1366
1367             // Simulate the ForwardedReadyTransaction messages that would be sent
1368             // by the ShardTransaction.
1369
1370             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1371                     cohort, modification, true, true), getRef());
1372
1373             expectMsgClass(duration, akka.actor.Status.Failure.class);
1374
1375             // Send another can commit to ensure the failed one got cleaned up.
1376
1377             reset(cohort);
1378
1379             String transactionID2 = "tx2";
1380             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1381             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1382             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1383
1384             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1385                     cohort, modification, true, true), getRef());
1386
1387             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1388
1389             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1390         }};
1391     }
1392
1393     @Test
1394     public void testAbortBeforeFinishCommit() throws Throwable {
1395         new ShardTestKit(getSystem()) {{
1396             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1397                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1398                     "testAbortBeforeFinishCommit");
1399
1400             waitUntilLeader(shard);
1401
1402             final FiniteDuration duration = duration("5 seconds");
1403             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1404
1405             final String transactionID = "tx1";
1406             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1407                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1408                 @Override
1409                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1410                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1411
1412                     // Simulate an AbortTransaction message occurring during replication, after
1413                     // persisting and before finishing the commit to the in-memory store.
1414                     // We have no followers so due to optimizations in the RaftActor, it does not
1415                     // attempt replication and thus we can't send an AbortTransaction message b/c
1416                     // it would be processed too late after CommitTransaction completes. So we'll
1417                     // simulate an AbortTransaction message occurring during replication by calling
1418                     // the shard directly.
1419                     //
1420                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1421
1422                     return preCommitFuture;
1423                 }
1424             };
1425
1426             MutableCompositeModification modification = new MutableCompositeModification();
1427             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1428                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1429                     modification, preCommit);
1430
1431             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1432                     cohort, modification, true, false), getRef());
1433             expectMsgClass(duration, ReadyTransactionReply.class);
1434
1435             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1436             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1437                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1438             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1439
1440             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1441             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1442
1443             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1444
1445             // Since we're simulating an abort occurring during replication and before finish commit,
1446             // the data should still get written to the in-memory store since we've gotten past
1447             // canCommit and preCommit and persisted the data.
1448             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1449
1450             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1451         }};
1452     }
1453
1454     @Test
1455     public void testTransactionCommitTimeout() throws Throwable {
1456         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1457
1458         new ShardTestKit(getSystem()) {{
1459             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1460                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1461                     "testTransactionCommitTimeout");
1462
1463             waitUntilLeader(shard);
1464
1465             final FiniteDuration duration = duration("5 seconds");
1466
1467             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1468
1469             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1470             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1471                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1472
1473             // Create 1st Tx - will timeout
1474
1475             String transactionID1 = "tx1";
1476             MutableCompositeModification modification1 = new MutableCompositeModification();
1477             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1478                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1479                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1480                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1481                     modification1);
1482
1483             // Create 2nd Tx
1484
1485             String transactionID2 = "tx3";
1486             MutableCompositeModification modification2 = new MutableCompositeModification();
1487             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1488                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1489             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1490                     listNodePath,
1491                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1492                     modification2);
1493
1494             // Ready the Tx's
1495
1496             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1497                     cohort1, modification1, true, false), getRef());
1498             expectMsgClass(duration, ReadyTransactionReply.class);
1499
1500             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1501                     cohort2, modification2, true, false), getRef());
1502             expectMsgClass(duration, ReadyTransactionReply.class);
1503
1504             // canCommit 1st Tx. We don't send the commit so it should timeout.
1505
1506             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1507             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1508
1509             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1510
1511             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1512             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1513
1514             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1515
1516             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1517             expectMsgClass(duration, akka.actor.Status.Failure.class);
1518
1519             // Commit the 2nd Tx.
1520
1521             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1522             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1523
1524             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1525             assertNotNull(listNodePath + " not found", node);
1526
1527             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1528         }};
1529     }
1530
1531     @Test
1532     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1533         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1534
1535         new ShardTestKit(getSystem()) {{
1536             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1537                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1538                     "testTransactionCommitQueueCapacityExceeded");
1539
1540             waitUntilLeader(shard);
1541
1542             final FiniteDuration duration = duration("5 seconds");
1543
1544             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1545
1546             String transactionID1 = "tx1";
1547             MutableCompositeModification modification1 = new MutableCompositeModification();
1548             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1549                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1550
1551             String transactionID2 = "tx2";
1552             MutableCompositeModification modification2 = new MutableCompositeModification();
1553             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1554                     TestModel.OUTER_LIST_PATH,
1555                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1556                     modification2);
1557
1558             String transactionID3 = "tx3";
1559             MutableCompositeModification modification3 = new MutableCompositeModification();
1560             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1561                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1562
1563             // Ready the Tx's
1564
1565             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1566                     cohort1, modification1, true, false), getRef());
1567             expectMsgClass(duration, ReadyTransactionReply.class);
1568
1569             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1570                     cohort2, modification2, true, false), getRef());
1571             expectMsgClass(duration, ReadyTransactionReply.class);
1572
1573             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1574                     cohort3, modification3, true, false), getRef());
1575             expectMsgClass(duration, ReadyTransactionReply.class);
1576
1577             // canCommit 1st Tx.
1578
1579             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1580             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1581
1582             // canCommit the 2nd Tx - it should get queued.
1583
1584             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1585
1586             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1587
1588             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1589             expectMsgClass(duration, akka.actor.Status.Failure.class);
1590
1591             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1592         }};
1593     }
1594
1595     @Test
1596     public void testCanCommitBeforeReadyFailure() throws Throwable {
1597         new ShardTestKit(getSystem()) {{
1598             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1599                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1600                     "testCanCommitBeforeReadyFailure");
1601
1602             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1603             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1604
1605             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1606         }};
1607     }
1608
1609     @Test
1610     public void testAbortTransaction() throws Throwable {
1611         new ShardTestKit(getSystem()) {{
1612             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1613                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1614                     "testAbortTransaction");
1615
1616             waitUntilLeader(shard);
1617
1618             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1619
1620             String transactionID1 = "tx1";
1621             MutableCompositeModification modification1 = new MutableCompositeModification();
1622             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1623             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1624             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1625
1626             String transactionID2 = "tx2";
1627             MutableCompositeModification modification2 = new MutableCompositeModification();
1628             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1629             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1630
1631             FiniteDuration duration = duration("5 seconds");
1632             final Timeout timeout = new Timeout(duration);
1633
1634             // Simulate the ForwardedReadyTransaction messages that would be sent
1635             // by the ShardTransaction.
1636
1637             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1638                     cohort1, modification1, true, false), getRef());
1639             expectMsgClass(duration, ReadyTransactionReply.class);
1640
1641             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1642                     cohort2, modification2, true, false), getRef());
1643             expectMsgClass(duration, ReadyTransactionReply.class);
1644
1645             // Send the CanCommitTransaction message for the first Tx.
1646
1647             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1648             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1649                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1650             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1651
1652             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1653             // processed after the first Tx completes.
1654
1655             Future<Object> canCommitFuture = Patterns.ask(shard,
1656                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1657
1658             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1659             // Tx to proceed.
1660
1661             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1662             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1663
1664             // Wait for the 2nd Tx to complete the canCommit phase.
1665
1666             Await.ready(canCommitFuture, duration);
1667
1668             InOrder inOrder = inOrder(cohort1, cohort2);
1669             inOrder.verify(cohort1).canCommit();
1670             inOrder.verify(cohort2).canCommit();
1671
1672             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1673         }};
1674     }
1675
1676     @Test
1677     public void testCreateSnapshot() throws Exception {
1678         testCreateSnapshot(true, "testCreateSnapshot");
1679     }
1680
1681     @Test
1682     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1683         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1684     }
1685
1686     @SuppressWarnings("serial")
1687     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1688
1689         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1690
1691         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1692         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1693             TestPersistentDataProvider(DataPersistenceProvider delegate) {
1694                 super(delegate);
1695             }
1696
1697             @Override
1698             public void saveSnapshot(Object o) {
1699                 savedSnapshot.set(o);
1700                 super.saveSnapshot(o);
1701             }
1702         }
1703
1704         dataStoreContextBuilder.persistent(persistent);
1705
1706         new ShardTestKit(getSystem()) {{
1707             class TestShard extends Shard {
1708
1709                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
1710                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
1711                     super(name, peerAddresses, datastoreContext, schemaContext);
1712                     setPersistence(new TestPersistentDataProvider(super.persistence()));
1713                 }
1714
1715                 @Override
1716                 public void handleCommand(Object message) {
1717                     super.handleCommand(message);
1718
1719                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
1720                         latch.get().countDown();
1721                     }
1722                 }
1723
1724                 @Override
1725                 public RaftActorContext getRaftActorContext() {
1726                     return super.getRaftActorContext();
1727                 }
1728             }
1729
1730             Creator<Shard> creator = new Creator<Shard>() {
1731                 @Override
1732                 public Shard create() throws Exception {
1733                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
1734                             newDatastoreContext(), SCHEMA_CONTEXT);
1735                 }
1736             };
1737
1738             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1739                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1740
1741             waitUntilLeader(shard);
1742
1743             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1744
1745             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1746
1747             // Trigger creation of a snapshot by ensuring
1748             RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1749             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1750
1751             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1752
1753             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1754                     savedSnapshot.get() instanceof Snapshot);
1755
1756             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1757
1758             latch.set(new CountDownLatch(1));
1759             savedSnapshot.set(null);
1760
1761             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1762
1763             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1764
1765             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1766                     savedSnapshot.get() instanceof Snapshot);
1767
1768             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1769
1770             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1771         }
1772
1773         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1774
1775             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1776             assertEquals("Root node", expectedRoot, actual);
1777
1778         }};
1779     }
1780
1781     /**
1782      * This test simply verifies that the applySnapShot logic will work
1783      * @throws ReadFailedException
1784      */
1785     @Test
1786     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1787         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1788
1789         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1790
1791         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1792         putTransaction.write(TestModel.TEST_PATH,
1793             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1794         commitTransaction(putTransaction);
1795
1796
1797         NormalizedNode<?, ?> expected = readStore(store);
1798
1799         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1800
1801         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1802         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1803
1804         commitTransaction(writeTransaction);
1805
1806         NormalizedNode<?, ?> actual = readStore(store);
1807
1808         assertEquals(expected, actual);
1809     }
1810
1811     @Test
1812     public void testRecoveryApplicable(){
1813
1814         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1815                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1816
1817         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
1818                 persistentContext, SCHEMA_CONTEXT);
1819
1820         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1821                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1822
1823         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
1824                 nonPersistentContext, SCHEMA_CONTEXT);
1825
1826         new ShardTestKit(getSystem()) {{
1827             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1828                     persistentProps, "testPersistence1");
1829
1830             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1831
1832             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1833
1834             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1835                     nonPersistentProps, "testPersistence2");
1836
1837             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1838
1839             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1840
1841         }};
1842
1843     }
1844
1845     @Test
1846     public void testOnDatastoreContext() {
1847         new ShardTestKit(getSystem()) {{
1848             dataStoreContextBuilder.persistent(true);
1849
1850             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1851
1852             assertEquals("isRecoveryApplicable", true,
1853                     shard.underlyingActor().persistence().isRecoveryApplicable());
1854
1855             waitUntilLeader(shard);
1856
1857             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1858
1859             assertEquals("isRecoveryApplicable", false,
1860                     shard.underlyingActor().persistence().isRecoveryApplicable());
1861
1862             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1863
1864             assertEquals("isRecoveryApplicable", true,
1865                     shard.underlyingActor().persistence().isRecoveryApplicable());
1866
1867             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1868         }};
1869     }
1870
1871     @Test
1872     public void testRegisterRoleChangeListener() throws Exception {
1873         new ShardTestKit(getSystem()) {
1874             {
1875                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1876                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1877                         "testRegisterRoleChangeListener");
1878
1879                 waitUntilLeader(shard);
1880
1881                 TestActorRef<MessageCollectorActor> listener =
1882                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1883
1884                 shard.tell(new RegisterRoleChangeListener(), listener);
1885
1886                 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1887                 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1888                 // sleep.
1889                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1890
1891                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1892
1893                 assertEquals(1, allMatching.size());
1894             }
1895         };
1896     }
1897
1898     @Test
1899     public void testFollowerInitialSyncStatus() throws Exception {
1900         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1901                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1902                 "testFollowerInitialSyncStatus");
1903
1904         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1905
1906         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1907
1908         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1909
1910         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1911
1912         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1913     }
1914
1915     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1916         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1917         ListenableFuture<Void> future =
1918             commitCohort.preCommit();
1919         try {
1920             future.get();
1921             future = commitCohort.commit();
1922             future.get();
1923         } catch (InterruptedException | ExecutionException e) {
1924         }
1925     }
1926 }