23a200f6bcda05b11afcd24b248ed8145f400122
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.Props;
26 import akka.actor.Status.Failure;
27 import akka.dispatch.Dispatchers;
28 import akka.dispatch.OnComplete;
29 import akka.japi.Creator;
30 import akka.pattern.Patterns;
31 import akka.persistence.SaveSnapshotSuccess;
32 import akka.testkit.TestActorRef;
33 import akka.util.Timeout;
34 import com.google.common.base.Function;
35 import com.google.common.util.concurrent.Futures;
36 import com.google.common.util.concurrent.ListenableFuture;
37 import com.google.common.util.concurrent.Uninterruptibles;
38 import java.util.Collections;
39 import java.util.HashSet;
40 import java.util.Set;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicReference;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.opendaylight.controller.cluster.DataPersistenceProvider;
48 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
50 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
51 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
52 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
56 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
62 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
63 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
66 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
67 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
68 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
69 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
70 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
71 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
72 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
73 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
74 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
75 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
76 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
77 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
78 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
79 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
80 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
81 import org.opendaylight.controller.cluster.raft.RaftActorContext;
82 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
83 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
84 import org.opendaylight.controller.cluster.raft.Snapshot;
85 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
86 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
87 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
88 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
89 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
90 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
91 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
92 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
93 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
94 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
95 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
96 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
97 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
98 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
99 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
100 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
101 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
102 import org.opendaylight.yangtools.yang.common.QName;
103 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
104 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
116 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
117 import scala.concurrent.Await;
118 import scala.concurrent.Future;
119 import scala.concurrent.duration.FiniteDuration;
120
121 public class ShardTest extends AbstractShardTest {
122     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
123
124     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
125
126     @Test
127     public void testRegisterChangeListener() throws Exception {
128         new ShardTestKit(getSystem()) {{
129             final TestActorRef<Shard> shard = actorFactory.createTestActor(
130                     newShardProps(),  "testRegisterChangeListener");
131
132             waitUntilLeader(shard);
133
134             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
135
136             final MockDataChangeListener listener = new MockDataChangeListener(1);
137             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
138                     "testRegisterChangeListener-DataChangeListener");
139
140             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
141                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
142
143             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
144                     RegisterChangeListenerReply.class);
145             final String replyPath = reply.getListenerRegistrationPath().toString();
146             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
147                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
148
149             final YangInstanceIdentifier path = TestModel.TEST_PATH;
150             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
151
152             listener.waitForChangeEvents(path);
153         }};
154     }
155
156     @SuppressWarnings("serial")
157     @Test
158     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
159         // This test tests the timing window in which a change listener is registered before the
160         // shard becomes the leader. We verify that the listener is registered and notified of the
161         // existing data when the shard becomes the leader.
162         new ShardTestKit(getSystem()) {{
163             // For this test, we want to send the RegisterChangeListener message after the shard
164             // has recovered from persistence and before it becomes the leader. So we subclass
165             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
166             // we know that the shard has been initialized to a follower and has started the
167             // election process. The following 2 CountDownLatches are used to coordinate the
168             // ElectionTimeout with the sending of the RegisterChangeListener message.
169             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
170             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
171             final Creator<Shard> creator = new Creator<Shard>() {
172                 boolean firstElectionTimeout = true;
173
174                 @Override
175                 public Shard create() throws Exception {
176                     // Use a non persistent provider because this test actually invokes persist on the journal
177                     // this will cause all other messages to not be queued properly after that.
178                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
179                     // it does do a persist)
180                     return new Shard(newShardBuilder()) {
181                         @Override
182                         public void onReceiveCommand(final Object message) throws Exception {
183                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
184                                 // Got the first ElectionTimeout. We don't forward it to the
185                                 // base Shard yet until we've sent the RegisterChangeListener
186                                 // message. So we signal the onFirstElectionTimeout latch to tell
187                                 // the main thread to send the RegisterChangeListener message and
188                                 // start a thread to wait on the onChangeListenerRegistered latch,
189                                 // which the main thread signals after it has sent the message.
190                                 // After the onChangeListenerRegistered is triggered, we send the
191                                 // original ElectionTimeout message to proceed with the election.
192                                 firstElectionTimeout = false;
193                                 final ActorRef self = getSelf();
194                                 new Thread() {
195                                     @Override
196                                     public void run() {
197                                         Uninterruptibles.awaitUninterruptibly(
198                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
199                                         self.tell(message, self);
200                                     }
201                                 }.start();
202
203                                 onFirstElectionTimeout.countDown();
204                             } else {
205                                 super.onReceiveCommand(message);
206                             }
207                         }
208                     };
209                 }
210             };
211
212             final MockDataChangeListener listener = new MockDataChangeListener(1);
213             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
214                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
215
216             final TestActorRef<Shard> shard = actorFactory.createTestActor(
217                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
218                     "testRegisterChangeListenerWhenNotLeaderInitially");
219
220             // Write initial data into the in-memory store.
221             final YangInstanceIdentifier path = TestModel.TEST_PATH;
222             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
223
224             // Wait until the shard receives the first ElectionTimeout message.
225             assertEquals("Got first ElectionTimeout", true,
226                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
227
228             // Now send the RegisterChangeListener and wait for the reply.
229             shard.tell(new RegisterChangeListener(path, dclActor,
230                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
231
232             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
233                     RegisterChangeListenerReply.class);
234             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
235
236             // Sanity check - verify the shard is not the leader yet.
237             shard.tell(new FindLeader(), getRef());
238             final FindLeaderReply findLeadeReply =
239                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
240             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
241
242             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
243             // with the election process.
244             onChangeListenerRegistered.countDown();
245
246             // Wait for the shard to become the leader and notify our listener with the existing
247             // data in the store.
248             listener.waitForChangeEvents(path);
249         }};
250     }
251
252     @Test
253     public void testRegisterDataTreeChangeListener() throws Exception {
254         new ShardTestKit(getSystem()) {{
255             final TestActorRef<Shard> shard = actorFactory.createTestActor(
256                     newShardProps(), "testRegisterDataTreeChangeListener");
257
258             waitUntilLeader(shard);
259
260             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
261
262             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
263             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
264                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
265
266             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
267
268             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
269                     RegisterDataTreeChangeListenerReply.class);
270             final String replyPath = reply.getListenerRegistrationPath().toString();
271             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
272                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
273
274             final YangInstanceIdentifier path = TestModel.TEST_PATH;
275             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
276
277             listener.waitForChangeEvents();
278         }};
279     }
280
281     @SuppressWarnings("serial")
282     @Test
283     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
284         new ShardTestKit(getSystem()) {{
285             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
286             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
287             final Creator<Shard> creator = new Creator<Shard>() {
288                 boolean firstElectionTimeout = true;
289
290                 @Override
291                 public Shard create() throws Exception {
292                     return new Shard(Shard.builder().id(shardID).datastoreContext(
293                             dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
294                         @Override
295                         public void onReceiveCommand(final Object message) throws Exception {
296                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
297                                 firstElectionTimeout = false;
298                                 final ActorRef self = getSelf();
299                                 new Thread() {
300                                     @Override
301                                     public void run() {
302                                         Uninterruptibles.awaitUninterruptibly(
303                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
304                                         self.tell(message, self);
305                                     }
306                                 }.start();
307
308                                 onFirstElectionTimeout.countDown();
309                             } else {
310                                 super.onReceiveCommand(message);
311                             }
312                         }
313                     };
314                 }
315             };
316
317             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
318             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
319                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
320
321             final TestActorRef<Shard> shard = actorFactory.createTestActor(
322                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
323                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
324
325             final YangInstanceIdentifier path = TestModel.TEST_PATH;
326             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
327
328             assertEquals("Got first ElectionTimeout", true,
329                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
330
331             shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
332             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
333                 RegisterDataTreeChangeListenerReply.class);
334             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
335
336             shard.tell(new FindLeader(), getRef());
337             final FindLeaderReply findLeadeReply =
338                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
339             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
340
341             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
342
343             onChangeListenerRegistered.countDown();
344
345             // TODO: investigate why we do not receive data chage events
346             listener.waitForChangeEvents();
347         }};
348     }
349
350     @Test
351     public void testCreateTransaction(){
352         new ShardTestKit(getSystem()) {{
353             final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
354
355             waitUntilLeader(shard);
356
357             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
358
359             shard.tell(new CreateTransaction("txn-1",
360                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
361
362             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
363                     CreateTransactionReply.class);
364
365             final String path = reply.getTransactionActorPath().toString();
366             assertTrue("Unexpected transaction path " + path,
367                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
368         }};
369     }
370
371     @Test
372     public void testCreateTransactionOnChain(){
373         new ShardTestKit(getSystem()) {{
374             final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
375
376             waitUntilLeader(shard);
377
378             shard.tell(new CreateTransaction("txn-1",
379                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
380                     getRef());
381
382             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
383                     CreateTransactionReply.class);
384
385             final String path = reply.getTransactionActorPath().toString();
386             assertTrue("Unexpected transaction path " + path,
387                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
388         }};
389     }
390
391     @SuppressWarnings("serial")
392     @Test
393     public void testPeerAddressResolved() throws Exception {
394         new ShardTestKit(getSystem()) {{
395             final CountDownLatch recoveryComplete = new CountDownLatch(1);
396             class TestShard extends Shard {
397                 TestShard() {
398                     super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
399                             peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
400                             schemaContext(SCHEMA_CONTEXT));
401                 }
402
403                 String getPeerAddress(String id) {
404                     return getRaftActorContext().getPeerAddress(id);
405                 }
406
407                 @Override
408                 protected void onRecoveryComplete() {
409                     try {
410                         super.onRecoveryComplete();
411                     } finally {
412                         recoveryComplete.countDown();
413                     }
414                 }
415             }
416
417             final TestActorRef<Shard> shard = actorFactory.createTestActor(
418                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
419                         @Override
420                         public TestShard create() throws Exception {
421                             return new TestShard();
422                         }
423                     })), "testPeerAddressResolved");
424
425             assertEquals("Recovery complete", true,
426                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
427
428             final String address = "akka://foobar";
429             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
430
431             assertEquals("getPeerAddress", address,
432                 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
433         }};
434     }
435
436     @Test
437     public void testApplySnapshot() throws Exception {
438
439         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot");
440
441         ShardTestKit.waitUntilLeader(shard);
442
443         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
444         store.setSchemaContext(SCHEMA_CONTEXT);
445
446         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
447                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
448                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
449                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
450
451         writeToStore(store, TestModel.TEST_PATH, container);
452
453         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
454         final NormalizedNode<?,?> expected = readStore(store, root);
455
456         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
457                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
458
459         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
460
461         final NormalizedNode<?,?> actual = readStore(shard, root);
462
463         assertEquals("Root node", expected, actual);
464     }
465
466     @Test
467     public void testApplyState() throws Exception {
468         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplyState");
469
470         ShardTestKit.waitUntilLeader(shard);
471
472         final DataTree source = setupInMemorySnapshotStore();
473         final DataTreeModification writeMod = source.takeSnapshot().newModification();
474         ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
475         writeMod.write(TestModel.TEST_PATH, node);
476         writeMod.ready();
477
478         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
479                 payloadForModification(source, writeMod)));
480
481         shard.underlyingActor().onReceiveCommand(applyState);
482
483         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
484         assertEquals("Applied state", node, actual);
485     }
486
487     @Test
488     public void testDataTreeCandidateRecovery() throws Exception {
489         // Set up the InMemorySnapshotStore.
490         final DataTree source = setupInMemorySnapshotStore();
491
492         final DataTreeModification writeMod = source.takeSnapshot().newModification();
493         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
494         writeMod.ready();
495         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
496
497         // Set up the InMemoryJournal.
498         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
499
500         final int nListEntries = 16;
501         final Set<Integer> listEntryKeys = new HashSet<>();
502
503         // Add some ModificationPayload entries
504         for (int i = 1; i <= nListEntries; i++) {
505             listEntryKeys.add(Integer.valueOf(i));
506
507             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
508                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
509
510             final DataTreeModification mod = source.takeSnapshot().newModification();
511             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
512             mod.ready();
513             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
514                 payloadForModification(source, mod)));
515         }
516
517         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
518             new ApplyJournalEntries(nListEntries));
519
520         testRecovery(listEntryKeys);
521     }
522
523     @Test
524     public void testConcurrentThreePhaseCommits() throws Throwable {
525         new ShardTestKit(getSystem()) {{
526             final TestActorRef<Shard> shard = actorFactory.createTestActor(
527                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
528                     "testConcurrentThreePhaseCommits");
529
530             waitUntilLeader(shard);
531
532             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
533
534             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
535
536             final String transactionID1 = "tx1";
537             final MutableCompositeModification modification1 = new MutableCompositeModification();
538             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
539                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
540
541             final String transactionID2 = "tx2";
542             final MutableCompositeModification modification2 = new MutableCompositeModification();
543             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
544                     TestModel.OUTER_LIST_PATH,
545                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
546                     modification2);
547
548             final String transactionID3 = "tx3";
549             final MutableCompositeModification modification3 = new MutableCompositeModification();
550             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
551                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
552                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
553                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
554                     modification3);
555
556             final long timeoutSec = 5;
557             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
558             final Timeout timeout = new Timeout(duration);
559
560             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
561             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
562                     expectMsgClass(duration, ReadyTransactionReply.class));
563             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
564
565             // Send the CanCommitTransaction message for the first Tx.
566
567             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
568             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
569                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
570             assertEquals("Can commit", true, canCommitReply.getCanCommit());
571
572             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
573             expectMsgClass(duration, ReadyTransactionReply.class);
574
575             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
576             expectMsgClass(duration, ReadyTransactionReply.class);
577
578             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
579             // processed after the first Tx completes.
580
581             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
582                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
583
584             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
585                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
586
587             // Send the CommitTransaction message for the first Tx. After it completes, it should
588             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
589
590             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
591             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
592
593             // Wait for the next 2 Tx's to complete.
594
595             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
596             final CountDownLatch commitLatch = new CountDownLatch(2);
597
598             class OnFutureComplete extends OnComplete<Object> {
599                 private final Class<?> expRespType;
600
601                 OnFutureComplete(final Class<?> expRespType) {
602                     this.expRespType = expRespType;
603                 }
604
605                 @Override
606                 public void onComplete(final Throwable error, final Object resp) {
607                     if(error != null) {
608                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
609                     } else {
610                         try {
611                             assertEquals("Commit response type", expRespType, resp.getClass());
612                             onSuccess(resp);
613                         } catch (final Exception e) {
614                             caughtEx.set(e);
615                         }
616                     }
617                 }
618
619                 void onSuccess(final Object resp) throws Exception {
620                 }
621             }
622
623             class OnCommitFutureComplete extends OnFutureComplete {
624                 OnCommitFutureComplete() {
625                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
626                 }
627
628                 @Override
629                 public void onComplete(final Throwable error, final Object resp) {
630                     super.onComplete(error, resp);
631                     commitLatch.countDown();
632                 }
633             }
634
635             class OnCanCommitFutureComplete extends OnFutureComplete {
636                 private final String transactionID;
637
638                 OnCanCommitFutureComplete(final String transactionID) {
639                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
640                     this.transactionID = transactionID;
641                 }
642
643                 @Override
644                 void onSuccess(final Object resp) throws Exception {
645                     final CanCommitTransactionReply canCommitReply =
646                             CanCommitTransactionReply.fromSerializable(resp);
647                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
648
649                     final Future<Object> commitFuture = Patterns.ask(shard,
650                             new CommitTransaction(transactionID).toSerializable(), timeout);
651                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
652                 }
653             }
654
655             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
656                     getSystem().dispatcher());
657
658             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
659                     getSystem().dispatcher());
660
661             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
662
663             if(caughtEx.get() != null) {
664                 throw caughtEx.get();
665             }
666
667             assertEquals("Commits complete", true, done);
668
669             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
670             inOrder.verify(cohort1).canCommit();
671             inOrder.verify(cohort1).preCommit();
672             inOrder.verify(cohort1).commit();
673             inOrder.verify(cohort2).canCommit();
674             inOrder.verify(cohort2).preCommit();
675             inOrder.verify(cohort2).commit();
676             inOrder.verify(cohort3).canCommit();
677             inOrder.verify(cohort3).preCommit();
678             inOrder.verify(cohort3).commit();
679
680             // Verify data in the data store.
681
682             verifyOuterListEntry(shard, 1);
683
684             verifyLastApplied(shard, 2);
685         }};
686     }
687
688     @Test
689     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
690         new ShardTestKit(getSystem()) {{
691             final TestActorRef<Shard> shard = actorFactory.createTestActor(
692                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
693                     "testBatchedModificationsWithNoCommitOnReady");
694
695             waitUntilLeader(shard);
696
697             final String transactionID = "tx";
698             final FiniteDuration duration = duration("5 seconds");
699
700             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
701             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
702                 @Override
703                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
704                     if(mockCohort.get() == null) {
705                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
706                     }
707
708                     return mockCohort.get();
709                 }
710             };
711
712             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
713
714             // Send a BatchedModifications to start a transaction.
715
716             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
717                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
718             expectMsgClass(duration, BatchedModificationsReply.class);
719
720             // Send a couple more BatchedModifications.
721
722             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
723                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
724             expectMsgClass(duration, BatchedModificationsReply.class);
725
726             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
727                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
728                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
729             expectMsgClass(duration, ReadyTransactionReply.class);
730
731             // Send the CanCommitTransaction message.
732
733             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
734             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
735                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
736             assertEquals("Can commit", true, canCommitReply.getCanCommit());
737
738             // Send the CanCommitTransaction message.
739
740             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
741             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
742
743             final InOrder inOrder = inOrder(mockCohort.get());
744             inOrder.verify(mockCohort.get()).canCommit();
745             inOrder.verify(mockCohort.get()).preCommit();
746             inOrder.verify(mockCohort.get()).commit();
747
748             // Verify data in the data store.
749
750             verifyOuterListEntry(shard, 1);
751         }};
752     }
753
754     @Test
755     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
756         new ShardTestKit(getSystem()) {{
757             final TestActorRef<Shard> shard = actorFactory.createTestActor(
758                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
759                     "testBatchedModificationsWithCommitOnReady");
760
761             waitUntilLeader(shard);
762
763             final String transactionID = "tx";
764             final FiniteDuration duration = duration("5 seconds");
765
766             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
767             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
768                 @Override
769                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
770                     if(mockCohort.get() == null) {
771                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
772                     }
773
774                     return mockCohort.get();
775                 }
776             };
777
778             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
779
780             // Send a BatchedModifications to start a transaction.
781
782             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
783                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
784             expectMsgClass(duration, BatchedModificationsReply.class);
785
786             // Send a couple more BatchedModifications.
787
788             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
789                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
790             expectMsgClass(duration, BatchedModificationsReply.class);
791
792             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
793                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
794                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
795
796             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
797
798             final InOrder inOrder = inOrder(mockCohort.get());
799             inOrder.verify(mockCohort.get()).canCommit();
800             inOrder.verify(mockCohort.get()).preCommit();
801             inOrder.verify(mockCohort.get()).commit();
802
803             // Verify data in the data store.
804
805             verifyOuterListEntry(shard, 1);
806         }};
807     }
808
809     @Test(expected=IllegalStateException.class)
810     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
811         new ShardTestKit(getSystem()) {{
812             final TestActorRef<Shard> shard = actorFactory.createTestActor(
813                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
814                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
815
816             waitUntilLeader(shard);
817
818             final String transactionID = "tx1";
819             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
820             batched.setReady(true);
821             batched.setTotalMessagesSent(2);
822
823             shard.tell(batched, getRef());
824
825             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
826
827             if(failure != null) {
828                 throw failure.cause();
829             }
830         }};
831     }
832
833     @Test
834     public void testBatchedModificationsWithOperationFailure() throws Throwable {
835         new ShardTestKit(getSystem()) {{
836             final TestActorRef<Shard> shard = actorFactory.createTestActor(
837                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
838                     "testBatchedModificationsWithOperationFailure");
839
840             waitUntilLeader(shard);
841
842             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
843             // write will not validate the children for performance reasons.
844
845             String transactionID = "tx1";
846
847             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
848                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
849                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
850
851             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
852             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
853             shard.tell(batched, getRef());
854             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
855
856             Throwable cause = failure.cause();
857
858             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
859             batched.setReady(true);
860             batched.setTotalMessagesSent(2);
861
862             shard.tell(batched, getRef());
863
864             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
865             assertEquals("Failure cause", cause, failure.cause());
866         }};
867     }
868
869     @Test
870     public void testBatchedModificationsOnTransactionChain() throws Throwable {
871         new ShardTestKit(getSystem()) {{
872             final TestActorRef<Shard> shard = actorFactory.createTestActor(
873                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
874                     "testBatchedModificationsOnTransactionChain");
875
876             waitUntilLeader(shard);
877
878             final String transactionChainID = "txChain";
879             final String transactionID1 = "tx1";
880             final String transactionID2 = "tx2";
881
882             final FiniteDuration duration = duration("5 seconds");
883
884             // Send a BatchedModifications to start a chained write transaction and ready it.
885
886             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
887             final YangInstanceIdentifier path = TestModel.TEST_PATH;
888             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
889                     containerNode, true, false, 1), getRef());
890             expectMsgClass(duration, ReadyTransactionReply.class);
891
892             // Create a read Tx on the same chain.
893
894             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
895                     transactionChainID).toSerializable(), getRef());
896
897             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
898
899             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
900             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
901             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
902
903             // Commit the write transaction.
904
905             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
906             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
907                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
908             assertEquals("Can commit", true, canCommitReply.getCanCommit());
909
910             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
911             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
912
913             // Verify data in the data store.
914
915             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
916             assertEquals("Stored node", containerNode, actualNode);
917         }};
918     }
919
920     @Test
921     public void testOnBatchedModificationsWhenNotLeader() {
922         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
923         new ShardTestKit(getSystem()) {{
924             final Creator<Shard> creator = new Creator<Shard>() {
925                 private static final long serialVersionUID = 1L;
926
927                 @Override
928                 public Shard create() throws Exception {
929                     return new Shard(newShardBuilder()) {
930                         @Override
931                         protected boolean isLeader() {
932                             return overrideLeaderCalls.get() ? false : super.isLeader();
933                         }
934
935                         @Override
936                         protected ActorSelection getLeader() {
937                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
938                                 super.getLeader();
939                         }
940                     };
941                 }
942             };
943
944             final TestActorRef<Shard> shard = actorFactory.createTestActor(
945                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
946
947             waitUntilLeader(shard);
948
949             overrideLeaderCalls.set(true);
950
951             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
952
953             shard.tell(batched, ActorRef.noSender());
954
955             expectMsgEquals(batched);
956         }};
957     }
958
959     @Test
960     public void testTransactionMessagesWithNoLeader() {
961         new ShardTestKit(getSystem()) {{
962             dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
963                 shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
964             final TestActorRef<Shard> shard = actorFactory.createTestActor(
965                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
966                     "testTransactionMessagesWithNoLeader");
967
968             waitUntilNoLeader(shard);
969
970             shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef());
971             Failure failure = expectMsgClass(Failure.class);
972             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
973
974             shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx",
975                     DataStoreVersions.CURRENT_VERSION, true), getRef());
976             failure = expectMsgClass(Failure.class);
977             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
978
979             shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef());
980             failure = expectMsgClass(Failure.class);
981             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
982         }};
983     }
984
985     @Test
986     public void testReadyWithImmediateCommit() throws Exception{
987         testReadyWithImmediateCommit(true);
988         testReadyWithImmediateCommit(false);
989     }
990
991     private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
992         new ShardTestKit(getSystem()) {{
993             final TestActorRef<Shard> shard = actorFactory.createTestActor(
994                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
995                     "testReadyWithImmediateCommit-" + readWrite);
996
997             waitUntilLeader(shard);
998
999             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1000
1001             final String transactionID = "tx1";
1002             final MutableCompositeModification modification = new MutableCompositeModification();
1003             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1004             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1005                     TestModel.TEST_PATH, containerNode, modification);
1006
1007             final FiniteDuration duration = duration("5 seconds");
1008
1009             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1010
1011             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1012
1013             final InOrder inOrder = inOrder(cohort);
1014             inOrder.verify(cohort).canCommit();
1015             inOrder.verify(cohort).preCommit();
1016             inOrder.verify(cohort).commit();
1017
1018             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1019             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1020         }};
1021     }
1022
1023     @Test
1024     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1025         new ShardTestKit(getSystem()) {{
1026             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1027                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1028                     "testReadyLocalTransactionWithImmediateCommit");
1029
1030             waitUntilLeader(shard);
1031
1032             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1033
1034             final DataTreeModification modification = dataStore.newModification();
1035
1036             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1037             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1038             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1039             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1040
1041             final String txId = "tx1";
1042             modification.ready();
1043             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1044
1045             shard.tell(readyMessage, getRef());
1046
1047             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1048
1049             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1050             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1051         }};
1052     }
1053
1054     @Test
1055     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1056         new ShardTestKit(getSystem()) {{
1057             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1058                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1059                     "testReadyLocalTransactionWithThreePhaseCommit");
1060
1061             waitUntilLeader(shard);
1062
1063             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1064
1065             final DataTreeModification modification = dataStore.newModification();
1066
1067             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1068             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1069             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1070             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1071
1072             final String txId = "tx1";
1073                 modification.ready();
1074             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1075
1076             shard.tell(readyMessage, getRef());
1077
1078             expectMsgClass(ReadyTransactionReply.class);
1079
1080             // Send the CanCommitTransaction message.
1081
1082             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1083             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1084                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1085             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1086
1087             // Send the CanCommitTransaction message.
1088
1089             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1090             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1091
1092             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1093             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1094         }};
1095     }
1096
1097     @Test
1098     public void testCommitWithPersistenceDisabled() throws Throwable {
1099         testCommitWithPersistenceDisabled(true);
1100         testCommitWithPersistenceDisabled(false);
1101     }
1102
1103     private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
1104         dataStoreContextBuilder.persistent(false);
1105         new ShardTestKit(getSystem()) {{
1106             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1107                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1108                     "testCommitWithPersistenceDisabled-" + readWrite);
1109
1110             waitUntilLeader(shard);
1111
1112             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1113
1114             // Setup a simulated transactions with a mock cohort.
1115
1116             final String transactionID = "tx";
1117             final MutableCompositeModification modification = new MutableCompositeModification();
1118             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1119             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1120                 TestModel.TEST_PATH, containerNode, modification);
1121
1122             final FiniteDuration duration = duration("5 seconds");
1123
1124             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1125             expectMsgClass(duration, ReadyTransactionReply.class);
1126
1127             // Send the CanCommitTransaction message.
1128
1129             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1130             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1131                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1132             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1133
1134             // Send the CanCommitTransaction message.
1135
1136             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1137             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1138
1139             final InOrder inOrder = inOrder(cohort);
1140             inOrder.verify(cohort).canCommit();
1141             inOrder.verify(cohort).preCommit();
1142             inOrder.verify(cohort).commit();
1143
1144             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1145             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1146         }};
1147     }
1148
1149     @Test
1150     public void testCommitWhenTransactionHasNoModifications() {
1151         testCommitWhenTransactionHasNoModifications(true);
1152         testCommitWhenTransactionHasNoModifications(false);
1153     }
1154
1155     private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
1156         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1157         // but here that need not happen
1158         new ShardTestKit(getSystem()) {
1159             {
1160                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1161                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1162                         "testCommitWhenTransactionHasNoModifications-" + readWrite);
1163
1164                 waitUntilLeader(shard);
1165
1166                 final String transactionID = "tx1";
1167                 final MutableCompositeModification modification = new MutableCompositeModification();
1168                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1169                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1170                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1171                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1172                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1173
1174                 final FiniteDuration duration = duration("5 seconds");
1175
1176                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1177                 expectMsgClass(duration, ReadyTransactionReply.class);
1178
1179                 // Send the CanCommitTransaction message.
1180
1181                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1182                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1183                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1184                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1185
1186                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1187                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1188
1189                 final InOrder inOrder = inOrder(cohort);
1190                 inOrder.verify(cohort).canCommit();
1191                 inOrder.verify(cohort).preCommit();
1192                 inOrder.verify(cohort).commit();
1193
1194                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1195                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1196
1197                 // Use MBean for verification
1198                 // Committed transaction count should increase as usual
1199                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1200
1201                 // Commit index should not advance because this does not go into the journal
1202                 assertEquals(-1, shardStats.getCommitIndex());
1203             }
1204         };
1205     }
1206
1207     @Test
1208     public void testCommitWhenTransactionHasModifications() {
1209         testCommitWhenTransactionHasModifications(true);
1210         testCommitWhenTransactionHasModifications(false);
1211     }
1212
1213     private void testCommitWhenTransactionHasModifications(final boolean readWrite){
1214         new ShardTestKit(getSystem()) {
1215             {
1216                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1217                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1218                         "testCommitWhenTransactionHasModifications-" + readWrite);
1219
1220                 waitUntilLeader(shard);
1221
1222                 final String transactionID = "tx1";
1223                 final MutableCompositeModification modification = new MutableCompositeModification();
1224                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1225                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1226                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1227                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1228                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1229                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1230
1231                 final FiniteDuration duration = duration("5 seconds");
1232
1233                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1234                 expectMsgClass(duration, ReadyTransactionReply.class);
1235
1236                 // Send the CanCommitTransaction message.
1237
1238                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1239                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1240                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1241                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1242
1243                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1244                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1245
1246                 final InOrder inOrder = inOrder(cohort);
1247                 inOrder.verify(cohort).canCommit();
1248                 inOrder.verify(cohort).preCommit();
1249                 inOrder.verify(cohort).commit();
1250
1251                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1252                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1253
1254                 // Use MBean for verification
1255                 // Committed transaction count should increase as usual
1256                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1257
1258                 // Commit index should advance as we do not have an empty modification
1259                 assertEquals(0, shardStats.getCommitIndex());
1260             }
1261         };
1262     }
1263
1264     @Test
1265     public void testCommitPhaseFailure() throws Throwable {
1266         testCommitPhaseFailure(true);
1267         testCommitPhaseFailure(false);
1268     }
1269
1270     private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
1271         new ShardTestKit(getSystem()) {{
1272             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1273                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1274                     "testCommitPhaseFailure-" + readWrite);
1275
1276             waitUntilLeader(shard);
1277
1278             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1279             // commit phase.
1280
1281             final String transactionID1 = "tx1";
1282             final MutableCompositeModification modification1 = new MutableCompositeModification();
1283             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1284             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1285             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1286             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1287             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1288
1289             final String transactionID2 = "tx2";
1290             final MutableCompositeModification modification2 = new MutableCompositeModification();
1291             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1292             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1293
1294             final FiniteDuration duration = duration("5 seconds");
1295             final Timeout timeout = new Timeout(duration);
1296
1297             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1298             expectMsgClass(duration, ReadyTransactionReply.class);
1299
1300             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1301             expectMsgClass(duration, ReadyTransactionReply.class);
1302
1303             // Send the CanCommitTransaction message for the first Tx.
1304
1305             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1306             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1307                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1308             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1309
1310             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1311             // processed after the first Tx completes.
1312
1313             final Future<Object> canCommitFuture = Patterns.ask(shard,
1314                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1315
1316             // Send the CommitTransaction message for the first Tx. This should send back an error
1317             // and trigger the 2nd Tx to proceed.
1318
1319             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1320             expectMsgClass(duration, akka.actor.Status.Failure.class);
1321
1322             // Wait for the 2nd Tx to complete the canCommit phase.
1323
1324             final CountDownLatch latch = new CountDownLatch(1);
1325             canCommitFuture.onComplete(new OnComplete<Object>() {
1326                 @Override
1327                 public void onComplete(final Throwable t, final Object resp) {
1328                     latch.countDown();
1329                 }
1330             }, getSystem().dispatcher());
1331
1332             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1333
1334             final InOrder inOrder = inOrder(cohort1, cohort2);
1335             inOrder.verify(cohort1).canCommit();
1336             inOrder.verify(cohort1).preCommit();
1337             inOrder.verify(cohort1).commit();
1338             inOrder.verify(cohort2).canCommit();
1339         }};
1340     }
1341
1342     @Test
1343     public void testPreCommitPhaseFailure() throws Throwable {
1344         testPreCommitPhaseFailure(true);
1345         testPreCommitPhaseFailure(false);
1346     }
1347
1348     private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
1349         new ShardTestKit(getSystem()) {{
1350             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1351                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1352                     "testPreCommitPhaseFailure-" + readWrite);
1353
1354             waitUntilLeader(shard);
1355
1356             final String transactionID1 = "tx1";
1357             final MutableCompositeModification modification1 = new MutableCompositeModification();
1358             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1359             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1360             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1361
1362             final String transactionID2 = "tx2";
1363             final MutableCompositeModification modification2 = new MutableCompositeModification();
1364             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1365             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1366
1367             final FiniteDuration duration = duration("5 seconds");
1368             final Timeout timeout = new Timeout(duration);
1369
1370             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1371             expectMsgClass(duration, ReadyTransactionReply.class);
1372
1373             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1374             expectMsgClass(duration, ReadyTransactionReply.class);
1375
1376             // Send the CanCommitTransaction message for the first Tx.
1377
1378             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1379             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1380                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1381             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1382
1383             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1384             // processed after the first Tx completes.
1385
1386             final Future<Object> canCommitFuture = Patterns.ask(shard,
1387                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1388
1389             // Send the CommitTransaction message for the first Tx. This should send back an error
1390             // and trigger the 2nd Tx to proceed.
1391
1392             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1393             expectMsgClass(duration, akka.actor.Status.Failure.class);
1394
1395             // Wait for the 2nd Tx to complete the canCommit phase.
1396
1397             final CountDownLatch latch = new CountDownLatch(1);
1398             canCommitFuture.onComplete(new OnComplete<Object>() {
1399                 @Override
1400                 public void onComplete(final Throwable t, final Object resp) {
1401                     latch.countDown();
1402                 }
1403             }, getSystem().dispatcher());
1404
1405             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1406
1407             final InOrder inOrder = inOrder(cohort1, cohort2);
1408             inOrder.verify(cohort1).canCommit();
1409             inOrder.verify(cohort1).preCommit();
1410             inOrder.verify(cohort2).canCommit();
1411         }};
1412     }
1413
1414     @Test
1415     public void testCanCommitPhaseFailure() throws Throwable {
1416         testCanCommitPhaseFailure(true);
1417         testCanCommitPhaseFailure(false);
1418     }
1419
1420     private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1421         new ShardTestKit(getSystem()) {{
1422             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1423                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1424                     "testCanCommitPhaseFailure-" + readWrite);
1425
1426             waitUntilLeader(shard);
1427
1428             final FiniteDuration duration = duration("5 seconds");
1429
1430             final String transactionID1 = "tx1";
1431             final MutableCompositeModification modification = new MutableCompositeModification();
1432             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1433             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1434
1435             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1436             expectMsgClass(duration, ReadyTransactionReply.class);
1437
1438             // Send the CanCommitTransaction message.
1439
1440             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1441             expectMsgClass(duration, akka.actor.Status.Failure.class);
1442
1443             // Send another can commit to ensure the failed one got cleaned up.
1444
1445             reset(cohort);
1446
1447             final String transactionID2 = "tx2";
1448             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1449
1450             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1451             expectMsgClass(duration, ReadyTransactionReply.class);
1452
1453             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1454             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1455                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1456             assertEquals("getCanCommit", true, reply.getCanCommit());
1457         }};
1458     }
1459
1460     @Test
1461     public void testCanCommitPhaseFalseResponse() throws Throwable {
1462         testCanCommitPhaseFalseResponse(true);
1463         testCanCommitPhaseFalseResponse(false);
1464     }
1465
1466     private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1467         new ShardTestKit(getSystem()) {{
1468             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1469                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1470                     "testCanCommitPhaseFalseResponse-" + readWrite);
1471
1472             waitUntilLeader(shard);
1473
1474             final FiniteDuration duration = duration("5 seconds");
1475
1476             final String transactionID1 = "tx1";
1477             final MutableCompositeModification modification = new MutableCompositeModification();
1478             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1479             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1480
1481             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1482             expectMsgClass(duration, ReadyTransactionReply.class);
1483
1484             // Send the CanCommitTransaction message.
1485
1486             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1487             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1488                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1489             assertEquals("getCanCommit", false, reply.getCanCommit());
1490
1491             // Send another can commit to ensure the failed one got cleaned up.
1492
1493             reset(cohort);
1494
1495             final String transactionID2 = "tx2";
1496             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1497
1498             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1499             expectMsgClass(duration, ReadyTransactionReply.class);
1500
1501             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1502             reply = CanCommitTransactionReply.fromSerializable(
1503                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1504             assertEquals("getCanCommit", true, reply.getCanCommit());
1505         }};
1506     }
1507
1508     @Test
1509     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1510         testImmediateCommitWithCanCommitPhaseFailure(true);
1511         testImmediateCommitWithCanCommitPhaseFailure(false);
1512     }
1513
1514     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1515         new ShardTestKit(getSystem()) {{
1516             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1517                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1518                     "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1519
1520             waitUntilLeader(shard);
1521
1522             final FiniteDuration duration = duration("5 seconds");
1523
1524             final String transactionID1 = "tx1";
1525             final MutableCompositeModification modification = new MutableCompositeModification();
1526             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1527             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1528
1529             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
1530
1531             expectMsgClass(duration, akka.actor.Status.Failure.class);
1532
1533             // Send another can commit to ensure the failed one got cleaned up.
1534
1535             reset(cohort);
1536
1537             final String transactionID2 = "tx2";
1538             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1539             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1540             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1541             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1542             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1543             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1544             doReturn(candidateRoot).when(candidate).getRootNode();
1545             doReturn(candidate).when(cohort).getCandidate();
1546
1547             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1548
1549             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1550         }};
1551     }
1552
1553     @Test
1554     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1555         testImmediateCommitWithCanCommitPhaseFalseResponse(true);
1556         testImmediateCommitWithCanCommitPhaseFalseResponse(false);
1557     }
1558
1559     private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1560         new ShardTestKit(getSystem()) {{
1561             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1562                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1563                     "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
1564
1565             waitUntilLeader(shard);
1566
1567             final FiniteDuration duration = duration("5 seconds");
1568
1569             final String transactionID = "tx1";
1570             final MutableCompositeModification modification = new MutableCompositeModification();
1571             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1572             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1573
1574             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1575
1576             expectMsgClass(duration, akka.actor.Status.Failure.class);
1577
1578             // Send another can commit to ensure the failed one got cleaned up.
1579
1580             reset(cohort);
1581
1582             final String transactionID2 = "tx2";
1583             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1584             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1585             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1586             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1587             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1588             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1589             doReturn(candidateRoot).when(candidate).getRootNode();
1590             doReturn(candidate).when(cohort).getCandidate();
1591
1592             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1593
1594             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1595         }};
1596     }
1597
1598     @Test
1599     public void testAbortBeforeFinishCommit() throws Throwable {
1600         testAbortBeforeFinishCommit(true);
1601         testAbortBeforeFinishCommit(false);
1602     }
1603
1604     private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
1605         new ShardTestKit(getSystem()) {{
1606             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1607                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1608                     "testAbortBeforeFinishCommit-" + readWrite);
1609
1610             waitUntilLeader(shard);
1611
1612             final FiniteDuration duration = duration("5 seconds");
1613             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1614
1615             final String transactionID = "tx1";
1616             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1617                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1618                 @Override
1619                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1620                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1621
1622                     // Simulate an AbortTransaction message occurring during replication, after
1623                     // persisting and before finishing the commit to the in-memory store.
1624                     // We have no followers so due to optimizations in the RaftActor, it does not
1625                     // attempt replication and thus we can't send an AbortTransaction message b/c
1626                     // it would be processed too late after CommitTransaction completes. So we'll
1627                     // simulate an AbortTransaction message occurring during replication by calling
1628                     // the shard directly.
1629                     //
1630                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1631
1632                     return preCommitFuture;
1633                 }
1634             };
1635
1636             final MutableCompositeModification modification = new MutableCompositeModification();
1637             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1638                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1639                     modification, preCommit);
1640
1641             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1642             expectMsgClass(duration, ReadyTransactionReply.class);
1643
1644             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1645             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1646                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1647             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1648
1649             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1650             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1651
1652             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1653
1654             // Since we're simulating an abort occurring during replication and before finish commit,
1655             // the data should still get written to the in-memory store since we've gotten past
1656             // canCommit and preCommit and persisted the data.
1657             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1658         }};
1659     }
1660
1661     @Test
1662     public void testTransactionCommitTimeout() throws Throwable {
1663         testTransactionCommitTimeout(true);
1664         testTransactionCommitTimeout(false);
1665     }
1666
1667     private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
1668         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1669
1670         new ShardTestKit(getSystem()) {{
1671             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1672                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1673                     "testTransactionCommitTimeout-" + readWrite);
1674
1675             waitUntilLeader(shard);
1676
1677             final FiniteDuration duration = duration("5 seconds");
1678
1679             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1680
1681             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1682             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1683                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1684
1685             // Create 1st Tx - will timeout
1686
1687             final String transactionID1 = "tx1";
1688             final MutableCompositeModification modification1 = new MutableCompositeModification();
1689             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1690                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1691                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1692                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1693                     modification1);
1694
1695             // Create 2nd Tx
1696
1697             final String transactionID2 = "tx3";
1698             final MutableCompositeModification modification2 = new MutableCompositeModification();
1699             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1700                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1701             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1702                     listNodePath,
1703                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1704                     modification2);
1705
1706             // Ready the Tx's
1707
1708             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1709             expectMsgClass(duration, ReadyTransactionReply.class);
1710
1711             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1712             expectMsgClass(duration, ReadyTransactionReply.class);
1713
1714             // canCommit 1st Tx. We don't send the commit so it should timeout.
1715
1716             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1717             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1718
1719             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1720
1721             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1722             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1723
1724             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1725
1726             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1727             expectMsgClass(duration, akka.actor.Status.Failure.class);
1728
1729             // Commit the 2nd Tx.
1730
1731             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1732             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1733
1734             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1735             assertNotNull(listNodePath + " not found", node);
1736         }};
1737     }
1738
1739     @Test
1740     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1741         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1742
1743         new ShardTestKit(getSystem()) {{
1744             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1745                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1746                     "testTransactionCommitQueueCapacityExceeded");
1747
1748             waitUntilLeader(shard);
1749
1750             final FiniteDuration duration = duration("5 seconds");
1751
1752             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1753
1754             final String transactionID1 = "tx1";
1755             final MutableCompositeModification modification1 = new MutableCompositeModification();
1756             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1757                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1758
1759             final String transactionID2 = "tx2";
1760             final MutableCompositeModification modification2 = new MutableCompositeModification();
1761             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1762                     TestModel.OUTER_LIST_PATH,
1763                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1764                     modification2);
1765
1766             final String transactionID3 = "tx3";
1767             final MutableCompositeModification modification3 = new MutableCompositeModification();
1768             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1769                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1770
1771             // Ready the Tx's
1772
1773             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1774             expectMsgClass(duration, ReadyTransactionReply.class);
1775
1776             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1777             expectMsgClass(duration, ReadyTransactionReply.class);
1778
1779             // The 3rd Tx should exceed queue capacity and fail.
1780
1781             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1782             expectMsgClass(duration, akka.actor.Status.Failure.class);
1783
1784             // canCommit 1st Tx.
1785
1786             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1787             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1788
1789             // canCommit the 2nd Tx - it should get queued.
1790
1791             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1792
1793             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1794
1795             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1796             expectMsgClass(duration, akka.actor.Status.Failure.class);
1797         }};
1798     }
1799
1800     @Test
1801     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1802         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1803
1804         new ShardTestKit(getSystem()) {{
1805             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1806                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1807                     "testTransactionCommitWithPriorExpiredCohortEntries");
1808
1809             waitUntilLeader(shard);
1810
1811             final FiniteDuration duration = duration("5 seconds");
1812
1813             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1814
1815             final String transactionID1 = "tx1";
1816             final MutableCompositeModification modification1 = new MutableCompositeModification();
1817             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1818                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1819
1820             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1821             expectMsgClass(duration, ReadyTransactionReply.class);
1822
1823             final String transactionID2 = "tx2";
1824             final MutableCompositeModification modification2 = new MutableCompositeModification();
1825             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1826                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1827
1828             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1829             expectMsgClass(duration, ReadyTransactionReply.class);
1830
1831             final String transactionID3 = "tx3";
1832             final MutableCompositeModification modification3 = new MutableCompositeModification();
1833             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1834                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1835
1836             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1837             expectMsgClass(duration, ReadyTransactionReply.class);
1838
1839             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1840             // should expire from the queue and the last one should be processed.
1841
1842             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1843             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1844         }};
1845     }
1846
1847     @Test
1848     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
1849         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1850
1851         new ShardTestKit(getSystem()) {{
1852             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1853                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1854                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
1855
1856             waitUntilLeader(shard);
1857
1858             final FiniteDuration duration = duration("5 seconds");
1859
1860             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1861
1862             final String transactionID1 = "tx1";
1863             final MutableCompositeModification modification1 = new MutableCompositeModification();
1864             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1865                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1866
1867             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1868             expectMsgClass(duration, ReadyTransactionReply.class);
1869
1870             // CanCommit the first one so it's the current in-progress CohortEntry.
1871
1872             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1873             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1874
1875             // Ready the second Tx.
1876
1877             final String transactionID2 = "tx2";
1878             final MutableCompositeModification modification2 = new MutableCompositeModification();
1879             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1880                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1881
1882             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1883             expectMsgClass(duration, ReadyTransactionReply.class);
1884
1885             // Ready the third Tx.
1886
1887             final String transactionID3 = "tx3";
1888             final DataTreeModification modification3 = dataStore.newModification();
1889             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1890                     .apply(modification3);
1891                 modification3.ready();
1892             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
1893
1894             shard.tell(readyMessage, getRef());
1895
1896             // Commit the first Tx. After completing, the second should expire from the queue and the third
1897             // Tx committed.
1898
1899             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1900             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1901
1902             // Expect commit reply from the third Tx.
1903
1904             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1905
1906             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1907             assertNotNull(TestModel.TEST2_PATH + " not found", node);
1908         }};
1909     }
1910
1911     @Test
1912     public void testCanCommitBeforeReadyFailure() throws Throwable {
1913         new ShardTestKit(getSystem()) {{
1914             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1915                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1916                     "testCanCommitBeforeReadyFailure");
1917
1918             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1919             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1920         }};
1921     }
1922
1923     @Test
1924     public void testAbortCurrentTransaction() throws Throwable {
1925         testAbortCurrentTransaction(true);
1926         testAbortCurrentTransaction(false);
1927     }
1928
1929     private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
1930         new ShardTestKit(getSystem()) {{
1931             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1932                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1933                     "testAbortCurrentTransaction-" + readWrite);
1934
1935             waitUntilLeader(shard);
1936
1937             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1938
1939             final String transactionID1 = "tx1";
1940             final MutableCompositeModification modification1 = new MutableCompositeModification();
1941             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1942             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1943             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1944
1945             final String transactionID2 = "tx2";
1946             final MutableCompositeModification modification2 = new MutableCompositeModification();
1947             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1948             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1949
1950             final FiniteDuration duration = duration("5 seconds");
1951             final Timeout timeout = new Timeout(duration);
1952
1953             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1954             expectMsgClass(duration, ReadyTransactionReply.class);
1955
1956             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1957             expectMsgClass(duration, ReadyTransactionReply.class);
1958
1959             // Send the CanCommitTransaction message for the first Tx.
1960
1961             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1962             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1963                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1964             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1965
1966             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1967             // processed after the first Tx completes.
1968
1969             final Future<Object> canCommitFuture = Patterns.ask(shard,
1970                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1971
1972             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1973             // Tx to proceed.
1974
1975             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1976             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1977
1978             // Wait for the 2nd Tx to complete the canCommit phase.
1979
1980             Await.ready(canCommitFuture, duration);
1981
1982             final InOrder inOrder = inOrder(cohort1, cohort2);
1983             inOrder.verify(cohort1).canCommit();
1984             inOrder.verify(cohort2).canCommit();
1985         }};
1986     }
1987
1988     @Test
1989     public void testAbortQueuedTransaction() throws Throwable {
1990         testAbortQueuedTransaction(true);
1991         testAbortQueuedTransaction(false);
1992     }
1993
1994     private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
1995         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1996         new ShardTestKit(getSystem()) {{
1997             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
1998             @SuppressWarnings("serial")
1999             final Creator<Shard> creator = new Creator<Shard>() {
2000                 @Override
2001                 public Shard create() throws Exception {
2002                     return new Shard(newShardBuilder()) {
2003                         @Override
2004                         public void onReceiveCommand(final Object message) throws Exception {
2005                             super.onReceiveCommand(message);
2006                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2007                                 if(cleaupCheckLatch.get() != null) {
2008                                     cleaupCheckLatch.get().countDown();
2009                                 }
2010                             }
2011                         }
2012                     };
2013                 }
2014             };
2015
2016             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2017                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2018                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
2019
2020             waitUntilLeader(shard);
2021
2022             final String transactionID = "tx1";
2023
2024             final MutableCompositeModification modification = new MutableCompositeModification();
2025             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2026             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2027
2028             final FiniteDuration duration = duration("5 seconds");
2029
2030             // Ready the tx.
2031
2032             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
2033             expectMsgClass(duration, ReadyTransactionReply.class);
2034
2035             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2036
2037             // Send the AbortTransaction message.
2038
2039             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2040             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2041
2042             verify(cohort).abort();
2043
2044             // Verify the tx cohort is removed from queue at the cleanup check interval.
2045
2046             cleaupCheckLatch.set(new CountDownLatch(1));
2047             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2048                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2049
2050             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2051
2052             // Now send CanCommitTransaction - should fail.
2053
2054             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2055
2056             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2057             assertTrue("Failure type", failure instanceof IllegalStateException);
2058         }};
2059     }
2060
2061     @Test
2062     public void testCreateSnapshot() throws Exception {
2063         testCreateSnapshot(true, "testCreateSnapshot");
2064     }
2065
2066     @Test
2067     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2068         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2069     }
2070
2071     @SuppressWarnings("serial")
2072     private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2073
2074         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2075
2076         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2077         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2078             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2079                 super(delegate);
2080             }
2081
2082             @Override
2083             public void saveSnapshot(final Object o) {
2084                 savedSnapshot.set(o);
2085                 super.saveSnapshot(o);
2086             }
2087         }
2088
2089         dataStoreContextBuilder.persistent(persistent);
2090
2091         new ShardTestKit(getSystem()) {{
2092             class TestShard extends Shard {
2093
2094                 protected TestShard(AbstractBuilder<?, ?> builder) {
2095                     super(builder);
2096                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2097                 }
2098
2099                 @Override
2100                 public void handleCommand(final Object message) {
2101                     super.handleCommand(message);
2102
2103                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2104                         latch.get().countDown();
2105                     }
2106                 }
2107
2108                 @Override
2109                 public RaftActorContext getRaftActorContext() {
2110                     return super.getRaftActorContext();
2111                 }
2112             }
2113
2114             final Creator<Shard> creator = new Creator<Shard>() {
2115                 @Override
2116                 public Shard create() throws Exception {
2117                     return new TestShard(newShardBuilder());
2118                 }
2119             };
2120
2121             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2122                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2123
2124             waitUntilLeader(shard);
2125             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2126
2127             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2128
2129             // Trigger creation of a snapshot by ensuring
2130             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2131             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2132             awaitAndValidateSnapshot(expectedRoot);
2133
2134             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2135             awaitAndValidateSnapshot(expectedRoot);
2136         }
2137
2138         private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException {
2139             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2140
2141             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2142                     savedSnapshot.get() instanceof Snapshot);
2143
2144             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2145
2146             latch.set(new CountDownLatch(1));
2147             savedSnapshot.set(null);
2148         }
2149
2150         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2151
2152             final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2153             assertEquals("Root node", expectedRoot, actual);
2154
2155         }};
2156     }
2157
2158     /**
2159      * This test simply verifies that the applySnapShot logic will work
2160      * @throws ReadFailedException
2161      * @throws DataValidationFailedException
2162      */
2163     @Test
2164     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2165         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
2166         store.setSchemaContext(SCHEMA_CONTEXT);
2167
2168         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2169         putTransaction.write(TestModel.TEST_PATH,
2170             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2171         commitTransaction(store, putTransaction);
2172
2173
2174         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2175
2176         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2177
2178         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2179         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2180
2181         commitTransaction(store, writeTransaction);
2182
2183         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2184
2185         assertEquals(expected, actual);
2186     }
2187
2188     @Test
2189     public void testRecoveryApplicable(){
2190
2191         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2192                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2193
2194         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2195                 schemaContext(SCHEMA_CONTEXT).props();
2196
2197         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2198                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2199
2200         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2201                 schemaContext(SCHEMA_CONTEXT).props();
2202
2203         new ShardTestKit(getSystem()) {{
2204             final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
2205
2206             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2207
2208             final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
2209
2210             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2211         }};
2212     }
2213
2214     @Test
2215     public void testOnDatastoreContext() {
2216         new ShardTestKit(getSystem()) {{
2217             dataStoreContextBuilder.persistent(true);
2218
2219             final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
2220
2221             assertEquals("isRecoveryApplicable", true,
2222                     shard.underlyingActor().persistence().isRecoveryApplicable());
2223
2224             waitUntilLeader(shard);
2225
2226             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2227
2228             assertEquals("isRecoveryApplicable", false,
2229                 shard.underlyingActor().persistence().isRecoveryApplicable());
2230
2231             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2232
2233             assertEquals("isRecoveryApplicable", true,
2234                 shard.underlyingActor().persistence().isRecoveryApplicable());
2235         }};
2236     }
2237
2238     @Test
2239     public void testRegisterRoleChangeListener() throws Exception {
2240         new ShardTestKit(getSystem()) {
2241             {
2242                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2243                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2244                         "testRegisterRoleChangeListener");
2245
2246                 waitUntilLeader(shard);
2247
2248                 final TestActorRef<MessageCollectorActor> listener =
2249                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2250
2251                 shard.tell(new RegisterRoleChangeListener(), listener);
2252
2253                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2254
2255                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2256                     ShardLeaderStateChanged.class);
2257                 assertEquals("getLocalShardDataTree present", true,
2258                         leaderStateChanged.getLocalShardDataTree().isPresent());
2259                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2260                     leaderStateChanged.getLocalShardDataTree().get());
2261
2262                 MessageCollectorActor.clearMessages(listener);
2263
2264                 // Force a leader change
2265
2266                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2267
2268                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2269                         ShardLeaderStateChanged.class);
2270                 assertEquals("getLocalShardDataTree present", false,
2271                         leaderStateChanged.getLocalShardDataTree().isPresent());
2272             }
2273         };
2274     }
2275
2276     @Test
2277     public void testFollowerInitialSyncStatus() throws Exception {
2278         final TestActorRef<Shard> shard = actorFactory.createTestActor(
2279                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2280                 "testFollowerInitialSyncStatus");
2281
2282         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2283
2284         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2285
2286         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2287
2288         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2289     }
2290
2291     @Test
2292     public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
2293         new ShardTestKit(getSystem()) {{
2294             String testName = "testClusteredDataChangeListenerDelayedRegistration";
2295             dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
2296
2297             final MockDataChangeListener listener = new MockDataChangeListener(1);
2298             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2299                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2300
2301             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2302                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2303                     actorFactory.generateActorId(testName + "-shard"));
2304
2305             waitUntilNoLeader(shard);
2306
2307             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2308
2309             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2310             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2311                 RegisterChangeListenerReply.class);
2312             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2313
2314             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2315
2316             shard.tell(new ElectionTimeout(), ActorRef.noSender());
2317
2318             listener.waitForChangeEvents();
2319         }};
2320     }
2321
2322     @Test
2323     public void testClusteredDataChangeListenerRegistration() throws Exception {
2324         new ShardTestKit(getSystem()) {{
2325             String testName = "testClusteredDataChangeListenerRegistration";
2326             final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2327                     actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2328
2329             final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2330                     actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2331
2332             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2333                     Shard.builder().id(followerShardID).
2334                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2335                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2336                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2337                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2338
2339             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2340                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2341                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2342                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2343                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2344
2345             leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
2346             String leaderPath = waitUntilLeader(followerShard);
2347             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2348
2349             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2350             final MockDataChangeListener listener = new MockDataChangeListener(1);
2351             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2352                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2353
2354             followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2355             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2356                 RegisterChangeListenerReply.class);
2357             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2358
2359             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2360
2361             listener.waitForChangeEvents();
2362         }};
2363     }
2364
2365     @Test
2366     public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
2367         new ShardTestKit(getSystem()) {{
2368             String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
2369             dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
2370
2371             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2372             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2373                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2374
2375             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2376                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2377                     actorFactory.generateActorId(testName + "-shard"));
2378
2379             waitUntilNoLeader(shard);
2380
2381             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2382
2383             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2384             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2385                     RegisterDataTreeChangeListenerReply.class);
2386             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2387
2388             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2389
2390             shard.tell(new ElectionTimeout(), ActorRef.noSender());
2391
2392             listener.waitForChangeEvents();
2393         }};
2394     }
2395
2396     @Test
2397     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2398         new ShardTestKit(getSystem()) {{
2399             String testName = "testClusteredDataTreeChangeListenerRegistration";
2400             final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2401                     actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2402
2403             final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2404                     actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2405
2406             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2407                     Shard.builder().id(followerShardID).
2408                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2409                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2410                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2411                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2412
2413             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2414                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2415                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2416                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2417                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2418
2419             leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
2420             String leaderPath = waitUntilLeader(followerShard);
2421             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2422
2423             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2424             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2425             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2426                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2427
2428             followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2429             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2430                     RegisterDataTreeChangeListenerReply.class);
2431             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2432
2433             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2434
2435             listener.waitForChangeEvents();
2436         }};
2437     }
2438
2439     @Test
2440     public void testServerRemoved() throws Exception {
2441         final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
2442
2443         final ActorRef shard = parent.underlyingActor().context().actorOf(
2444                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2445                 "testServerRemoved");
2446
2447         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2448
2449         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2450     }
2451 }