BUG 3045 : Use non-strict parsing in hello message.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.inOrder;
10 import static org.mockito.Mockito.mock;
11 import static org.mockito.Mockito.reset;
12 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.persistence.SaveSnapshotSuccess;
22 import akka.testkit.TestActorRef;
23 import akka.util.Timeout;
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.Test;
42 import org.mockito.InOrder;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
45 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
49 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
62 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
63 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
64 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
65 import org.opendaylight.controller.cluster.datastore.modification.Modification;
66 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
67 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
68 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
69 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
70 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
71 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
72 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
73 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
74 import org.opendaylight.controller.cluster.raft.RaftActorContext;
75 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
76 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
77 import org.opendaylight.controller.cluster.raft.Snapshot;
78 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
79 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
80 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
81 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
82 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
83 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
84 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
85 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
86 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
87 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
88 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
89 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
90 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
91 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
92 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
93 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
94 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
95 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
96 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
97 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
98 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
99 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
100 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
101 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
102 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
103 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
104 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
105 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
106 import scala.concurrent.Await;
107 import scala.concurrent.Future;
108 import scala.concurrent.duration.FiniteDuration;
109
110 public class ShardTest extends AbstractShardTest {
111
112     @Test
113     public void testRegisterChangeListener() throws Exception {
114         new ShardTestKit(getSystem()) {{
115             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
116                     newShardProps(),  "testRegisterChangeListener");
117
118             waitUntilLeader(shard);
119
120             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
121
122             MockDataChangeListener listener = new MockDataChangeListener(1);
123             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
124                     "testRegisterChangeListener-DataChangeListener");
125
126             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
127                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
128
129             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
130                     RegisterChangeListenerReply.class);
131             String replyPath = reply.getListenerRegistrationPath().toString();
132             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
133                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
134
135             YangInstanceIdentifier path = TestModel.TEST_PATH;
136             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
137
138             listener.waitForChangeEvents(path);
139
140             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
141             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
142         }};
143     }
144
145     @SuppressWarnings("serial")
146     @Test
147     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
148         // This test tests the timing window in which a change listener is registered before the
149         // shard becomes the leader. We verify that the listener is registered and notified of the
150         // existing data when the shard becomes the leader.
151         new ShardTestKit(getSystem()) {{
152             // For this test, we want to send the RegisterChangeListener message after the shard
153             // has recovered from persistence and before it becomes the leader. So we subclass
154             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
155             // we know that the shard has been initialized to a follower and has started the
156             // election process. The following 2 CountDownLatches are used to coordinate the
157             // ElectionTimeout with the sending of the RegisterChangeListener message.
158             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
159             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
160             Creator<Shard> creator = new Creator<Shard>() {
161                 boolean firstElectionTimeout = true;
162
163                 @Override
164                 public Shard create() throws Exception {
165                     // Use a non persistent provider because this test actually invokes persist on the journal
166                     // this will cause all other messages to not be queued properly after that.
167                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
168                     // it does do a persist)
169                     return new Shard(shardID, Collections.<String,String>emptyMap(),
170                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
171                         @Override
172                         public void onReceiveCommand(final Object message) throws Exception {
173                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
174                                 // Got the first ElectionTimeout. We don't forward it to the
175                                 // base Shard yet until we've sent the RegisterChangeListener
176                                 // message. So we signal the onFirstElectionTimeout latch to tell
177                                 // the main thread to send the RegisterChangeListener message and
178                                 // start a thread to wait on the onChangeListenerRegistered latch,
179                                 // which the main thread signals after it has sent the message.
180                                 // After the onChangeListenerRegistered is triggered, we send the
181                                 // original ElectionTimeout message to proceed with the election.
182                                 firstElectionTimeout = false;
183                                 final ActorRef self = getSelf();
184                                 new Thread() {
185                                     @Override
186                                     public void run() {
187                                         Uninterruptibles.awaitUninterruptibly(
188                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
189                                         self.tell(message, self);
190                                     }
191                                 }.start();
192
193                                 onFirstElectionTimeout.countDown();
194                             } else {
195                                 super.onReceiveCommand(message);
196                             }
197                         }
198                     };
199                 }
200             };
201
202             MockDataChangeListener listener = new MockDataChangeListener(1);
203             ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
204                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
205
206             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
207                     Props.create(new DelegatingShardCreator(creator)),
208                     "testRegisterChangeListenerWhenNotLeaderInitially");
209
210             // Write initial data into the in-memory store.
211             YangInstanceIdentifier path = TestModel.TEST_PATH;
212             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
213
214             // Wait until the shard receives the first ElectionTimeout message.
215             assertEquals("Got first ElectionTimeout", true,
216                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
217
218             // Now send the RegisterChangeListener and wait for the reply.
219             shard.tell(new RegisterChangeListener(path, dclActor,
220                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
221
222             RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
223                     RegisterChangeListenerReply.class);
224             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
225
226             // Sanity check - verify the shard is not the leader yet.
227             shard.tell(new FindLeader(), getRef());
228             FindLeaderReply findLeadeReply =
229                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
230             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
231
232             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
233             // with the election process.
234             onChangeListenerRegistered.countDown();
235
236             // Wait for the shard to become the leader and notify our listener with the existing
237             // data in the store.
238             listener.waitForChangeEvents(path);
239
240             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
241             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
242         }};
243     }
244
245     @Test
246     public void testCreateTransaction(){
247         new ShardTestKit(getSystem()) {{
248             ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
249
250             waitUntilLeader(shard);
251
252             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
253
254             shard.tell(new CreateTransaction("txn-1",
255                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
256
257             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
258                     CreateTransactionReply.class);
259
260             String path = reply.getTransactionActorPath().toString();
261             assertTrue("Unexpected transaction path " + path,
262                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
263
264             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
265         }};
266     }
267
268     @Test
269     public void testCreateTransactionOnChain(){
270         new ShardTestKit(getSystem()) {{
271             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
272
273             waitUntilLeader(shard);
274
275             shard.tell(new CreateTransaction("txn-1",
276                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
277                     getRef());
278
279             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
280                     CreateTransactionReply.class);
281
282             String path = reply.getTransactionActorPath().toString();
283             assertTrue("Unexpected transaction path " + path,
284                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
285
286             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
287         }};
288     }
289
290     @SuppressWarnings("serial")
291     @Test
292     public void testPeerAddressResolved() throws Exception {
293         new ShardTestKit(getSystem()) {{
294             final CountDownLatch recoveryComplete = new CountDownLatch(1);
295             class TestShard extends Shard {
296                 TestShard() {
297                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
298                             newDatastoreContext(), SCHEMA_CONTEXT);
299                 }
300
301                 Map<String, String> getPeerAddresses() {
302                     return getRaftActorContext().getPeerAddresses();
303                 }
304
305                 @Override
306                 protected void onRecoveryComplete() {
307                     try {
308                         super.onRecoveryComplete();
309                     } finally {
310                         recoveryComplete.countDown();
311                     }
312                 }
313             }
314
315             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
316                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
317                         @Override
318                         public TestShard create() throws Exception {
319                             return new TestShard();
320                         }
321                     })), "testPeerAddressResolved");
322
323             //waitUntilLeader(shard);
324             assertEquals("Recovery complete", true,
325                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
326
327             String address = "akka://foobar";
328             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
329
330             assertEquals("getPeerAddresses", address,
331                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
332
333             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
334         }};
335     }
336
337     @Test
338     public void testApplySnapshot() throws Exception {
339         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
340                 "testApplySnapshot");
341
342         DataTree store = InMemoryDataTreeFactory.getInstance().create();
343         store.setSchemaContext(SCHEMA_CONTEXT);
344
345         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
346
347         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
348         NormalizedNode<?,?> expected = readStore(store, root);
349
350         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
351                 SerializationUtils.serializeNormalizedNode(expected),
352                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
353
354         shard.underlyingActor().onReceiveCommand(applySnapshot);
355
356         NormalizedNode<?,?> actual = readStore(shard, root);
357
358         assertEquals("Root node", expected, actual);
359
360         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
361     }
362
363     @Test
364     public void testApplyState() throws Exception {
365
366         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
367
368         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
369
370         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
371                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
372
373         shard.underlyingActor().onReceiveCommand(applyState);
374
375         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
376         assertEquals("Applied state", node, actual);
377
378         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
379     }
380
381     @Test
382     public void testRecovery() throws Exception {
383
384         // Set up the InMemorySnapshotStore.
385
386         DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
387         testStore.setSchemaContext(SCHEMA_CONTEXT);
388
389         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
390
391         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
392
393         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
394                 SerializationUtils.serializeNormalizedNode(root),
395                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
396
397         // Set up the InMemoryJournal.
398
399         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
400                   new WriteModification(TestModel.OUTER_LIST_PATH,
401                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
402
403         int nListEntries = 16;
404         Set<Integer> listEntryKeys = new HashSet<>();
405
406         // Add some ModificationPayload entries
407         for(int i = 1; i <= nListEntries; i++) {
408             listEntryKeys.add(Integer.valueOf(i));
409             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
410                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
411             Modification mod = new MergeModification(path,
412                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
413             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
414                     newModificationPayload(mod)));
415         }
416
417         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
418                 new ApplyJournalEntries(nListEntries));
419
420         testRecovery(listEntryKeys);
421     }
422
423     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
424         MutableCompositeModification compMod = new MutableCompositeModification();
425         for(Modification mod: mods) {
426             compMod.addModification(mod);
427         }
428
429         return new ModificationPayload(compMod);
430     }
431
432     @SuppressWarnings({ "unchecked" })
433     @Test
434     public void testConcurrentThreePhaseCommits() throws Throwable {
435         new ShardTestKit(getSystem()) {{
436             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
437                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
438                     "testConcurrentThreePhaseCommits");
439
440             waitUntilLeader(shard);
441
442          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
443
444             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
445
446             String transactionID1 = "tx1";
447             MutableCompositeModification modification1 = new MutableCompositeModification();
448             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
449                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
450
451             String transactionID2 = "tx2";
452             MutableCompositeModification modification2 = new MutableCompositeModification();
453             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
454                     TestModel.OUTER_LIST_PATH,
455                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
456                     modification2);
457
458             String transactionID3 = "tx3";
459             MutableCompositeModification modification3 = new MutableCompositeModification();
460             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
461                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
462                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
463                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
464                     modification3);
465
466             long timeoutSec = 5;
467             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
468             final Timeout timeout = new Timeout(duration);
469
470             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
471             // by the ShardTransaction.
472
473             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
474                     cohort1, modification1, true, false), getRef());
475             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
476                     expectMsgClass(duration, ReadyTransactionReply.class));
477             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
478
479             // Send the CanCommitTransaction message for the first Tx.
480
481             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
482             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
483                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
484             assertEquals("Can commit", true, canCommitReply.getCanCommit());
485
486             // Send the ForwardedReadyTransaction for the next 2 Tx's.
487
488             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
489                     cohort2, modification2, true, false), getRef());
490             expectMsgClass(duration, ReadyTransactionReply.class);
491
492             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
493                     cohort3, modification3, true, false), getRef());
494             expectMsgClass(duration, ReadyTransactionReply.class);
495
496             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
497             // processed after the first Tx completes.
498
499             Future<Object> canCommitFuture1 = Patterns.ask(shard,
500                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
501
502             Future<Object> canCommitFuture2 = Patterns.ask(shard,
503                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
504
505             // Send the CommitTransaction message for the first Tx. After it completes, it should
506             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
507
508             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
509             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
510
511             // Wait for the next 2 Tx's to complete.
512
513             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
514             final CountDownLatch commitLatch = new CountDownLatch(2);
515
516             class OnFutureComplete extends OnComplete<Object> {
517                 private final Class<?> expRespType;
518
519                 OnFutureComplete(final Class<?> expRespType) {
520                     this.expRespType = expRespType;
521                 }
522
523                 @Override
524                 public void onComplete(final Throwable error, final Object resp) {
525                     if(error != null) {
526                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
527                     } else {
528                         try {
529                             assertEquals("Commit response type", expRespType, resp.getClass());
530                             onSuccess(resp);
531                         } catch (Exception e) {
532                             caughtEx.set(e);
533                         }
534                     }
535                 }
536
537                 void onSuccess(final Object resp) throws Exception {
538                 }
539             }
540
541             class OnCommitFutureComplete extends OnFutureComplete {
542                 OnCommitFutureComplete() {
543                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
544                 }
545
546                 @Override
547                 public void onComplete(final Throwable error, final Object resp) {
548                     super.onComplete(error, resp);
549                     commitLatch.countDown();
550                 }
551             }
552
553             class OnCanCommitFutureComplete extends OnFutureComplete {
554                 private final String transactionID;
555
556                 OnCanCommitFutureComplete(final String transactionID) {
557                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
558                     this.transactionID = transactionID;
559                 }
560
561                 @Override
562                 void onSuccess(final Object resp) throws Exception {
563                     CanCommitTransactionReply canCommitReply =
564                             CanCommitTransactionReply.fromSerializable(resp);
565                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
566
567                     Future<Object> commitFuture = Patterns.ask(shard,
568                             new CommitTransaction(transactionID).toSerializable(), timeout);
569                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
570                 }
571             }
572
573             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
574                     getSystem().dispatcher());
575
576             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
577                     getSystem().dispatcher());
578
579             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
580
581             if(caughtEx.get() != null) {
582                 throw caughtEx.get();
583             }
584
585             assertEquals("Commits complete", true, done);
586
587             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
588             inOrder.verify(cohort1).canCommit();
589             inOrder.verify(cohort1).preCommit();
590             inOrder.verify(cohort1).commit();
591             inOrder.verify(cohort2).canCommit();
592             inOrder.verify(cohort2).preCommit();
593             inOrder.verify(cohort2).commit();
594             inOrder.verify(cohort3).canCommit();
595             inOrder.verify(cohort3).preCommit();
596             inOrder.verify(cohort3).commit();
597
598             // Verify data in the data store.
599
600             verifyOuterListEntry(shard, 1);
601
602             verifyLastApplied(shard, 2);
603
604             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
605         }};
606     }
607
608     private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
609             NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
610         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
611     }
612
613     private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
614             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
615         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
616         batched.addModification(new WriteModification(path, data));
617         batched.setReady(ready);
618         batched.setDoCommitOnReady(doCommitOnReady);
619         return batched;
620     }
621
622     @Test
623     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
624         new ShardTestKit(getSystem()) {{
625             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
626                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
627                     "testBatchedModificationsWithNoCommitOnReady");
628
629             waitUntilLeader(shard);
630
631             final String transactionID = "tx";
632             FiniteDuration duration = duration("5 seconds");
633
634             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
635             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
636                 @Override
637                 public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
638                     if(mockCohort.get() == null) {
639                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
640                     }
641
642                     return mockCohort.get();
643                 }
644             };
645
646             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
647
648             // Send a BatchedModifications to start a transaction.
649
650             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
651                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
652             expectMsgClass(duration, BatchedModificationsReply.class);
653
654             // Send a couple more BatchedModifications.
655
656             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
657                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
658             expectMsgClass(duration, BatchedModificationsReply.class);
659
660             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
661                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
662                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
663             expectMsgClass(duration, ReadyTransactionReply.class);
664
665             // Send the CanCommitTransaction message.
666
667             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
668             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
669                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
670             assertEquals("Can commit", true, canCommitReply.getCanCommit());
671
672             // Send the CanCommitTransaction message.
673
674             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
675             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
676
677             InOrder inOrder = inOrder(mockCohort.get());
678             inOrder.verify(mockCohort.get()).canCommit();
679             inOrder.verify(mockCohort.get()).preCommit();
680             inOrder.verify(mockCohort.get()).commit();
681
682             // Verify data in the data store.
683
684             verifyOuterListEntry(shard, 1);
685
686             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
687         }};
688     }
689
690     @Test
691     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
692         new ShardTestKit(getSystem()) {{
693             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
694                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
695                     "testBatchedModificationsWithCommitOnReady");
696
697             waitUntilLeader(shard);
698
699             final String transactionID = "tx";
700             FiniteDuration duration = duration("5 seconds");
701
702             final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
703             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
704                 @Override
705                 public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
706                     if(mockCohort.get() == null) {
707                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
708                     }
709
710                     return mockCohort.get();
711                 }
712             };
713
714             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
715
716             // Send a BatchedModifications to start a transaction.
717
718             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
719                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
720             expectMsgClass(duration, BatchedModificationsReply.class);
721
722             // Send a couple more BatchedModifications.
723
724             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
725                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
726             expectMsgClass(duration, BatchedModificationsReply.class);
727
728             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
729                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
730                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
731
732             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
733
734             InOrder inOrder = inOrder(mockCohort.get());
735             inOrder.verify(mockCohort.get()).canCommit();
736             inOrder.verify(mockCohort.get()).preCommit();
737             inOrder.verify(mockCohort.get()).commit();
738
739             // Verify data in the data store.
740
741             verifyOuterListEntry(shard, 1);
742
743             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
744         }};
745     }
746
747     @SuppressWarnings("unchecked")
748     private void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
749         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
750         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
751         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
752                 outerList.getValue() instanceof Iterable);
753         Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
754         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
755                 entry instanceof MapEntryNode);
756         MapEntryNode mapEntry = (MapEntryNode)entry;
757         Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
758                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
759         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
760         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
761     }
762
763     @Test
764     public void testBatchedModificationsOnTransactionChain() throws Throwable {
765         new ShardTestKit(getSystem()) {{
766             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
767                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
768                     "testBatchedModificationsOnTransactionChain");
769
770             waitUntilLeader(shard);
771
772             String transactionChainID = "txChain";
773             String transactionID1 = "tx1";
774             String transactionID2 = "tx2";
775
776             FiniteDuration duration = duration("5 seconds");
777
778             // Send a BatchedModifications to start a chained write transaction and ready it.
779
780             ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
781             YangInstanceIdentifier path = TestModel.TEST_PATH;
782             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
783                     containerNode, true, false), getRef());
784             expectMsgClass(duration, ReadyTransactionReply.class);
785
786             // Create a read Tx on the same chain.
787
788             shard.tell(new CreateTransaction(transactionID2, TransactionProxy.TransactionType.READ_ONLY.ordinal() ,
789                     transactionChainID).toSerializable(), getRef());
790
791             CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
792
793             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
794             ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
795             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
796
797             // Commit the write transaction.
798
799             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
800             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
801                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
802             assertEquals("Can commit", true, canCommitReply.getCanCommit());
803
804             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
805             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
806
807             // Verify data in the data store.
808
809             NormalizedNode<?, ?> actualNode = readStore(shard, path);
810             assertEquals("Stored node", containerNode, actualNode);
811
812             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
813         }};
814     }
815
816     @Test
817     public void testOnBatchedModificationsWhenNotLeader() {
818         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
819         new ShardTestKit(getSystem()) {{
820             Creator<Shard> creator = new Creator<Shard>() {
821                 @Override
822                 public Shard create() throws Exception {
823                     return new Shard(shardID, Collections.<String,String>emptyMap(),
824                             newDatastoreContext(), SCHEMA_CONTEXT) {
825                         @Override
826                         protected boolean isLeader() {
827                             return overrideLeaderCalls.get() ? false : super.isLeader();
828                         }
829
830                         @Override
831                         protected ActorSelection getLeader() {
832                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
833                                 super.getLeader();
834                         }
835                     };
836                 }
837             };
838
839             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
840                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
841
842             waitUntilLeader(shard);
843
844             overrideLeaderCalls.set(true);
845
846             BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
847
848             shard.tell(batched, ActorRef.noSender());
849
850             expectMsgEquals(batched);
851
852             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
853         }};
854     }
855
856     @Test
857     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
858         new ShardTestKit(getSystem()) {{
859             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
860                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
861                     "testForwardedReadyTransactionWithImmediateCommit");
862
863             waitUntilLeader(shard);
864
865             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
866
867             String transactionID = "tx1";
868             MutableCompositeModification modification = new MutableCompositeModification();
869             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
870             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
871                     TestModel.TEST_PATH, containerNode, modification);
872
873             FiniteDuration duration = duration("5 seconds");
874
875             // Simulate the ForwardedReadyTransaction messages that would be sent
876             // by the ShardTransaction.
877
878             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
879                     cohort, modification, true, true), getRef());
880
881             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
882
883             InOrder inOrder = inOrder(cohort);
884             inOrder.verify(cohort).canCommit();
885             inOrder.verify(cohort).preCommit();
886             inOrder.verify(cohort).commit();
887
888             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
889             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
890
891             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
892         }};
893     }
894
895     @Test
896     public void testCommitWithPersistenceDisabled() throws Throwable {
897         dataStoreContextBuilder.persistent(false);
898         new ShardTestKit(getSystem()) {{
899             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
900                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
901                     "testCommitWithPersistenceDisabled");
902
903             waitUntilLeader(shard);
904
905             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
906
907             // Setup a simulated transactions with a mock cohort.
908
909             String transactionID = "tx";
910             MutableCompositeModification modification = new MutableCompositeModification();
911             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
912             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
913                     TestModel.TEST_PATH, containerNode, modification);
914
915             FiniteDuration duration = duration("5 seconds");
916
917             // Simulate the ForwardedReadyTransaction messages that would be sent
918             // by the ShardTransaction.
919
920             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
921                     cohort, modification, true, false), getRef());
922             expectMsgClass(duration, ReadyTransactionReply.class);
923
924             // Send the CanCommitTransaction message.
925
926             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
927             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
928                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
929             assertEquals("Can commit", true, canCommitReply.getCanCommit());
930
931             // Send the CanCommitTransaction message.
932
933             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
934             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
935
936             InOrder inOrder = inOrder(cohort);
937             inOrder.verify(cohort).canCommit();
938             inOrder.verify(cohort).preCommit();
939             inOrder.verify(cohort).commit();
940
941             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
942             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
943
944             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
945         }};
946     }
947
948     @Test
949     public void testCommitWhenTransactionHasNoModifications(){
950         // Note that persistence is enabled which would normally result in the entry getting written to the journal
951         // but here that need not happen
952         new ShardTestKit(getSystem()) {
953             {
954                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
955                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
956                         "testCommitWhenTransactionHasNoModifications");
957
958                 waitUntilLeader(shard);
959
960                 String transactionID = "tx1";
961                 MutableCompositeModification modification = new MutableCompositeModification();
962                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
963                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
964                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
965                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
966
967                 FiniteDuration duration = duration("5 seconds");
968
969                 // Simulate the ForwardedReadyTransaction messages that would be sent
970                 // by the ShardTransaction.
971
972                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
973                         cohort, modification, true, false), getRef());
974                 expectMsgClass(duration, ReadyTransactionReply.class);
975
976                 // Send the CanCommitTransaction message.
977
978                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
979                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
980                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
981                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
982
983                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
984                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
985
986                 InOrder inOrder = inOrder(cohort);
987                 inOrder.verify(cohort).canCommit();
988                 inOrder.verify(cohort).preCommit();
989                 inOrder.verify(cohort).commit();
990
991                 // Use MBean for verification
992                 // Committed transaction count should increase as usual
993                 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
994
995                 // Commit index should not advance because this does not go into the journal
996                 assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
997
998                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
999
1000             }
1001         };
1002     }
1003
1004     @Test
1005     public void testCommitWhenTransactionHasModifications(){
1006         new ShardTestKit(getSystem()) {
1007             {
1008                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1009                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1010                         "testCommitWhenTransactionHasModifications");
1011
1012                 waitUntilLeader(shard);
1013
1014                 String transactionID = "tx1";
1015                 MutableCompositeModification modification = new MutableCompositeModification();
1016                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1017                 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1018                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1019                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1020                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1021
1022                 FiniteDuration duration = duration("5 seconds");
1023
1024                 // Simulate the ForwardedReadyTransaction messages that would be sent
1025                 // by the ShardTransaction.
1026
1027                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1028                         cohort, modification, true, false), getRef());
1029                 expectMsgClass(duration, ReadyTransactionReply.class);
1030
1031                 // Send the CanCommitTransaction message.
1032
1033                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1034                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1035                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1036                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1037
1038                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1039                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1040
1041                 InOrder inOrder = inOrder(cohort);
1042                 inOrder.verify(cohort).canCommit();
1043                 inOrder.verify(cohort).preCommit();
1044                 inOrder.verify(cohort).commit();
1045
1046                 // Use MBean for verification
1047                 // Committed transaction count should increase as usual
1048                 assertEquals(1, shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
1049
1050                 // Commit index should advance as we do not have an empty modification
1051                 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
1052
1053                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1054
1055             }
1056         };
1057     }
1058
1059     @Test
1060     public void testCommitPhaseFailure() throws Throwable {
1061         new ShardTestKit(getSystem()) {{
1062             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1063                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1064                     "testCommitPhaseFailure");
1065
1066             waitUntilLeader(shard);
1067
1068             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1069             // commit phase.
1070
1071             String transactionID1 = "tx1";
1072             MutableCompositeModification modification1 = new MutableCompositeModification();
1073             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1074             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1075             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1076             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1077
1078             String transactionID2 = "tx2";
1079             MutableCompositeModification modification2 = new MutableCompositeModification();
1080             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1081             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1082
1083             FiniteDuration duration = duration("5 seconds");
1084             final Timeout timeout = new Timeout(duration);
1085
1086             // Simulate the ForwardedReadyTransaction messages that would be sent
1087             // by the ShardTransaction.
1088
1089             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1090                     cohort1, modification1, true, false), getRef());
1091             expectMsgClass(duration, ReadyTransactionReply.class);
1092
1093             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1094                     cohort2, modification2, true, false), getRef());
1095             expectMsgClass(duration, ReadyTransactionReply.class);
1096
1097             // Send the CanCommitTransaction message for the first Tx.
1098
1099             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1100             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1101                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1102             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1103
1104             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1105             // processed after the first Tx completes.
1106
1107             Future<Object> canCommitFuture = Patterns.ask(shard,
1108                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1109
1110             // Send the CommitTransaction message for the first Tx. This should send back an error
1111             // and trigger the 2nd Tx to proceed.
1112
1113             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1114             expectMsgClass(duration, akka.actor.Status.Failure.class);
1115
1116             // Wait for the 2nd Tx to complete the canCommit phase.
1117
1118             final CountDownLatch latch = new CountDownLatch(1);
1119             canCommitFuture.onComplete(new OnComplete<Object>() {
1120                 @Override
1121                 public void onComplete(final Throwable t, final Object resp) {
1122                     latch.countDown();
1123                 }
1124             }, getSystem().dispatcher());
1125
1126             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1127
1128             InOrder inOrder = inOrder(cohort1, cohort2);
1129             inOrder.verify(cohort1).canCommit();
1130             inOrder.verify(cohort1).preCommit();
1131             inOrder.verify(cohort1).commit();
1132             inOrder.verify(cohort2).canCommit();
1133
1134             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1135         }};
1136     }
1137
1138     @Test
1139     public void testPreCommitPhaseFailure() throws Throwable {
1140         new ShardTestKit(getSystem()) {{
1141             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1142                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1143                     "testPreCommitPhaseFailure");
1144
1145             waitUntilLeader(shard);
1146
1147             String transactionID1 = "tx1";
1148             MutableCompositeModification modification1 = new MutableCompositeModification();
1149             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1150             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1151             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1152
1153             String transactionID2 = "tx2";
1154             MutableCompositeModification modification2 = new MutableCompositeModification();
1155             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1156             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1157
1158             FiniteDuration duration = duration("5 seconds");
1159             final Timeout timeout = new Timeout(duration);
1160
1161             // Simulate the ForwardedReadyTransaction messages that would be sent
1162             // by the ShardTransaction.
1163
1164             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1165                     cohort1, modification1, true, false), getRef());
1166             expectMsgClass(duration, ReadyTransactionReply.class);
1167
1168             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1169                     cohort2, modification2, true, false), getRef());
1170             expectMsgClass(duration, ReadyTransactionReply.class);
1171
1172             // Send the CanCommitTransaction message for the first Tx.
1173
1174             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1175             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1176                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1177             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1178
1179             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1180             // processed after the first Tx completes.
1181
1182             Future<Object> canCommitFuture = Patterns.ask(shard,
1183                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1184
1185             // Send the CommitTransaction message for the first Tx. This should send back an error
1186             // and trigger the 2nd Tx to proceed.
1187
1188             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1189             expectMsgClass(duration, akka.actor.Status.Failure.class);
1190
1191             // Wait for the 2nd Tx to complete the canCommit phase.
1192
1193             final CountDownLatch latch = new CountDownLatch(1);
1194             canCommitFuture.onComplete(new OnComplete<Object>() {
1195                 @Override
1196                 public void onComplete(final Throwable t, final Object resp) {
1197                     latch.countDown();
1198                 }
1199             }, getSystem().dispatcher());
1200
1201             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1202
1203             InOrder inOrder = inOrder(cohort1, cohort2);
1204             inOrder.verify(cohort1).canCommit();
1205             inOrder.verify(cohort1).preCommit();
1206             inOrder.verify(cohort2).canCommit();
1207
1208             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1209         }};
1210     }
1211
1212     @Test
1213     public void testCanCommitPhaseFailure() throws Throwable {
1214         new ShardTestKit(getSystem()) {{
1215             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1216                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1217                     "testCanCommitPhaseFailure");
1218
1219             waitUntilLeader(shard);
1220
1221             final FiniteDuration duration = duration("5 seconds");
1222
1223             String transactionID1 = "tx1";
1224             MutableCompositeModification modification = new MutableCompositeModification();
1225             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1226             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1227
1228             // Simulate the ForwardedReadyTransaction messages that would be sent
1229             // by the ShardTransaction.
1230
1231             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1232                     cohort, modification, true, false), getRef());
1233             expectMsgClass(duration, ReadyTransactionReply.class);
1234
1235             // Send the CanCommitTransaction message.
1236
1237             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1238             expectMsgClass(duration, akka.actor.Status.Failure.class);
1239
1240             // Send another can commit to ensure the failed one got cleaned up.
1241
1242             reset(cohort);
1243
1244             String transactionID2 = "tx2";
1245             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1246
1247             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1248                     cohort, modification, true, false), getRef());
1249             expectMsgClass(duration, ReadyTransactionReply.class);
1250
1251             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1252             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1253                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1254             assertEquals("getCanCommit", true, reply.getCanCommit());
1255
1256             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1257         }};
1258     }
1259
1260     @Test
1261     public void testCanCommitPhaseFalseResponse() throws Throwable {
1262         new ShardTestKit(getSystem()) {{
1263             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1264                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1265                     "testCanCommitPhaseFalseResponse");
1266
1267             waitUntilLeader(shard);
1268
1269             final FiniteDuration duration = duration("5 seconds");
1270
1271             String transactionID1 = "tx1";
1272             MutableCompositeModification modification = new MutableCompositeModification();
1273             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1274             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1275
1276             // Simulate the ForwardedReadyTransaction messages that would be sent
1277             // by the ShardTransaction.
1278
1279             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1280                     cohort, modification, true, false), getRef());
1281             expectMsgClass(duration, ReadyTransactionReply.class);
1282
1283             // Send the CanCommitTransaction message.
1284
1285             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1286             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1287                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1288             assertEquals("getCanCommit", false, reply.getCanCommit());
1289
1290             // Send another can commit to ensure the failed one got cleaned up.
1291
1292             reset(cohort);
1293
1294             String transactionID2 = "tx2";
1295             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1296
1297             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1298                     cohort, modification, true, false), getRef());
1299             expectMsgClass(duration, ReadyTransactionReply.class);
1300
1301             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1302             reply = CanCommitTransactionReply.fromSerializable(
1303                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1304             assertEquals("getCanCommit", true, reply.getCanCommit());
1305
1306             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1307         }};
1308     }
1309
1310     @Test
1311     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1312         new ShardTestKit(getSystem()) {{
1313             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1314                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1315                     "testImmediateCommitWithCanCommitPhaseFailure");
1316
1317             waitUntilLeader(shard);
1318
1319             final FiniteDuration duration = duration("5 seconds");
1320
1321             String transactionID1 = "tx1";
1322             MutableCompositeModification modification = new MutableCompositeModification();
1323             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1324             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1325
1326             // Simulate the ForwardedReadyTransaction messages that would be sent
1327             // by the ShardTransaction.
1328
1329             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1330                     cohort, modification, true, true), getRef());
1331
1332             expectMsgClass(duration, akka.actor.Status.Failure.class);
1333
1334             // Send another can commit to ensure the failed one got cleaned up.
1335
1336             reset(cohort);
1337
1338             String transactionID2 = "tx2";
1339             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1340             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1341             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1342
1343             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1344                     cohort, modification, true, true), getRef());
1345
1346             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1347
1348             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1349         }};
1350     }
1351
1352     @Test
1353     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1354         new ShardTestKit(getSystem()) {{
1355             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1356                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1357                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1358
1359             waitUntilLeader(shard);
1360
1361             final FiniteDuration duration = duration("5 seconds");
1362
1363             String transactionID = "tx1";
1364             MutableCompositeModification modification = new MutableCompositeModification();
1365             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1366             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1367
1368             // Simulate the ForwardedReadyTransaction messages that would be sent
1369             // by the ShardTransaction.
1370
1371             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1372                     cohort, modification, true, true), getRef());
1373
1374             expectMsgClass(duration, akka.actor.Status.Failure.class);
1375
1376             // Send another can commit to ensure the failed one got cleaned up.
1377
1378             reset(cohort);
1379
1380             String transactionID2 = "tx2";
1381             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1382             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1383             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1384
1385             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1386                     cohort, modification, true, true), getRef());
1387
1388             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1389
1390             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1391         }};
1392     }
1393
1394     @Test
1395     public void testAbortBeforeFinishCommit() throws Throwable {
1396         new ShardTestKit(getSystem()) {{
1397             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1398                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1399                     "testAbortBeforeFinishCommit");
1400
1401             waitUntilLeader(shard);
1402
1403             final FiniteDuration duration = duration("5 seconds");
1404             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1405
1406             final String transactionID = "tx1";
1407             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1408                           new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1409                 @Override
1410                 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1411                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1412
1413                     // Simulate an AbortTransaction message occurring during replication, after
1414                     // persisting and before finishing the commit to the in-memory store.
1415                     // We have no followers so due to optimizations in the RaftActor, it does not
1416                     // attempt replication and thus we can't send an AbortTransaction message b/c
1417                     // it would be processed too late after CommitTransaction completes. So we'll
1418                     // simulate an AbortTransaction message occurring during replication by calling
1419                     // the shard directly.
1420                     //
1421                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1422
1423                     return preCommitFuture;
1424                 }
1425             };
1426
1427             MutableCompositeModification modification = new MutableCompositeModification();
1428             DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1429                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1430                     modification, preCommit);
1431
1432             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1433                     cohort, modification, true, false), getRef());
1434             expectMsgClass(duration, ReadyTransactionReply.class);
1435
1436             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1437             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1438                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1439             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1440
1441             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1442             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1443
1444             NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1445
1446             // Since we're simulating an abort occurring during replication and before finish commit,
1447             // the data should still get written to the in-memory store since we've gotten past
1448             // canCommit and preCommit and persisted the data.
1449             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1450
1451             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1452         }};
1453     }
1454
1455     @Test
1456     public void testTransactionCommitTimeout() throws Throwable {
1457         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1458
1459         new ShardTestKit(getSystem()) {{
1460             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1461                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1462                     "testTransactionCommitTimeout");
1463
1464             waitUntilLeader(shard);
1465
1466             final FiniteDuration duration = duration("5 seconds");
1467
1468             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1469
1470             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1471             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1472                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1473
1474             // Create 1st Tx - will timeout
1475
1476             String transactionID1 = "tx1";
1477             MutableCompositeModification modification1 = new MutableCompositeModification();
1478             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1479                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1480                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1481                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1482                     modification1);
1483
1484             // Create 2nd Tx
1485
1486             String transactionID2 = "tx3";
1487             MutableCompositeModification modification2 = new MutableCompositeModification();
1488             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1489                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1490             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1491                     listNodePath,
1492                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1493                     modification2);
1494
1495             // Ready the Tx's
1496
1497             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1498                     cohort1, modification1, true, false), getRef());
1499             expectMsgClass(duration, ReadyTransactionReply.class);
1500
1501             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1502                     cohort2, modification2, true, false), getRef());
1503             expectMsgClass(duration, ReadyTransactionReply.class);
1504
1505             // canCommit 1st Tx. We don't send the commit so it should timeout.
1506
1507             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1508             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1509
1510             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1511
1512             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1513             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1514
1515             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1516
1517             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1518             expectMsgClass(duration, akka.actor.Status.Failure.class);
1519
1520             // Commit the 2nd Tx.
1521
1522             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1523             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1524
1525             NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1526             assertNotNull(listNodePath + " not found", node);
1527
1528             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1529         }};
1530     }
1531
1532     @Test
1533     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1534         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1535
1536         new ShardTestKit(getSystem()) {{
1537             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1538                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1539                     "testTransactionCommitQueueCapacityExceeded");
1540
1541             waitUntilLeader(shard);
1542
1543             final FiniteDuration duration = duration("5 seconds");
1544
1545             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1546
1547             String transactionID1 = "tx1";
1548             MutableCompositeModification modification1 = new MutableCompositeModification();
1549             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1550                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1551
1552             String transactionID2 = "tx2";
1553             MutableCompositeModification modification2 = new MutableCompositeModification();
1554             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1555                     TestModel.OUTER_LIST_PATH,
1556                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1557                     modification2);
1558
1559             String transactionID3 = "tx3";
1560             MutableCompositeModification modification3 = new MutableCompositeModification();
1561             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1562                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1563
1564             // Ready the Tx's
1565
1566             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1567                     cohort1, modification1, true, false), getRef());
1568             expectMsgClass(duration, ReadyTransactionReply.class);
1569
1570             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1571                     cohort2, modification2, true, false), getRef());
1572             expectMsgClass(duration, ReadyTransactionReply.class);
1573
1574             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1575                     cohort3, modification3, true, false), getRef());
1576             expectMsgClass(duration, ReadyTransactionReply.class);
1577
1578             // canCommit 1st Tx.
1579
1580             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1581             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1582
1583             // canCommit the 2nd Tx - it should get queued.
1584
1585             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1586
1587             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1588
1589             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1590             expectMsgClass(duration, akka.actor.Status.Failure.class);
1591
1592             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1593         }};
1594     }
1595
1596     @Test
1597     public void testCanCommitBeforeReadyFailure() throws Throwable {
1598         new ShardTestKit(getSystem()) {{
1599             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1600                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1601                     "testCanCommitBeforeReadyFailure");
1602
1603             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1604             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1605
1606             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1607         }};
1608     }
1609
1610     @Test
1611     public void testAbortTransaction() throws Throwable {
1612         new ShardTestKit(getSystem()) {{
1613             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1614                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1615                     "testAbortTransaction");
1616
1617             waitUntilLeader(shard);
1618
1619             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1620
1621             String transactionID1 = "tx1";
1622             MutableCompositeModification modification1 = new MutableCompositeModification();
1623             DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1624             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1625             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1626
1627             String transactionID2 = "tx2";
1628             MutableCompositeModification modification2 = new MutableCompositeModification();
1629             DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1630             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1631
1632             FiniteDuration duration = duration("5 seconds");
1633             final Timeout timeout = new Timeout(duration);
1634
1635             // Simulate the ForwardedReadyTransaction messages that would be sent
1636             // by the ShardTransaction.
1637
1638             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1639                     cohort1, modification1, true, false), getRef());
1640             expectMsgClass(duration, ReadyTransactionReply.class);
1641
1642             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1643                     cohort2, modification2, true, false), getRef());
1644             expectMsgClass(duration, ReadyTransactionReply.class);
1645
1646             // Send the CanCommitTransaction message for the first Tx.
1647
1648             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1649             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1650                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1651             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1652
1653             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1654             // processed after the first Tx completes.
1655
1656             Future<Object> canCommitFuture = Patterns.ask(shard,
1657                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1658
1659             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1660             // Tx to proceed.
1661
1662             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1663             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1664
1665             // Wait for the 2nd Tx to complete the canCommit phase.
1666
1667             Await.ready(canCommitFuture, duration);
1668
1669             InOrder inOrder = inOrder(cohort1, cohort2);
1670             inOrder.verify(cohort1).canCommit();
1671             inOrder.verify(cohort2).canCommit();
1672
1673             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1674         }};
1675     }
1676
1677     @Test
1678     public void testCreateSnapshot() throws Exception {
1679         testCreateSnapshot(true, "testCreateSnapshot");
1680     }
1681
1682     @Test
1683     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1684         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1685     }
1686
1687     @SuppressWarnings("serial")
1688     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1689
1690         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1691
1692         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1693         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1694             TestPersistentDataProvider(DataPersistenceProvider delegate) {
1695                 super(delegate);
1696             }
1697
1698             @Override
1699             public void saveSnapshot(Object o) {
1700                 savedSnapshot.set(o);
1701                 super.saveSnapshot(o);
1702             }
1703         }
1704
1705         dataStoreContextBuilder.persistent(persistent);
1706
1707         new ShardTestKit(getSystem()) {{
1708             class TestShard extends Shard {
1709
1710                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
1711                                     DatastoreContext datastoreContext, SchemaContext schemaContext) {
1712                     super(name, peerAddresses, datastoreContext, schemaContext);
1713                     setPersistence(new TestPersistentDataProvider(super.persistence()));
1714                 }
1715
1716                 @Override
1717                 public void handleCommand(Object message) {
1718                     super.handleCommand(message);
1719
1720                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
1721                         latch.get().countDown();
1722                     }
1723                 }
1724
1725                 @Override
1726                 public RaftActorContext getRaftActorContext() {
1727                     return super.getRaftActorContext();
1728                 }
1729             }
1730
1731             Creator<Shard> creator = new Creator<Shard>() {
1732                 @Override
1733                 public Shard create() throws Exception {
1734                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
1735                             newDatastoreContext(), SCHEMA_CONTEXT);
1736                 }
1737             };
1738
1739             TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1740                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
1741
1742             waitUntilLeader(shard);
1743
1744             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1745
1746             NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1747
1748             // Trigger creation of a snapshot by ensuring
1749             RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1750             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1751
1752             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1753
1754             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1755                     savedSnapshot.get() instanceof Snapshot);
1756
1757             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1758
1759             latch.set(new CountDownLatch(1));
1760             savedSnapshot.set(null);
1761
1762             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1763
1764             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1765
1766             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1767                     savedSnapshot.get() instanceof Snapshot);
1768
1769             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1770
1771             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1772         }
1773
1774         private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1775
1776             NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1777             assertEquals("Root node", expectedRoot, actual);
1778
1779         }};
1780     }
1781
1782     /**
1783      * This test simply verifies that the applySnapShot logic will work
1784      * @throws ReadFailedException
1785      */
1786     @Test
1787     public void testInMemoryDataStoreRestore() throws ReadFailedException {
1788         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1789
1790         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1791
1792         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1793         putTransaction.write(TestModel.TEST_PATH,
1794             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1795         commitTransaction(putTransaction);
1796
1797
1798         NormalizedNode<?, ?> expected = readStore(store);
1799
1800         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1801
1802         writeTransaction.delete(YangInstanceIdentifier.builder().build());
1803         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1804
1805         commitTransaction(writeTransaction);
1806
1807         NormalizedNode<?, ?> actual = readStore(store);
1808
1809         assertEquals(expected, actual);
1810     }
1811
1812     @Test
1813     public void testRecoveryApplicable(){
1814
1815         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1816                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1817
1818         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
1819                 persistentContext, SCHEMA_CONTEXT);
1820
1821         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1822                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1823
1824         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
1825                 nonPersistentContext, SCHEMA_CONTEXT);
1826
1827         new ShardTestKit(getSystem()) {{
1828             TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1829                     persistentProps, "testPersistence1");
1830
1831             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1832
1833             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1834
1835             TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1836                     nonPersistentProps, "testPersistence2");
1837
1838             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1839
1840             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1841
1842         }};
1843
1844     }
1845
1846     @Test
1847     public void testOnDatastoreContext() {
1848         new ShardTestKit(getSystem()) {{
1849             dataStoreContextBuilder.persistent(true);
1850
1851             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1852
1853             assertEquals("isRecoveryApplicable", true,
1854                     shard.underlyingActor().persistence().isRecoveryApplicable());
1855
1856             waitUntilLeader(shard);
1857
1858             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1859
1860             assertEquals("isRecoveryApplicable", false,
1861                     shard.underlyingActor().persistence().isRecoveryApplicable());
1862
1863             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1864
1865             assertEquals("isRecoveryApplicable", true,
1866                     shard.underlyingActor().persistence().isRecoveryApplicable());
1867
1868             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1869         }};
1870     }
1871
1872     @Test
1873     public void testRegisterRoleChangeListener() throws Exception {
1874         new ShardTestKit(getSystem()) {
1875             {
1876                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1877                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1878                         "testRegisterRoleChangeListener");
1879
1880                 waitUntilLeader(shard);
1881
1882                 TestActorRef<MessageCollectorActor> listener =
1883                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1884
1885                 shard.tell(new RegisterRoleChangeListener(), listener);
1886
1887                 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1888                 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1889                 // sleep.
1890                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1891
1892                 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1893
1894                 assertEquals(1, allMatching.size());
1895             }
1896         };
1897     }
1898
1899     @Test
1900     public void testFollowerInitialSyncStatus() throws Exception {
1901         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1902                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1903                 "testFollowerInitialSyncStatus");
1904
1905         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1906
1907         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1908
1909         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1910
1911         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1912
1913         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1914     }
1915
1916     private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1917         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1918         ListenableFuture<Void> future =
1919             commitCohort.preCommit();
1920         try {
1921             future.get();
1922             future = commitCohort.commit();
1923             future.get();
1924         } catch (InterruptedException | ExecutionException e) {
1925         }
1926     }
1927 }