Merge "BUG 2849 : Reduce sending of duplicate replication messages"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.inOrder;
10 import static org.mockito.Mockito.mock;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import akka.actor.ActorRef;
13 import akka.actor.PoisonPill;
14 import akka.actor.Props;
15 import akka.dispatch.Dispatchers;
16 import akka.dispatch.OnComplete;
17 import akka.japi.Creator;
18 import akka.japi.Procedure;
19 import akka.pattern.Patterns;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import java.io.IOException;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.InOrder;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
43 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
52 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
54 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
55 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
56 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
57 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
58 import org.opendaylight.controller.cluster.datastore.modification.Modification;
59 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
60 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
61 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
62 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
63 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
64 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
65 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
66 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
67 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
68 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
69 import org.opendaylight.controller.cluster.raft.Snapshot;
70 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
71 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
73 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
74 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
75 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
76 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
77 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
78 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
79 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
80 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
81 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
82 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
83 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
84 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
85 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
86 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
87 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
88 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
89 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
90 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
91 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
92 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
93 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
94 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
95 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
96 import scala.concurrent.Await;
97 import scala.concurrent.Future;
98 import scala.concurrent.duration.FiniteDuration;
99
100 public class ShardTest extends AbstractShardTest {
101     @Test
102     public void testRegisterChangeListener() throws Exception {
103         new ShardTestKit(getSystem()) {{
104             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
105                     newShardProps(),  "testRegisterChangeListener");
106
107             waitUntilLeader(shard);
108
109             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
110
111             MockDataChangeListener listener = new MockDataChangeListener(1);
112             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
113                     "testRegisterChangeListener-DataChangeListener");
114
115             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
116                     dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
117
118             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
119                     RegisterChangeListenerReply.class);
120             String replyPath = reply.getListenerRegistrationPath().toString();
121             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
122                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
123
124             YangInstanceIdentifier path = TestModel.TEST_PATH;
125             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
126
127             listener.waitForChangeEvents(path);
128
129             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
130             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
131         }};
132     }
133
134     @SuppressWarnings("serial")
135     @Test
136     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
137         // This test tests the timing window in which a change listener is registered before the
138         // shard becomes the leader. We verify that the listener is registered and notified of the
139         // existing data when the shard becomes the leader.
140         new ShardTestKit(getSystem()) {{
141             // For this test, we want to send the RegisterChangeListener message after the shard
142             // has recovered from persistence and before it becomes the leader. So we subclass
143             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
144             // we know that the shard has been initialized to a follower and has started the
145             // election process. The following 2 CountDownLatches are used to coordinate the
146             // ElectionTimeout with the sending of the RegisterChangeListener message.
147             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
148             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
149             Creator<Shard> creator = new Creator<Shard>() {
150                 boolean firstElectionTimeout = true;
151
152                 @Override
153                 public Shard create() throws Exception {
154                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
155                             newDatastoreContext(), SCHEMA_CONTEXT) {
156                         @Override
157                         public void onReceiveCommand(final Object message) throws Exception {
158                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
159                                 // Got the first ElectionTimeout. We don't forward it to the
160                                 // base Shard yet until we've sent the RegisterChangeListener
161                                 // message. So we signal the onFirstElectionTimeout latch to tell
162                                 // the main thread to send the RegisterChangeListener message and
163                                 // start a thread to wait on the onChangeListenerRegistered latch,
164                                 // which the main thread signals after it has sent the message.
165                                 // After the onChangeListenerRegistered is triggered, we send the
166                                 // original ElectionTimeout message to proceed with the election.
167                                 firstElectionTimeout = false;
168                                 final ActorRef self = getSelf();
169                                 new Thread() {
170                                     @Override
171                                     public void run() {
172                                         Uninterruptibles.awaitUninterruptibly(
173                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
174                                         self.tell(message, self);
175                                     }
176                                 }.start();
177
178                                 onFirstElectionTimeout.countDown();
179                             } else {
180                                 super.onReceiveCommand(message);
181                             }
182                         }
183                     };
184                 }
185             };
186
187             MockDataChangeListener listener = new MockDataChangeListener(1);
188             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
189                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
190
191             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
192                     Props.create(new DelegatingShardCreator(creator)),
193                     "testRegisterChangeListenerWhenNotLeaderInitially");
194
195             // Write initial data into the in-memory store.
196             YangInstanceIdentifier path = TestModel.TEST_PATH;
197             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
198
199             // Wait until the shard receives the first ElectionTimeout message.
200             assertEquals("Got first ElectionTimeout", true,
201                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
202
203             // Now send the RegisterChangeListener and wait for the reply.
204             shard.tell(new RegisterChangeListener(path, dclActor.path(),
205                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
206
207             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
208                     RegisterChangeListenerReply.class);
209             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
210
211             // Sanity check - verify the shard is not the leader yet.
212             shard.tell(new FindLeader(), getRef());
213             FindLeaderReply findLeadeReply =
214                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
215             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
216
217             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
218             // with the election process.
219             onChangeListenerRegistered.countDown();
220
221             // Wait for the shard to become the leader and notify our listener with the existing
222             // data in the store.
223             listener.waitForChangeEvents(path);
224
225             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
226             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
227         }};
228     }
229
230     @Test
231     public void testCreateTransaction(){
232         new ShardTestKit(getSystem()) {{
233             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
234
235             waitUntilLeader(shard);
236
237             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
238
239             shard.tell(new CreateTransaction("txn-1",
240                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
241
242             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
243                     CreateTransactionReply.class);
244
245             String path = reply.getTransactionActorPath().toString();
246             assertTrue("Unexpected transaction path " + path,
247                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
248
249             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
250         }};
251     }
252
253     @Test
254     public void testCreateTransactionOnChain(){
255         new ShardTestKit(getSystem()) {{
256             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
257
258             waitUntilLeader(shard);
259
260             shard.tell(new CreateTransaction("txn-1",
261                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
262                     getRef());
263
264             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
265                     CreateTransactionReply.class);
266
267             String path = reply.getTransactionActorPath().toString();
268             assertTrue("Unexpected transaction path " + path,
269                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
270
271             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
272         }};
273     }
274
275     @SuppressWarnings("serial")
276     @Test
277     public void testPeerAddressResolved() throws Exception {
278         new ShardTestKit(getSystem()) {{
279             final CountDownLatch recoveryComplete = new CountDownLatch(1);
280             class TestShard extends Shard {
281                 TestShard() {
282                     super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
283                             newDatastoreContext(), SCHEMA_CONTEXT);
284                 }
285
286                 Map<String, String> getPeerAddresses() {
287                     return getRaftActorContext().getPeerAddresses();
288                 }
289
290                 @Override
291                 protected void onRecoveryComplete() {
292                     try {
293                         super.onRecoveryComplete();
294                     } finally {
295                         recoveryComplete.countDown();
296                     }
297                 }
298             }
299
300             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
301                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
302                         @Override
303                         public TestShard create() throws Exception {
304                             return new TestShard();
305                         }
306                     })), "testPeerAddressResolved");
307
308             //waitUntilLeader(shard);
309             assertEquals("Recovery complete", true,
310                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
311
312             String address = "akka://foobar";
313             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
314
315             assertEquals("getPeerAddresses", address,
316                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
317
318             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
319         }};
320     }
321
322     @Test
323     public void testApplySnapshot() throws Exception {
324         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
325                 "testApplySnapshot");
326
327         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
328         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
329
330         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
331
332         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
333         NormalizedNode<?,?> expected = readStore(store, root);
334
335         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
336                 SerializationUtils.serializeNormalizedNode(expected),
337                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
338
339         shard.underlyingActor().onReceiveCommand(applySnapshot);
340
341         NormalizedNode<?,?> actual = readStore(shard, root);
342
343         assertEquals("Root node", expected, actual);
344
345         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
346     }
347
348     @Test
349     public void testApplyState() throws Exception {
350
351         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
352
353         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
354
355         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
356                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
357
358         shard.underlyingActor().onReceiveCommand(applyState);
359
360         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
361         assertEquals("Applied state", node, actual);
362
363         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
364     }
365
366     @Test
367     public void testRecovery() throws Exception {
368
369         // Set up the InMemorySnapshotStore.
370
371         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
372         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
373
374         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
375
376         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
377
378         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
379                 SerializationUtils.serializeNormalizedNode(root),
380                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
381
382         // Set up the InMemoryJournal.
383
384         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
385                   new WriteModification(TestModel.OUTER_LIST_PATH,
386                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
387
388         int nListEntries = 16;
389         Set<Integer> listEntryKeys = new HashSet<>();
390
391         // Add some ModificationPayload entries
392         for(int i = 1; i <= nListEntries; i++) {
393             listEntryKeys.add(Integer.valueOf(i));
394             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
395                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
396             Modification mod = new MergeModification(path,
397                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
398             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
399                     newModificationPayload(mod)));
400         }
401
402         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
403                 new ApplyJournalEntries(nListEntries));
404
405         testRecovery(listEntryKeys);
406     }
407
408     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
409         MutableCompositeModification compMod = new MutableCompositeModification();
410         for(Modification mod: mods) {
411             compMod.addModification(mod);
412         }
413
414         return new ModificationPayload(compMod);
415     }
416
417     @SuppressWarnings({ "unchecked" })
418     @Test
419     public void testConcurrentThreePhaseCommits() throws Throwable {
420         new ShardTestKit(getSystem()) {{
421             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
422                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
423                     "testConcurrentThreePhaseCommits");
424
425             waitUntilLeader(shard);
426
427             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
428
429             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
430
431             String transactionID1 = "tx1";
432             MutableCompositeModification modification1 = new MutableCompositeModification();
433             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
434                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
435
436             String transactionID2 = "tx2";
437             MutableCompositeModification modification2 = new MutableCompositeModification();
438             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
439                     TestModel.OUTER_LIST_PATH,
440                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
441                     modification2);
442
443             String transactionID3 = "tx3";
444             MutableCompositeModification modification3 = new MutableCompositeModification();
445             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
446                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
447                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
448                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
449                     modification3);
450
451             long timeoutSec = 5;
452             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
453             final Timeout timeout = new Timeout(duration);
454
455             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
456             // by the ShardTransaction.
457
458             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
459                     cohort1, modification1, true), getRef());
460             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
461                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
462             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
463
464             // Send the CanCommitTransaction message for the first Tx.
465
466             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
467             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
468                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
469             assertEquals("Can commit", true, canCommitReply.getCanCommit());
470
471             // Send the ForwardedReadyTransaction for the next 2 Tx's.
472
473             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
474                     cohort2, modification2, true), getRef());
475             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
476
477             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
478                     cohort3, modification3, true), getRef());
479             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
480
481             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
482             // processed after the first Tx completes.
483
484             Future<Object> canCommitFuture1 = Patterns.ask(shard,
485                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
486
487             Future<Object> canCommitFuture2 = Patterns.ask(shard,
488                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
489
490             // Send the CommitTransaction message for the first Tx. After it completes, it should
491             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
492
493             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
494             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
495
496             // Wait for the next 2 Tx's to complete.
497
498             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
499             final CountDownLatch commitLatch = new CountDownLatch(2);
500
501             class OnFutureComplete extends OnComplete<Object> {
502                 private final Class<?> expRespType;
503
504                 OnFutureComplete(final Class<?> expRespType) {
505                     this.expRespType = expRespType;
506                 }
507
508                 @Override
509                 public void onComplete(final Throwable error, final Object resp) {
510                     if(error != null) {
511                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
512                     } else {
513                         try {
514                             assertEquals("Commit response type", expRespType, resp.getClass());
515                             onSuccess(resp);
516                         } catch (Exception e) {
517                             caughtEx.set(e);
518                         }
519                     }
520                 }
521
522                 void onSuccess(final Object resp) throws Exception {
523                 }
524             }
525
526             class OnCommitFutureComplete extends OnFutureComplete {
527                 OnCommitFutureComplete() {
528                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
529                 }
530
531                 @Override
532                 public void onComplete(final Throwable error, final Object resp) {
533                     super.onComplete(error, resp);
534                     commitLatch.countDown();
535                 }
536             }
537
538             class OnCanCommitFutureComplete extends OnFutureComplete {
539                 private final String transactionID;
540
541                 OnCanCommitFutureComplete(final String transactionID) {
542                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
543                     this.transactionID = transactionID;
544                 }
545
546                 @Override
547                 void onSuccess(final Object resp) throws Exception {
548                     CanCommitTransactionReply canCommitReply =
549                             CanCommitTransactionReply.fromSerializable(resp);
550                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
551
552                     Future<Object> commitFuture = Patterns.ask(shard,
553                             new CommitTransaction(transactionID).toSerializable(), timeout);
554                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
555                 }
556             }
557
558             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
559                     getSystem().dispatcher());
560
561             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
562                     getSystem().dispatcher());
563
564             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
565
566             if(caughtEx.get() != null) {
567                 throw caughtEx.get();
568             }
569
570             assertEquals("Commits complete", true, done);
571
572             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
573             inOrder.verify(cohort1).canCommit();
574             inOrder.verify(cohort1).preCommit();
575             inOrder.verify(cohort1).commit();
576             inOrder.verify(cohort2).canCommit();
577             inOrder.verify(cohort2).preCommit();
578             inOrder.verify(cohort2).commit();
579             inOrder.verify(cohort3).canCommit();
580             inOrder.verify(cohort3).preCommit();
581             inOrder.verify(cohort3).commit();
582
583             // Verify data in the data store.
584
585             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
586             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
587             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
588                     outerList.getValue() instanceof Iterable);
589             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
590             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
591                        entry instanceof MapEntryNode);
592             MapEntryNode mapEntry = (MapEntryNode)entry;
593             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
594                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
595             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
596             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
597
598             verifyLastLogIndex(shard, 2);
599
600             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
601         }};
602     }
603
604     @Test
605     public void testCommitWithPersistenceDisabled() throws Throwable {
606         dataStoreContextBuilder.persistent(false);
607         new ShardTestKit(getSystem()) {{
608             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
609                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
610                     "testCommitPhaseFailure");
611
612             waitUntilLeader(shard);
613
614             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
615
616             // Setup a simulated transactions with a mock cohort.
617
618             String transactionID = "tx";
619             MutableCompositeModification modification = new MutableCompositeModification();
620             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
621             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
622                     TestModel.TEST_PATH, containerNode, modification);
623
624             FiniteDuration duration = duration("5 seconds");
625
626             // Simulate the ForwardedReadyTransaction messages that would be sent
627             // by the ShardTransaction.
628
629             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
630                     cohort, modification, true), getRef());
631             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
632
633             // Send the CanCommitTransaction message.
634
635             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
636             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
637                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
638             assertEquals("Can commit", true, canCommitReply.getCanCommit());
639
640             // Send the CanCommitTransaction message.
641
642             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
643             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
644
645             InOrder inOrder = inOrder(cohort);
646             inOrder.verify(cohort).canCommit();
647             inOrder.verify(cohort).preCommit();
648             inOrder.verify(cohort).commit();
649
650             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
651             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
652
653             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
654         }};
655     }
656
657     @Test
658     public void testCommitWhenTransactionHasNoModifications(){
659         // Note that persistence is enabled which would normally result in the entry getting written to the journal
660         // but here that need not happen
661         new ShardTestKit(getSystem()) {
662             {
663                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
664                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
665                         "testCommitWhenTransactionHasNoModifications");
666
667                 waitUntilLeader(shard);
668
669                 String transactionID = "tx1";
670                 MutableCompositeModification modification = new MutableCompositeModification();
671                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
672                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
673                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
674                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
675
676                 FiniteDuration duration = duration("5 seconds");
677
678                 // Simulate the ForwardedReadyTransaction messages that would be sent
679                 // by the ShardTransaction.
680
681                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
682                         cohort, modification, true), getRef());
683                 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
684
685                 // Send the CanCommitTransaction message.
686
687                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
688                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
689                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
690                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
691
692                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
693                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
694
695                 InOrder inOrder = inOrder(cohort);
696                 inOrder.verify(cohort).canCommit();
697                 inOrder.verify(cohort).preCommit();
698                 inOrder.verify(cohort).commit();
699
700                 // Use MBean for verification
701                 // Committed transaction count should increase as usual
702                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
703
704                 // Commit index should not advance because this does not go into the journal
705                 assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
706
707                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
708
709             }
710         };
711     }
712
713     @Test
714     public void testCommitWhenTransactionHasModifications(){
715         new ShardTestKit(getSystem()) {
716             {
717                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
718                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
719                         "testCommitWhenTransactionHasModifications");
720
721                 waitUntilLeader(shard);
722
723                 String transactionID = "tx1";
724                 MutableCompositeModification modification = new MutableCompositeModification();
725                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
726                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
727                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
728                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
729                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
730
731                 FiniteDuration duration = duration("5 seconds");
732
733                 // Simulate the ForwardedReadyTransaction messages that would be sent
734                 // by the ShardTransaction.
735
736                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
737                         cohort, modification, true), getRef());
738                 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
739
740                 // Send the CanCommitTransaction message.
741
742                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
743                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
744                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
745                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
746
747                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
748                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
749
750                 InOrder inOrder = inOrder(cohort);
751                 inOrder.verify(cohort).canCommit();
752                 inOrder.verify(cohort).preCommit();
753                 inOrder.verify(cohort).commit();
754
755                 // Use MBean for verification
756                 // Committed transaction count should increase as usual
757                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
758
759                 // Commit index should advance as we do not have an empty modification
760                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
761
762                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
763
764             }
765         };
766     }
767
768     @Test
769     public void testCommitPhaseFailure() throws Throwable {
770         new ShardTestKit(getSystem()) {{
771             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
772                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
773                     "testCommitPhaseFailure");
774
775             waitUntilLeader(shard);
776
777             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
778             // commit phase.
779
780             String transactionID1 = "tx1";
781             MutableCompositeModification modification1 = new MutableCompositeModification();
782             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
783             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
784             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
785             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
786
787             String transactionID2 = "tx2";
788             MutableCompositeModification modification2 = new MutableCompositeModification();
789             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
790             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
791
792             FiniteDuration duration = duration("5 seconds");
793             final Timeout timeout = new Timeout(duration);
794
795             // Simulate the ForwardedReadyTransaction messages that would be sent
796             // by the ShardTransaction.
797
798             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
799                     cohort1, modification1, true), getRef());
800             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
801
802             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
803                     cohort2, modification2, true), getRef());
804             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
805
806             // Send the CanCommitTransaction message for the first Tx.
807
808             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
809             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
810                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
811             assertEquals("Can commit", true, canCommitReply.getCanCommit());
812
813             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
814             // processed after the first Tx completes.
815
816             Future<Object> canCommitFuture = Patterns.ask(shard,
817                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
818
819             // Send the CommitTransaction message for the first Tx. This should send back an error
820             // and trigger the 2nd Tx to proceed.
821
822             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
823             expectMsgClass(duration, akka.actor.Status.Failure.class);
824
825             // Wait for the 2nd Tx to complete the canCommit phase.
826
827             final CountDownLatch latch = new CountDownLatch(1);
828             canCommitFuture.onComplete(new OnComplete<Object>() {
829                 @Override
830                 public void onComplete(final Throwable t, final Object resp) {
831                     latch.countDown();
832                 }
833             }, getSystem().dispatcher());
834
835             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
836
837             InOrder inOrder = inOrder(cohort1, cohort2);
838             inOrder.verify(cohort1).canCommit();
839             inOrder.verify(cohort1).preCommit();
840             inOrder.verify(cohort1).commit();
841             inOrder.verify(cohort2).canCommit();
842
843             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
844         }};
845     }
846
847     @Test
848     public void testPreCommitPhaseFailure() throws Throwable {
849         new ShardTestKit(getSystem()) {{
850             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
851                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
852                     "testPreCommitPhaseFailure");
853
854             waitUntilLeader(shard);
855
856             String transactionID = "tx1";
857             MutableCompositeModification modification = new MutableCompositeModification();
858             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
859             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
860             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
861
862             FiniteDuration duration = duration("5 seconds");
863
864             // Simulate the ForwardedReadyTransaction messages that would be sent
865             // by the ShardTransaction.
866
867             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
868                     cohort, modification, true), getRef());
869             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
870
871             // Send the CanCommitTransaction message.
872
873             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
874             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
875                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
876             assertEquals("Can commit", true, canCommitReply.getCanCommit());
877
878             // Send the CommitTransaction message. This should send back an error
879             // for preCommit failure.
880
881             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
882             expectMsgClass(duration, akka.actor.Status.Failure.class);
883
884             InOrder inOrder = inOrder(cohort);
885             inOrder.verify(cohort).canCommit();
886             inOrder.verify(cohort).preCommit();
887
888             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
889         }};
890     }
891
892     @Test
893     public void testCanCommitPhaseFailure() throws Throwable {
894         new ShardTestKit(getSystem()) {{
895             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
896                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
897                     "testCanCommitPhaseFailure");
898
899             waitUntilLeader(shard);
900
901             final FiniteDuration duration = duration("5 seconds");
902
903             String transactionID = "tx1";
904             MutableCompositeModification modification = new MutableCompositeModification();
905             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
906             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
907
908             // Simulate the ForwardedReadyTransaction messages that would be sent
909             // by the ShardTransaction.
910
911             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
912                     cohort, modification, true), getRef());
913             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
914
915             // Send the CanCommitTransaction message.
916
917             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
918             expectMsgClass(duration, akka.actor.Status.Failure.class);
919
920             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
921         }};
922     }
923
924     @Test
925     public void testAbortBeforeFinishCommit() throws Throwable {
926         new ShardTestKit(getSystem()) {{
927             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
928                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
929                     "testAbortBeforeFinishCommit");
930
931             waitUntilLeader(shard);
932
933             final FiniteDuration duration = duration("5 seconds");
934             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
935
936             final String transactionID = "tx1";
937             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
938                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
939                 @Override
940                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
941                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
942
943                     // Simulate an AbortTransaction message occurring during replication, after
944                     // persisting and before finishing the commit to the in-memory store.
945                     // We have no followers so due to optimizations in the RaftActor, it does not
946                     // attempt replication and thus we can't send an AbortTransaction message b/c
947                     // it would be processed too late after CommitTransaction completes. So we'll
948                     // simulate an AbortTransaction message occurring during replication by calling
949                     // the shard directly.
950                     //
951                     shard.underlyingActor().doAbortTransaction(transactionID, null);
952
953                     return preCommitFuture;
954                 }
955             };
956
957             MutableCompositeModification modification = new MutableCompositeModification();
958             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
959                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
960                     modification, preCommit);
961
962             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
963                     cohort, modification, true), getRef());
964             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
965
966             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
967             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
968                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
969             assertEquals("Can commit", true, canCommitReply.getCanCommit());
970
971             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
972             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
973
974             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
975
976             // Since we're simulating an abort occurring during replication and before finish commit,
977             // the data should still get written to the in-memory store since we've gotten past
978             // canCommit and preCommit and persisted the data.
979             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
980
981             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
982         }};
983     }
984
985     @Test
986     public void testTransactionCommitTimeout() throws Throwable {
987         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
988
989         new ShardTestKit(getSystem()) {{
990             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
991                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
992                     "testTransactionCommitTimeout");
993
994             waitUntilLeader(shard);
995
996             final FiniteDuration duration = duration("5 seconds");
997
998             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
999
1000             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1001             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1002                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1003
1004             // Create 1st Tx - will timeout
1005
1006             String transactionID1 = "tx1";
1007             MutableCompositeModification modification1 = new MutableCompositeModification();
1008             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1009                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1010                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1011                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1012                     modification1);
1013
1014             // Create 2nd Tx
1015
1016             String transactionID2 = "tx3";
1017             MutableCompositeModification modification2 = new MutableCompositeModification();
1018             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1019                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1020             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1021                     listNodePath,
1022                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1023                     modification2);
1024
1025             // Ready the Tx's
1026
1027             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1028                     cohort1, modification1, true), getRef());
1029             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1030
1031             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1032                     cohort2, modification2, true), getRef());
1033             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1034
1035             // canCommit 1st Tx. We don't send the commit so it should timeout.
1036
1037             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1038             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1039
1040             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1041
1042             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1043             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1044
1045             // Commit the 2nd Tx.
1046
1047             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1048             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1049
1050             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1051             assertNotNull(listNodePath + " not found", node);
1052
1053             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1054         }};
1055     }
1056
1057     @Test
1058     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1059         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1060
1061         new ShardTestKit(getSystem()) {{
1062             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1063                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1064                     "testTransactionCommitQueueCapacityExceeded");
1065
1066             waitUntilLeader(shard);
1067
1068             final FiniteDuration duration = duration("5 seconds");
1069
1070             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1071
1072             String transactionID1 = "tx1";
1073             MutableCompositeModification modification1 = new MutableCompositeModification();
1074             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1075                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1076
1077             String transactionID2 = "tx2";
1078             MutableCompositeModification modification2 = new MutableCompositeModification();
1079             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1080                     TestModel.OUTER_LIST_PATH,
1081                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1082                     modification2);
1083
1084             String transactionID3 = "tx3";
1085             MutableCompositeModification modification3 = new MutableCompositeModification();
1086             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1087                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1088
1089             // Ready the Tx's
1090
1091             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1092                     cohort1, modification1, true), getRef());
1093             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1094
1095             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1096                     cohort2, modification2, true), getRef());
1097             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1098
1099             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1100                     cohort3, modification3, true), getRef());
1101             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1102
1103             // canCommit 1st Tx.
1104
1105             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1106             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1107
1108             // canCommit the 2nd Tx - it should get queued.
1109
1110             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1111
1112             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1113
1114             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1115             expectMsgClass(duration, akka.actor.Status.Failure.class);
1116
1117             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1118         }};
1119     }
1120
1121     @Test
1122     public void testCanCommitBeforeReadyFailure() throws Throwable {
1123         new ShardTestKit(getSystem()) {{
1124             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1125                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1126                     "testCanCommitBeforeReadyFailure");
1127
1128             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1129             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1130
1131             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1132         }};
1133     }
1134
1135     @Test
1136     public void testAbortTransaction() throws Throwable {
1137         new ShardTestKit(getSystem()) {{
1138             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1139                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1140                     "testAbortTransaction");
1141
1142             waitUntilLeader(shard);
1143
1144             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1145
1146             String transactionID1 = "tx1";
1147             MutableCompositeModification modification1 = new MutableCompositeModification();
1148             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1149             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1150             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1151
1152             String transactionID2 = "tx2";
1153             MutableCompositeModification modification2 = new MutableCompositeModification();
1154             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1155             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1156
1157             FiniteDuration duration = duration("5 seconds");
1158             final Timeout timeout = new Timeout(duration);
1159
1160             // Simulate the ForwardedReadyTransaction messages that would be sent
1161             // by the ShardTransaction.
1162
1163             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1164                     cohort1, modification1, true), getRef());
1165             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1166
1167             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1168                     cohort2, modification2, true), getRef());
1169             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1170
1171             // Send the CanCommitTransaction message for the first Tx.
1172
1173             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1174             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1175                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1176             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1177
1178             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1179             // processed after the first Tx completes.
1180
1181             Future<Object> canCommitFuture = Patterns.ask(shard,
1182                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1183
1184             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1185             // Tx to proceed.
1186
1187             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1188             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1189
1190             // Wait for the 2nd Tx to complete the canCommit phase.
1191
1192             Await.ready(canCommitFuture, duration);
1193
1194             InOrder inOrder = inOrder(cohort1, cohort2);
1195             inOrder.verify(cohort1).canCommit();
1196             inOrder.verify(cohort2).canCommit();
1197
1198             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1199         }};
1200     }
1201
1202     @Test
1203     public void testCreateSnapshot() throws Exception {
1204         testCreateSnapshot(true, "testCreateSnapshot");
1205     }
1206
1207     @Test
1208     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1209         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1210     }
1211
1212     @SuppressWarnings("serial")
1213     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1214
1215         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1216         class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1217             DataPersistenceProvider delegate;
1218
1219             DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1220                 this.delegate = delegate;
1221             }
1222
1223             @Override
1224             public boolean isRecoveryApplicable() {
1225                 return delegate.isRecoveryApplicable();
1226             }
1227
1228             @Override
1229             public <T> void persist(T o, Procedure<T> procedure) {
1230                 delegate.persist(o, procedure);
1231             }
1232
1233             @Override
1234             public void saveSnapshot(Object o) {
1235                 savedSnapshot.set(o);
1236                 delegate.saveSnapshot(o);
1237             }
1238
1239             @Override
1240             public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1241                 delegate.deleteSnapshots(criteria);
1242             }
1243
1244             @Override
1245             public void deleteMessages(long sequenceNumber) {
1246                 delegate.deleteMessages(sequenceNumber);
1247             }
1248         }
1249
1250         dataStoreContextBuilder.persistent(persistent);
1251
1252         new ShardTestKit(getSystem()) {{
1253             final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1254             Creator<Shard> creator = new Creator<Shard>() {
1255                 @Override
1256                 public Shard create() throws Exception {
1257                     return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1258                             newDatastoreContext(), SCHEMA_CONTEXT) {
1259
1260                         DelegatingPersistentDataProvider delegating;
1261
1262                         @Override
1263                         protected DataPersistenceProvider persistence() {
1264                             if(delegating == null) {
1265                                 delegating = new DelegatingPersistentDataProvider(super.persistence());
1266                             }
1267
1268                             return delegating;
1269                         }
1270
1271                         @Override
1272                         protected void commitSnapshot(final long sequenceNumber) {
1273                             super.commitSnapshot(sequenceNumber);
1274                             latch.get().countDown();
1275                         }
1276                     };
1277                 }
1278             };
1279
1280             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1281                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1282
1283             waitUntilLeader(shard);
1284
1285             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1286
1287             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1288
1289             CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
1290             shard.tell(capture, getRef());
1291
1292             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1293
1294             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1295                     savedSnapshot.get() instanceof Snapshot);
1296
1297             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1298
1299             latch.set(new CountDownLatch(1));
1300             savedSnapshot.set(null);
1301
1302             shard.tell(capture, getRef());
1303
1304             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1305
1306             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1307                     savedSnapshot.get() instanceof Snapshot);
1308
1309             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1310
1311             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1312         }
1313
1314         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1315
1316             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1317             assertEquals("Root node", expectedRoot, actual);
1318
1319         }};
1320     }
1321
1322     /**
1323      * This test simply verifies that the applySnapShot logic will work
1324      * @throws ReadFailedException
1325      */
1326     @Test
1327     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1328         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1329
1330         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1331
1332         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1333         putTransaction.write(TestModel.TEST_PATH,
1334             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1335         commitTransaction(putTransaction);
1336
1337
1338         NormalizedNode<?, ?> expected = readStore(store);
1339
1340         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1341
1342         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1343         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1344
1345         commitTransaction(writeTransaction);
1346
1347         NormalizedNode<?, ?> actual = readStore(store);
1348
1349         assertEquals(expected, actual);
1350     }
1351
1352     @Test
1353     public void testRecoveryApplicable(){
1354
1355         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1356                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1357
1358         final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1359                 persistentContext, SCHEMA_CONTEXT);
1360
1361         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1362                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1363
1364         final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1365                 nonPersistentContext, SCHEMA_CONTEXT);
1366
1367         new ShardTestKit(getSystem()) {{
1368             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1369                     persistentProps, "testPersistence1");
1370
1371             assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1372
1373             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1374
1375             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1376                     nonPersistentProps, "testPersistence2");
1377
1378             assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1379
1380             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1381
1382         }};
1383     }
1384
1385     @Test
1386     public void testOnDatastoreContext() {
1387         new ShardTestKit(getSystem()) {{
1388             dataStoreContextBuilder.persistent(true);
1389
1390             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1391
1392             assertEquals("isRecoveryApplicable", true,
1393                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1394
1395             waitUntilLeader(shard);
1396
1397             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1398
1399             assertEquals("isRecoveryApplicable", false,
1400                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1401
1402             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1403
1404             assertEquals("isRecoveryApplicable", true,
1405                     shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1406
1407             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1408         }};
1409     }
1410
1411     @Test
1412     public void testRegisterRoleChangeListener() throws Exception {
1413         new ShardTestKit(getSystem()) {
1414             {
1415                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1416                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1417                         "testRegisterRoleChangeListener");
1418
1419                 waitUntilLeader(shard);
1420
1421                 TestActorRef<MessageCollectorActor> listener =
1422                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1423
1424                 shard.tell(new RegisterRoleChangeListener(), listener);
1425
1426                 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1427                 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1428                 // sleep.
1429                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1430
1431                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1432
1433                 assertEquals(1, allMatching.size());
1434             }
1435         };
1436     }
1437
1438     @Test
1439     public void testFollowerInitialSyncStatus() throws Exception {
1440         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1441                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1442                 "testFollowerInitialSyncStatus");
1443
1444         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1445
1446         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1447
1448         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1449
1450         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1451
1452         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1453     }
1454
1455     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1456         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1457         ListenableFuture<Void> future =
1458             commitCohort.preCommit();
1459         try {
1460             future.get();
1461             future = commitCohort.commit();
1462             future.get();
1463         } catch (InterruptedException | ExecutionException e) {
1464         }
1465     }
1466 }