Deprecate ReadData/DataExists protobuff messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.inOrder;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.reset;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.Props;
26 import akka.actor.Status.Failure;
27 import akka.dispatch.Dispatchers;
28 import akka.dispatch.OnComplete;
29 import akka.japi.Creator;
30 import akka.pattern.Patterns;
31 import akka.persistence.SaveSnapshotSuccess;
32 import akka.testkit.TestActorRef;
33 import akka.util.Timeout;
34 import com.google.common.base.Function;
35 import com.google.common.util.concurrent.Futures;
36 import com.google.common.util.concurrent.ListenableFuture;
37 import com.google.common.util.concurrent.Uninterruptibles;
38 import java.util.Collections;
39 import java.util.HashSet;
40 import java.util.Set;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicReference;
45 import org.junit.Test;
46 import org.mockito.InOrder;
47 import org.opendaylight.controller.cluster.DataPersistenceProvider;
48 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
50 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
51 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
52 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
56 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
63 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
67 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
68 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
69 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
70 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
71 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
72 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
73 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
74 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
75 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
76 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
77 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
78 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
79 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
80 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
81 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
82 import org.opendaylight.controller.cluster.raft.RaftActorContext;
83 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
84 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
85 import org.opendaylight.controller.cluster.raft.Snapshot;
86 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
87 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
88 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
89 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
90 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
91 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
92 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
93 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
94 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
95 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
96 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
97 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
98 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
99 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
100 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
101 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
102 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
103 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
113 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
114 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
115 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
116 import scala.concurrent.Await;
117 import scala.concurrent.Future;
118 import scala.concurrent.duration.FiniteDuration;
119
120 public class ShardTest extends AbstractShardTest {
121     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
122
123     @Test
124     public void testRegisterChangeListener() throws Exception {
125         new ShardTestKit(getSystem()) {{
126             final TestActorRef<Shard> shard = actorFactory.createTestActor(
127                     newShardProps(),  "testRegisterChangeListener");
128
129             waitUntilLeader(shard);
130
131             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
132
133             final MockDataChangeListener listener = new MockDataChangeListener(1);
134             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
135                     "testRegisterChangeListener-DataChangeListener");
136
137             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
138                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
139
140             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
141                     RegisterChangeListenerReply.class);
142             final String replyPath = reply.getListenerRegistrationPath().toString();
143             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
144                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
145
146             final YangInstanceIdentifier path = TestModel.TEST_PATH;
147             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
148
149             listener.waitForChangeEvents(path);
150         }};
151     }
152
153     @SuppressWarnings("serial")
154     @Test
155     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
156         // This test tests the timing window in which a change listener is registered before the
157         // shard becomes the leader. We verify that the listener is registered and notified of the
158         // existing data when the shard becomes the leader.
159         new ShardTestKit(getSystem()) {{
160             // For this test, we want to send the RegisterChangeListener message after the shard
161             // has recovered from persistence and before it becomes the leader. So we subclass
162             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
163             // we know that the shard has been initialized to a follower and has started the
164             // election process. The following 2 CountDownLatches are used to coordinate the
165             // ElectionTimeout with the sending of the RegisterChangeListener message.
166             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
167             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
168             final Creator<Shard> creator = new Creator<Shard>() {
169                 boolean firstElectionTimeout = true;
170
171                 @Override
172                 public Shard create() throws Exception {
173                     // Use a non persistent provider because this test actually invokes persist on the journal
174                     // this will cause all other messages to not be queued properly after that.
175                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
176                     // it does do a persist)
177                     return new Shard(newShardBuilder()) {
178                         @Override
179                         public void onReceiveCommand(final Object message) throws Exception {
180                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
181                                 // Got the first ElectionTimeout. We don't forward it to the
182                                 // base Shard yet until we've sent the RegisterChangeListener
183                                 // message. So we signal the onFirstElectionTimeout latch to tell
184                                 // the main thread to send the RegisterChangeListener message and
185                                 // start a thread to wait on the onChangeListenerRegistered latch,
186                                 // which the main thread signals after it has sent the message.
187                                 // After the onChangeListenerRegistered is triggered, we send the
188                                 // original ElectionTimeout message to proceed with the election.
189                                 firstElectionTimeout = false;
190                                 final ActorRef self = getSelf();
191                                 new Thread() {
192                                     @Override
193                                     public void run() {
194                                         Uninterruptibles.awaitUninterruptibly(
195                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
196                                         self.tell(message, self);
197                                     }
198                                 }.start();
199
200                                 onFirstElectionTimeout.countDown();
201                             } else {
202                                 super.onReceiveCommand(message);
203                             }
204                         }
205                     };
206                 }
207             };
208
209             final MockDataChangeListener listener = new MockDataChangeListener(1);
210             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
211                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
212
213             final TestActorRef<Shard> shard = actorFactory.createTestActor(
214                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
215                     "testRegisterChangeListenerWhenNotLeaderInitially");
216
217             // Write initial data into the in-memory store.
218             final YangInstanceIdentifier path = TestModel.TEST_PATH;
219             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
220
221             // Wait until the shard receives the first ElectionTimeout message.
222             assertEquals("Got first ElectionTimeout", true,
223                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
224
225             // Now send the RegisterChangeListener and wait for the reply.
226             shard.tell(new RegisterChangeListener(path, dclActor,
227                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
228
229             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
230                     RegisterChangeListenerReply.class);
231             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
232
233             // Sanity check - verify the shard is not the leader yet.
234             shard.tell(new FindLeader(), getRef());
235             final FindLeaderReply findLeadeReply =
236                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
237             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
238
239             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
240             // with the election process.
241             onChangeListenerRegistered.countDown();
242
243             // Wait for the shard to become the leader and notify our listener with the existing
244             // data in the store.
245             listener.waitForChangeEvents(path);
246         }};
247     }
248
249     @Test
250     public void testRegisterDataTreeChangeListener() throws Exception {
251         new ShardTestKit(getSystem()) {{
252             final TestActorRef<Shard> shard = actorFactory.createTestActor(
253                     newShardProps(), "testRegisterDataTreeChangeListener");
254
255             waitUntilLeader(shard);
256
257             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
258
259             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
260             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
261                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
262
263             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
264
265             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
266                     RegisterDataTreeChangeListenerReply.class);
267             final String replyPath = reply.getListenerRegistrationPath().toString();
268             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
269                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
270
271             final YangInstanceIdentifier path = TestModel.TEST_PATH;
272             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
273
274             listener.waitForChangeEvents();
275         }};
276     }
277
278     @SuppressWarnings("serial")
279     @Test
280     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
281         new ShardTestKit(getSystem()) {{
282             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
283             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
284             final Creator<Shard> creator = new Creator<Shard>() {
285                 boolean firstElectionTimeout = true;
286
287                 @Override
288                 public Shard create() throws Exception {
289                     return new Shard(Shard.builder().id(shardID).datastoreContext(
290                             dataStoreContextBuilder.persistent(false).build()).schemaContext(SCHEMA_CONTEXT)) {
291                         @Override
292                         public void onReceiveCommand(final Object message) throws Exception {
293                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
294                                 firstElectionTimeout = false;
295                                 final ActorRef self = getSelf();
296                                 new Thread() {
297                                     @Override
298                                     public void run() {
299                                         Uninterruptibles.awaitUninterruptibly(
300                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
301                                         self.tell(message, self);
302                                     }
303                                 }.start();
304
305                                 onFirstElectionTimeout.countDown();
306                             } else {
307                                 super.onReceiveCommand(message);
308                             }
309                         }
310                     };
311                 }
312             };
313
314             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
315             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
316                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
317
318             final TestActorRef<Shard> shard = actorFactory.createTestActor(
319                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
320                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
321
322             final YangInstanceIdentifier path = TestModel.TEST_PATH;
323             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
324
325             assertEquals("Got first ElectionTimeout", true,
326                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
327
328             shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
329             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
330                 RegisterDataTreeChangeListenerReply.class);
331             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
332
333             shard.tell(new FindLeader(), getRef());
334             final FindLeaderReply findLeadeReply =
335                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
336             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
337
338             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
339
340             onChangeListenerRegistered.countDown();
341
342             // TODO: investigate why we do not receive data chage events
343             listener.waitForChangeEvents();
344         }};
345     }
346
347     @Test
348     public void testCreateTransaction(){
349         new ShardTestKit(getSystem()) {{
350             final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
351
352             waitUntilLeader(shard);
353
354             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
355
356             shard.tell(new CreateTransaction("txn-1", TransactionType.READ_ONLY.ordinal(), null,
357                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
358
359             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
360                     CreateTransactionReply.class);
361
362             final String path = reply.getTransactionPath().toString();
363             assertTrue("Unexpected transaction path " + path,
364                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
365         }};
366     }
367
368     @Test
369     public void testCreateTransactionOnChain(){
370         new ShardTestKit(getSystem()) {{
371             final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
372
373             waitUntilLeader(shard);
374
375             shard.tell(new CreateTransaction("txn-1",TransactionType.READ_ONLY.ordinal(), "foobar",
376                     DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
377
378             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
379                     CreateTransactionReply.class);
380
381             final String path = reply.getTransactionPath().toString();
382             assertTrue("Unexpected transaction path " + path,
383                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
384         }};
385     }
386
387     @SuppressWarnings("serial")
388     @Test
389     public void testPeerAddressResolved() throws Exception {
390         new ShardTestKit(getSystem()) {{
391             final CountDownLatch recoveryComplete = new CountDownLatch(1);
392             class TestShard extends Shard {
393                 TestShard() {
394                     super(Shard.builder().id(shardID).datastoreContext(newDatastoreContext()).
395                             peerAddresses(Collections.<String, String>singletonMap(shardID.toString(), null)).
396                             schemaContext(SCHEMA_CONTEXT));
397                 }
398
399                 String getPeerAddress(String id) {
400                     return getRaftActorContext().getPeerAddress(id);
401                 }
402
403                 @Override
404                 protected void onRecoveryComplete() {
405                     try {
406                         super.onRecoveryComplete();
407                     } finally {
408                         recoveryComplete.countDown();
409                     }
410                 }
411             }
412
413             final TestActorRef<Shard> shard = actorFactory.createTestActor(
414                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
415                         @Override
416                         public TestShard create() throws Exception {
417                             return new TestShard();
418                         }
419                     })), "testPeerAddressResolved");
420
421             assertEquals("Recovery complete", true,
422                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
423
424             final String address = "akka://foobar";
425             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
426
427             assertEquals("getPeerAddress", address,
428                 ((TestShard) shard.underlyingActor()).getPeerAddress(shardID.toString()));
429         }};
430     }
431
432     @Test
433     public void testApplySnapshot() throws Exception {
434
435         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplySnapshot");
436
437         ShardTestKit.waitUntilLeader(shard);
438
439         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
440         store.setSchemaContext(SCHEMA_CONTEXT);
441
442         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
443                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
444                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
445                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
446
447         writeToStore(store, TestModel.TEST_PATH, container);
448
449         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
450         final NormalizedNode<?,?> expected = readStore(store, root);
451
452         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
453                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
454
455         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
456
457         final NormalizedNode<?,?> actual = readStore(shard, root);
458
459         assertEquals("Root node", expected, actual);
460     }
461
462     @Test
463     public void testApplyState() throws Exception {
464         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testApplyState");
465
466         ShardTestKit.waitUntilLeader(shard);
467
468         final DataTree source = setupInMemorySnapshotStore();
469         final DataTreeModification writeMod = source.takeSnapshot().newModification();
470         ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
471         writeMod.write(TestModel.TEST_PATH, node);
472         writeMod.ready();
473
474         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
475                 payloadForModification(source, writeMod)));
476
477         shard.underlyingActor().onReceiveCommand(applyState);
478
479         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
480         assertEquals("Applied state", node, actual);
481     }
482
483     @Test
484     public void testDataTreeCandidateRecovery() throws Exception {
485         // Set up the InMemorySnapshotStore.
486         final DataTree source = setupInMemorySnapshotStore();
487
488         final DataTreeModification writeMod = source.takeSnapshot().newModification();
489         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
490         writeMod.ready();
491         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
492
493         // Set up the InMemoryJournal.
494         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
495
496         final int nListEntries = 16;
497         final Set<Integer> listEntryKeys = new HashSet<>();
498
499         // Add some ModificationPayload entries
500         for (int i = 1; i <= nListEntries; i++) {
501             listEntryKeys.add(Integer.valueOf(i));
502
503             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
504                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
505
506             final DataTreeModification mod = source.takeSnapshot().newModification();
507             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
508             mod.ready();
509             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
510                 payloadForModification(source, mod)));
511         }
512
513         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
514             new ApplyJournalEntries(nListEntries));
515
516         testRecovery(listEntryKeys);
517     }
518
519     @Test
520     public void testConcurrentThreePhaseCommits() throws Throwable {
521         new ShardTestKit(getSystem()) {{
522             final TestActorRef<Shard> shard = actorFactory.createTestActor(
523                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
524                     "testConcurrentThreePhaseCommits");
525
526             waitUntilLeader(shard);
527
528             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
529
530             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
531
532             final String transactionID1 = "tx1";
533             final MutableCompositeModification modification1 = new MutableCompositeModification();
534             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
535                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
536
537             final String transactionID2 = "tx2";
538             final MutableCompositeModification modification2 = new MutableCompositeModification();
539             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
540                     TestModel.OUTER_LIST_PATH,
541                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
542                     modification2);
543
544             final String transactionID3 = "tx3";
545             final MutableCompositeModification modification3 = new MutableCompositeModification();
546             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
547                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
548                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
549                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
550                     modification3);
551
552             final long timeoutSec = 5;
553             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
554             final Timeout timeout = new Timeout(duration);
555
556             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
557             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
558                     expectMsgClass(duration, ReadyTransactionReply.class));
559             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
560
561             // Send the CanCommitTransaction message for the first Tx.
562
563             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
564             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
565                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
566             assertEquals("Can commit", true, canCommitReply.getCanCommit());
567
568             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
569             expectMsgClass(duration, ReadyTransactionReply.class);
570
571             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
572             expectMsgClass(duration, ReadyTransactionReply.class);
573
574             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
575             // processed after the first Tx completes.
576
577             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
578                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
579
580             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
581                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
582
583             // Send the CommitTransaction message for the first Tx. After it completes, it should
584             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
585
586             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
587             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
588
589             // Wait for the next 2 Tx's to complete.
590
591             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
592             final CountDownLatch commitLatch = new CountDownLatch(2);
593
594             class OnFutureComplete extends OnComplete<Object> {
595                 private final Class<?> expRespType;
596
597                 OnFutureComplete(final Class<?> expRespType) {
598                     this.expRespType = expRespType;
599                 }
600
601                 @Override
602                 public void onComplete(final Throwable error, final Object resp) {
603                     if(error != null) {
604                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
605                     } else {
606                         try {
607                             assertEquals("Commit response type", expRespType, resp.getClass());
608                             onSuccess(resp);
609                         } catch (final Exception e) {
610                             caughtEx.set(e);
611                         }
612                     }
613                 }
614
615                 void onSuccess(final Object resp) throws Exception {
616                 }
617             }
618
619             class OnCommitFutureComplete extends OnFutureComplete {
620                 OnCommitFutureComplete() {
621                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
622                 }
623
624                 @Override
625                 public void onComplete(final Throwable error, final Object resp) {
626                     super.onComplete(error, resp);
627                     commitLatch.countDown();
628                 }
629             }
630
631             class OnCanCommitFutureComplete extends OnFutureComplete {
632                 private final String transactionID;
633
634                 OnCanCommitFutureComplete(final String transactionID) {
635                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
636                     this.transactionID = transactionID;
637                 }
638
639                 @Override
640                 void onSuccess(final Object resp) throws Exception {
641                     final CanCommitTransactionReply canCommitReply =
642                             CanCommitTransactionReply.fromSerializable(resp);
643                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
644
645                     final Future<Object> commitFuture = Patterns.ask(shard,
646                             new CommitTransaction(transactionID).toSerializable(), timeout);
647                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
648                 }
649             }
650
651             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
652                     getSystem().dispatcher());
653
654             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
655                     getSystem().dispatcher());
656
657             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
658
659             if(caughtEx.get() != null) {
660                 throw caughtEx.get();
661             }
662
663             assertEquals("Commits complete", true, done);
664
665             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
666             inOrder.verify(cohort1).canCommit();
667             inOrder.verify(cohort1).preCommit();
668             inOrder.verify(cohort1).commit();
669             inOrder.verify(cohort2).canCommit();
670             inOrder.verify(cohort2).preCommit();
671             inOrder.verify(cohort2).commit();
672             inOrder.verify(cohort3).canCommit();
673             inOrder.verify(cohort3).preCommit();
674             inOrder.verify(cohort3).commit();
675
676             // Verify data in the data store.
677
678             verifyOuterListEntry(shard, 1);
679
680             verifyLastApplied(shard, 2);
681         }};
682     }
683
684     @Test
685     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
686         new ShardTestKit(getSystem()) {{
687             final TestActorRef<Shard> shard = actorFactory.createTestActor(
688                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
689                     "testBatchedModificationsWithNoCommitOnReady");
690
691             waitUntilLeader(shard);
692
693             final String transactionID = "tx";
694             final FiniteDuration duration = duration("5 seconds");
695
696             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
697             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
698                 @Override
699                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
700                     if(mockCohort.get() == null) {
701                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
702                     }
703
704                     return mockCohort.get();
705                 }
706             };
707
708             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
709
710             // Send a BatchedModifications to start a transaction.
711
712             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
713                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
714             expectMsgClass(duration, BatchedModificationsReply.class);
715
716             // Send a couple more BatchedModifications.
717
718             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
719                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
720             expectMsgClass(duration, BatchedModificationsReply.class);
721
722             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
723                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
724                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
725             expectMsgClass(duration, ReadyTransactionReply.class);
726
727             // Send the CanCommitTransaction message.
728
729             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
730             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
731                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
732             assertEquals("Can commit", true, canCommitReply.getCanCommit());
733
734             // Send the CanCommitTransaction message.
735
736             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
737             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
738
739             final InOrder inOrder = inOrder(mockCohort.get());
740             inOrder.verify(mockCohort.get()).canCommit();
741             inOrder.verify(mockCohort.get()).preCommit();
742             inOrder.verify(mockCohort.get()).commit();
743
744             // Verify data in the data store.
745
746             verifyOuterListEntry(shard, 1);
747         }};
748     }
749
750     @Test
751     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
752         new ShardTestKit(getSystem()) {{
753             final TestActorRef<Shard> shard = actorFactory.createTestActor(
754                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
755                     "testBatchedModificationsWithCommitOnReady");
756
757             waitUntilLeader(shard);
758
759             final String transactionID = "tx";
760             final FiniteDuration duration = duration("5 seconds");
761
762             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
763             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
764                 @Override
765                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
766                     if(mockCohort.get() == null) {
767                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
768                     }
769
770                     return mockCohort.get();
771                 }
772             };
773
774             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
775
776             // Send a BatchedModifications to start a transaction.
777
778             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
779                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
780             expectMsgClass(duration, BatchedModificationsReply.class);
781
782             // Send a couple more BatchedModifications.
783
784             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
785                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
786             expectMsgClass(duration, BatchedModificationsReply.class);
787
788             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
789                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
790                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
791
792             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
793
794             final InOrder inOrder = inOrder(mockCohort.get());
795             inOrder.verify(mockCohort.get()).canCommit();
796             inOrder.verify(mockCohort.get()).preCommit();
797             inOrder.verify(mockCohort.get()).commit();
798
799             // Verify data in the data store.
800
801             verifyOuterListEntry(shard, 1);
802         }};
803     }
804
805     @Test(expected=IllegalStateException.class)
806     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
807         new ShardTestKit(getSystem()) {{
808             final TestActorRef<Shard> shard = actorFactory.createTestActor(
809                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
810                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
811
812             waitUntilLeader(shard);
813
814             final String transactionID = "tx1";
815             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
816             batched.setReady(true);
817             batched.setTotalMessagesSent(2);
818
819             shard.tell(batched, getRef());
820
821             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
822
823             if(failure != null) {
824                 throw failure.cause();
825             }
826         }};
827     }
828
829     @Test
830     public void testBatchedModificationsWithOperationFailure() throws Throwable {
831         new ShardTestKit(getSystem()) {{
832             final TestActorRef<Shard> shard = actorFactory.createTestActor(
833                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
834                     "testBatchedModificationsWithOperationFailure");
835
836             waitUntilLeader(shard);
837
838             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
839             // write will not validate the children for performance reasons.
840
841             String transactionID = "tx1";
842
843             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
844                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
845                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
846
847             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
848             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
849             shard.tell(batched, getRef());
850             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
851
852             Throwable cause = failure.cause();
853
854             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
855             batched.setReady(true);
856             batched.setTotalMessagesSent(2);
857
858             shard.tell(batched, getRef());
859
860             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
861             assertEquals("Failure cause", cause, failure.cause());
862         }};
863     }
864
865     @Test
866     public void testBatchedModificationsOnTransactionChain() throws Throwable {
867         new ShardTestKit(getSystem()) {{
868             final TestActorRef<Shard> shard = actorFactory.createTestActor(
869                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
870                     "testBatchedModificationsOnTransactionChain");
871
872             waitUntilLeader(shard);
873
874             final String transactionChainID = "txChain";
875             final String transactionID1 = "tx1";
876             final String transactionID2 = "tx2";
877
878             final FiniteDuration duration = duration("5 seconds");
879
880             // Send a BatchedModifications to start a chained write transaction and ready it.
881
882             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
883             final YangInstanceIdentifier path = TestModel.TEST_PATH;
884             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
885                     containerNode, true, false, 1), getRef());
886             expectMsgClass(duration, ReadyTransactionReply.class);
887
888             // Create a read Tx on the same chain.
889
890             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
891                     transactionChainID, DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
892
893             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
894
895             getSystem().actorSelection(createReply.getTransactionPath()).tell(
896                     new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
897             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
898             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
899
900             // Commit the write transaction.
901
902             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
903             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
904                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
905             assertEquals("Can commit", true, canCommitReply.getCanCommit());
906
907             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
908             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
909
910             // Verify data in the data store.
911
912             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
913             assertEquals("Stored node", containerNode, actualNode);
914         }};
915     }
916
917     @Test
918     public void testOnBatchedModificationsWhenNotLeader() {
919         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
920         new ShardTestKit(getSystem()) {{
921             final Creator<Shard> creator = new Creator<Shard>() {
922                 private static final long serialVersionUID = 1L;
923
924                 @Override
925                 public Shard create() throws Exception {
926                     return new Shard(newShardBuilder()) {
927                         @Override
928                         protected boolean isLeader() {
929                             return overrideLeaderCalls.get() ? false : super.isLeader();
930                         }
931
932                         @Override
933                         protected ActorSelection getLeader() {
934                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
935                                 super.getLeader();
936                         }
937                     };
938                 }
939             };
940
941             final TestActorRef<Shard> shard = actorFactory.createTestActor(
942                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
943
944             waitUntilLeader(shard);
945
946             overrideLeaderCalls.set(true);
947
948             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
949
950             shard.tell(batched, ActorRef.noSender());
951
952             expectMsgEquals(batched);
953         }};
954     }
955
956     @Test
957     public void testTransactionMessagesWithNoLeader() {
958         new ShardTestKit(getSystem()) {{
959             dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).
960                 shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
961             final TestActorRef<Shard> shard = actorFactory.createTestActor(
962                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
963                     "testTransactionMessagesWithNoLeader");
964
965             waitUntilNoLeader(shard);
966
967             shard.tell(new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, ""), getRef());
968             Failure failure = expectMsgClass(Failure.class);
969             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
970
971             shard.tell(prepareForwardedReadyTransaction(mock(ShardDataTreeCohort.class), "tx",
972                     DataStoreVersions.CURRENT_VERSION, true), getRef());
973             failure = expectMsgClass(Failure.class);
974             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
975
976             shard.tell(new ReadyLocalTransaction("tx", mock(DataTreeModification.class), true), getRef());
977             failure = expectMsgClass(Failure.class);
978             assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
979         }};
980     }
981
982     @Test
983     public void testReadyWithImmediateCommit() throws Exception{
984         testReadyWithImmediateCommit(true);
985         testReadyWithImmediateCommit(false);
986     }
987
988     private void testReadyWithImmediateCommit(final boolean readWrite) throws Exception{
989         new ShardTestKit(getSystem()) {{
990             final TestActorRef<Shard> shard = actorFactory.createTestActor(
991                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
992                     "testReadyWithImmediateCommit-" + readWrite);
993
994             waitUntilLeader(shard);
995
996             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
997
998             final String transactionID = "tx1";
999             final MutableCompositeModification modification = new MutableCompositeModification();
1000             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1001             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1002                     TestModel.TEST_PATH, containerNode, modification);
1003
1004             final FiniteDuration duration = duration("5 seconds");
1005
1006             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1007
1008             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1009
1010             final InOrder inOrder = inOrder(cohort);
1011             inOrder.verify(cohort).canCommit();
1012             inOrder.verify(cohort).preCommit();
1013             inOrder.verify(cohort).commit();
1014
1015             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1016             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1017         }};
1018     }
1019
1020     @Test
1021     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1022         new ShardTestKit(getSystem()) {{
1023             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1024                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1025                     "testReadyLocalTransactionWithImmediateCommit");
1026
1027             waitUntilLeader(shard);
1028
1029             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1030
1031             final DataTreeModification modification = dataStore.newModification();
1032
1033             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1034             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1035             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1036             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1037
1038             final String txId = "tx1";
1039             modification.ready();
1040             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1041
1042             shard.tell(readyMessage, getRef());
1043
1044             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1045
1046             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1047             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1048         }};
1049     }
1050
1051     @Test
1052     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1053         new ShardTestKit(getSystem()) {{
1054             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1055                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1056                     "testReadyLocalTransactionWithThreePhaseCommit");
1057
1058             waitUntilLeader(shard);
1059
1060             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1061
1062             final DataTreeModification modification = dataStore.newModification();
1063
1064             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1065             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1066             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1067             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1068
1069             final String txId = "tx1";
1070                 modification.ready();
1071             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1072
1073             shard.tell(readyMessage, getRef());
1074
1075             expectMsgClass(ReadyTransactionReply.class);
1076
1077             // Send the CanCommitTransaction message.
1078
1079             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1080             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1081                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1082             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1083
1084             // Send the CanCommitTransaction message.
1085
1086             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1087             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1088
1089             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1090             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1091         }};
1092     }
1093
1094     @Test
1095     public void testCommitWithPersistenceDisabled() throws Throwable {
1096         testCommitWithPersistenceDisabled(true);
1097         testCommitWithPersistenceDisabled(false);
1098     }
1099
1100     private void testCommitWithPersistenceDisabled(final boolean readWrite) throws Throwable {
1101         dataStoreContextBuilder.persistent(false);
1102         new ShardTestKit(getSystem()) {{
1103             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1104                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1105                     "testCommitWithPersistenceDisabled-" + readWrite);
1106
1107             waitUntilLeader(shard);
1108
1109             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1110
1111             // Setup a simulated transactions with a mock cohort.
1112
1113             final String transactionID = "tx";
1114             final MutableCompositeModification modification = new MutableCompositeModification();
1115             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1116             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1117                 TestModel.TEST_PATH, containerNode, modification);
1118
1119             final FiniteDuration duration = duration("5 seconds");
1120
1121             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1122             expectMsgClass(duration, ReadyTransactionReply.class);
1123
1124             // Send the CanCommitTransaction message.
1125
1126             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1127             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1128                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1129             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1130
1131             // Send the CanCommitTransaction message.
1132
1133             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1134             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1135
1136             final InOrder inOrder = inOrder(cohort);
1137             inOrder.verify(cohort).canCommit();
1138             inOrder.verify(cohort).preCommit();
1139             inOrder.verify(cohort).commit();
1140
1141             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1142             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1143         }};
1144     }
1145
1146     @Test
1147     public void testCommitWhenTransactionHasNoModifications() {
1148         testCommitWhenTransactionHasNoModifications(true);
1149         testCommitWhenTransactionHasNoModifications(false);
1150     }
1151
1152     private void testCommitWhenTransactionHasNoModifications(final boolean readWrite){
1153         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1154         // but here that need not happen
1155         new ShardTestKit(getSystem()) {
1156             {
1157                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1158                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1159                         "testCommitWhenTransactionHasNoModifications-" + readWrite);
1160
1161                 waitUntilLeader(shard);
1162
1163                 final String transactionID = "tx1";
1164                 final MutableCompositeModification modification = new MutableCompositeModification();
1165                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1166                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1167                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1168                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1169                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1170
1171                 final FiniteDuration duration = duration("5 seconds");
1172
1173                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1174                 expectMsgClass(duration, ReadyTransactionReply.class);
1175
1176                 // Send the CanCommitTransaction message.
1177
1178                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1179                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1180                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1181                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1182
1183                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1184                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1185
1186                 final InOrder inOrder = inOrder(cohort);
1187                 inOrder.verify(cohort).canCommit();
1188                 inOrder.verify(cohort).preCommit();
1189                 inOrder.verify(cohort).commit();
1190
1191                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1192                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1193
1194                 // Use MBean for verification
1195                 // Committed transaction count should increase as usual
1196                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1197
1198                 // Commit index should not advance because this does not go into the journal
1199                 assertEquals(-1, shardStats.getCommitIndex());
1200             }
1201         };
1202     }
1203
1204     @Test
1205     public void testCommitWhenTransactionHasModifications() {
1206         testCommitWhenTransactionHasModifications(true);
1207         testCommitWhenTransactionHasModifications(false);
1208     }
1209
1210     private void testCommitWhenTransactionHasModifications(final boolean readWrite){
1211         new ShardTestKit(getSystem()) {
1212             {
1213                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1214                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1215                         "testCommitWhenTransactionHasModifications-" + readWrite);
1216
1217                 waitUntilLeader(shard);
1218
1219                 final String transactionID = "tx1";
1220                 final MutableCompositeModification modification = new MutableCompositeModification();
1221                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1222                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1223                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1224                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1225                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1226                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1227
1228                 final FiniteDuration duration = duration("5 seconds");
1229
1230                 shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1231                 expectMsgClass(duration, ReadyTransactionReply.class);
1232
1233                 // Send the CanCommitTransaction message.
1234
1235                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1236                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1237                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1238                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1239
1240                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1241                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1242
1243                 final InOrder inOrder = inOrder(cohort);
1244                 inOrder.verify(cohort).canCommit();
1245                 inOrder.verify(cohort).preCommit();
1246                 inOrder.verify(cohort).commit();
1247
1248                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1249                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1250
1251                 // Use MBean for verification
1252                 // Committed transaction count should increase as usual
1253                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1254
1255                 // Commit index should advance as we do not have an empty modification
1256                 assertEquals(0, shardStats.getCommitIndex());
1257             }
1258         };
1259     }
1260
1261     @Test
1262     public void testCommitPhaseFailure() throws Throwable {
1263         testCommitPhaseFailure(true);
1264         testCommitPhaseFailure(false);
1265     }
1266
1267     private void testCommitPhaseFailure(final boolean readWrite) throws Throwable {
1268         new ShardTestKit(getSystem()) {{
1269             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1270                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1271                     "testCommitPhaseFailure-" + readWrite);
1272
1273             waitUntilLeader(shard);
1274
1275             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1276             // commit phase.
1277
1278             final String transactionID1 = "tx1";
1279             final MutableCompositeModification modification1 = new MutableCompositeModification();
1280             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1281             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1282             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1283             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1284             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1285
1286             final String transactionID2 = "tx2";
1287             final MutableCompositeModification modification2 = new MutableCompositeModification();
1288             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1289             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1290
1291             final FiniteDuration duration = duration("5 seconds");
1292             final Timeout timeout = new Timeout(duration);
1293
1294             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1295             expectMsgClass(duration, ReadyTransactionReply.class);
1296
1297             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1298             expectMsgClass(duration, ReadyTransactionReply.class);
1299
1300             // Send the CanCommitTransaction message for the first Tx.
1301
1302             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1303             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1304                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1305             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1306
1307             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1308             // processed after the first Tx completes.
1309
1310             final Future<Object> canCommitFuture = Patterns.ask(shard,
1311                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1312
1313             // Send the CommitTransaction message for the first Tx. This should send back an error
1314             // and trigger the 2nd Tx to proceed.
1315
1316             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1317             expectMsgClass(duration, akka.actor.Status.Failure.class);
1318
1319             // Wait for the 2nd Tx to complete the canCommit phase.
1320
1321             final CountDownLatch latch = new CountDownLatch(1);
1322             canCommitFuture.onComplete(new OnComplete<Object>() {
1323                 @Override
1324                 public void onComplete(final Throwable t, final Object resp) {
1325                     latch.countDown();
1326                 }
1327             }, getSystem().dispatcher());
1328
1329             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1330
1331             final InOrder inOrder = inOrder(cohort1, cohort2);
1332             inOrder.verify(cohort1).canCommit();
1333             inOrder.verify(cohort1).preCommit();
1334             inOrder.verify(cohort1).commit();
1335             inOrder.verify(cohort2).canCommit();
1336         }};
1337     }
1338
1339     @Test
1340     public void testPreCommitPhaseFailure() throws Throwable {
1341         testPreCommitPhaseFailure(true);
1342         testPreCommitPhaseFailure(false);
1343     }
1344
1345     private void testPreCommitPhaseFailure(final boolean readWrite) throws Throwable {
1346         new ShardTestKit(getSystem()) {{
1347             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1348                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1349                     "testPreCommitPhaseFailure-" + readWrite);
1350
1351             waitUntilLeader(shard);
1352
1353             final String transactionID1 = "tx1";
1354             final MutableCompositeModification modification1 = new MutableCompositeModification();
1355             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1356             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1357             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1358
1359             final String transactionID2 = "tx2";
1360             final MutableCompositeModification modification2 = new MutableCompositeModification();
1361             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1362             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1363
1364             final FiniteDuration duration = duration("5 seconds");
1365             final Timeout timeout = new Timeout(duration);
1366
1367             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1368             expectMsgClass(duration, ReadyTransactionReply.class);
1369
1370             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1371             expectMsgClass(duration, ReadyTransactionReply.class);
1372
1373             // Send the CanCommitTransaction message for the first Tx.
1374
1375             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1376             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1377                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1378             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1379
1380             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1381             // processed after the first Tx completes.
1382
1383             final Future<Object> canCommitFuture = Patterns.ask(shard,
1384                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1385
1386             // Send the CommitTransaction message for the first Tx. This should send back an error
1387             // and trigger the 2nd Tx to proceed.
1388
1389             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1390             expectMsgClass(duration, akka.actor.Status.Failure.class);
1391
1392             // Wait for the 2nd Tx to complete the canCommit phase.
1393
1394             final CountDownLatch latch = new CountDownLatch(1);
1395             canCommitFuture.onComplete(new OnComplete<Object>() {
1396                 @Override
1397                 public void onComplete(final Throwable t, final Object resp) {
1398                     latch.countDown();
1399                 }
1400             }, getSystem().dispatcher());
1401
1402             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1403
1404             final InOrder inOrder = inOrder(cohort1, cohort2);
1405             inOrder.verify(cohort1).canCommit();
1406             inOrder.verify(cohort1).preCommit();
1407             inOrder.verify(cohort2).canCommit();
1408         }};
1409     }
1410
1411     @Test
1412     public void testCanCommitPhaseFailure() throws Throwable {
1413         testCanCommitPhaseFailure(true);
1414         testCanCommitPhaseFailure(false);
1415     }
1416
1417     private void testCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1418         new ShardTestKit(getSystem()) {{
1419             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1420                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1421                     "testCanCommitPhaseFailure-" + readWrite);
1422
1423             waitUntilLeader(shard);
1424
1425             final FiniteDuration duration = duration("5 seconds");
1426
1427             final String transactionID1 = "tx1";
1428             final MutableCompositeModification modification = new MutableCompositeModification();
1429             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1430             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1431
1432             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1433             expectMsgClass(duration, ReadyTransactionReply.class);
1434
1435             // Send the CanCommitTransaction message.
1436
1437             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1438             expectMsgClass(duration, akka.actor.Status.Failure.class);
1439
1440             // Send another can commit to ensure the failed one got cleaned up.
1441
1442             reset(cohort);
1443
1444             final String transactionID2 = "tx2";
1445             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1446
1447             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1448             expectMsgClass(duration, ReadyTransactionReply.class);
1449
1450             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1451             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1452                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1453             assertEquals("getCanCommit", true, reply.getCanCommit());
1454         }};
1455     }
1456
1457     @Test
1458     public void testCanCommitPhaseFalseResponse() throws Throwable {
1459         testCanCommitPhaseFalseResponse(true);
1460         testCanCommitPhaseFalseResponse(false);
1461     }
1462
1463     private void testCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1464         new ShardTestKit(getSystem()) {{
1465             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1466                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1467                     "testCanCommitPhaseFalseResponse-" + readWrite);
1468
1469             waitUntilLeader(shard);
1470
1471             final FiniteDuration duration = duration("5 seconds");
1472
1473             final String transactionID1 = "tx1";
1474             final MutableCompositeModification modification = new MutableCompositeModification();
1475             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1476             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1477
1478             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification), getRef());
1479             expectMsgClass(duration, ReadyTransactionReply.class);
1480
1481             // Send the CanCommitTransaction message.
1482
1483             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1484             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1485                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1486             assertEquals("getCanCommit", false, reply.getCanCommit());
1487
1488             // Send another can commit to ensure the failed one got cleaned up.
1489
1490             reset(cohort);
1491
1492             final String transactionID2 = "tx2";
1493             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1494
1495             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification), getRef());
1496             expectMsgClass(duration, ReadyTransactionReply.class);
1497
1498             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1499             reply = CanCommitTransactionReply.fromSerializable(
1500                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1501             assertEquals("getCanCommit", true, reply.getCanCommit());
1502         }};
1503     }
1504
1505     @Test
1506     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1507         testImmediateCommitWithCanCommitPhaseFailure(true);
1508         testImmediateCommitWithCanCommitPhaseFailure(false);
1509     }
1510
1511     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Throwable {
1512         new ShardTestKit(getSystem()) {{
1513             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1514                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1515                     "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1516
1517             waitUntilLeader(shard);
1518
1519             final FiniteDuration duration = duration("5 seconds");
1520
1521             final String transactionID1 = "tx1";
1522             final MutableCompositeModification modification = new MutableCompositeModification();
1523             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1524             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1525
1526             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID1, modification, true), getRef());
1527
1528             expectMsgClass(duration, akka.actor.Status.Failure.class);
1529
1530             // Send another can commit to ensure the failed one got cleaned up.
1531
1532             reset(cohort);
1533
1534             final String transactionID2 = "tx2";
1535             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1536             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1537             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1538             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1539             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1540             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1541             doReturn(candidateRoot).when(candidate).getRootNode();
1542             doReturn(candidate).when(cohort).getCandidate();
1543
1544             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1545
1546             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1547         }};
1548     }
1549
1550     @Test
1551     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1552         testImmediateCommitWithCanCommitPhaseFalseResponse(true);
1553         testImmediateCommitWithCanCommitPhaseFalseResponse(false);
1554     }
1555
1556     private void testImmediateCommitWithCanCommitPhaseFalseResponse(final boolean readWrite) throws Throwable {
1557         new ShardTestKit(getSystem()) {{
1558             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1559                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1560                     "testImmediateCommitWithCanCommitPhaseFalseResponse-" + readWrite);
1561
1562             waitUntilLeader(shard);
1563
1564             final FiniteDuration duration = duration("5 seconds");
1565
1566             final String transactionID = "tx1";
1567             final MutableCompositeModification modification = new MutableCompositeModification();
1568             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1569             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1570
1571             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification, true), getRef());
1572
1573             expectMsgClass(duration, akka.actor.Status.Failure.class);
1574
1575             // Send another can commit to ensure the failed one got cleaned up.
1576
1577             reset(cohort);
1578
1579             final String transactionID2 = "tx2";
1580             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1581             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1582             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1583             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1584             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1585             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1586             doReturn(candidateRoot).when(candidate).getRootNode();
1587             doReturn(candidate).when(cohort).getCandidate();
1588
1589             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID2, modification, true), getRef());
1590
1591             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1592         }};
1593     }
1594
1595     @Test
1596     public void testAbortBeforeFinishCommit() throws Throwable {
1597         testAbortBeforeFinishCommit(true);
1598         testAbortBeforeFinishCommit(false);
1599     }
1600
1601     private void testAbortBeforeFinishCommit(final boolean readWrite) throws Throwable {
1602         new ShardTestKit(getSystem()) {{
1603             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1604                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1605                     "testAbortBeforeFinishCommit-" + readWrite);
1606
1607             waitUntilLeader(shard);
1608
1609             final FiniteDuration duration = duration("5 seconds");
1610             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1611
1612             final String transactionID = "tx1";
1613             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1614                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1615                 @Override
1616                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1617                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1618
1619                     // Simulate an AbortTransaction message occurring during replication, after
1620                     // persisting and before finishing the commit to the in-memory store.
1621                     // We have no followers so due to optimizations in the RaftActor, it does not
1622                     // attempt replication and thus we can't send an AbortTransaction message b/c
1623                     // it would be processed too late after CommitTransaction completes. So we'll
1624                     // simulate an AbortTransaction message occurring during replication by calling
1625                     // the shard directly.
1626                     //
1627                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1628
1629                     return preCommitFuture;
1630                 }
1631             };
1632
1633             final MutableCompositeModification modification = new MutableCompositeModification();
1634             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1635                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1636                     modification, preCommit);
1637
1638             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
1639             expectMsgClass(duration, ReadyTransactionReply.class);
1640
1641             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1642             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1643                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1644             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1645
1646             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1647             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1648
1649             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1650
1651             // Since we're simulating an abort occurring during replication and before finish commit,
1652             // the data should still get written to the in-memory store since we've gotten past
1653             // canCommit and preCommit and persisted the data.
1654             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1655         }};
1656     }
1657
1658     @Test
1659     public void testTransactionCommitTimeout() throws Throwable {
1660         testTransactionCommitTimeout(true);
1661         testTransactionCommitTimeout(false);
1662     }
1663
1664     private void testTransactionCommitTimeout(final boolean readWrite) throws Throwable {
1665         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1666
1667         new ShardTestKit(getSystem()) {{
1668             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1669                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1670                     "testTransactionCommitTimeout-" + readWrite);
1671
1672             waitUntilLeader(shard);
1673
1674             final FiniteDuration duration = duration("5 seconds");
1675
1676             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1677
1678             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1679             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1680                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1681
1682             // Create 1st Tx - will timeout
1683
1684             final String transactionID1 = "tx1";
1685             final MutableCompositeModification modification1 = new MutableCompositeModification();
1686             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1687                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1688                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1689                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1690                     modification1);
1691
1692             // Create 2nd Tx
1693
1694             final String transactionID2 = "tx3";
1695             final MutableCompositeModification modification2 = new MutableCompositeModification();
1696             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1697                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1698             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1699                     listNodePath,
1700                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1701                     modification2);
1702
1703             // Ready the Tx's
1704
1705             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1706             expectMsgClass(duration, ReadyTransactionReply.class);
1707
1708             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1709             expectMsgClass(duration, ReadyTransactionReply.class);
1710
1711             // canCommit 1st Tx. We don't send the commit so it should timeout.
1712
1713             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1714             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1715
1716             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1717
1718             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1719             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1720
1721             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1722
1723             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1724             expectMsgClass(duration, akka.actor.Status.Failure.class);
1725
1726             // Commit the 2nd Tx.
1727
1728             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1729             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1730
1731             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1732             assertNotNull(listNodePath + " not found", node);
1733         }};
1734     }
1735
1736     @Test
1737     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1738         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1739
1740         new ShardTestKit(getSystem()) {{
1741             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1742                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1743                     "testTransactionCommitQueueCapacityExceeded");
1744
1745             waitUntilLeader(shard);
1746
1747             final FiniteDuration duration = duration("5 seconds");
1748
1749             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1750
1751             final String transactionID1 = "tx1";
1752             final MutableCompositeModification modification1 = new MutableCompositeModification();
1753             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1754                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1755
1756             final String transactionID2 = "tx2";
1757             final MutableCompositeModification modification2 = new MutableCompositeModification();
1758             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1759                     TestModel.OUTER_LIST_PATH,
1760                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1761                     modification2);
1762
1763             final String transactionID3 = "tx3";
1764             final MutableCompositeModification modification3 = new MutableCompositeModification();
1765             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1766                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1767
1768             // Ready the Tx's
1769
1770             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1771             expectMsgClass(duration, ReadyTransactionReply.class);
1772
1773             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1774             expectMsgClass(duration, ReadyTransactionReply.class);
1775
1776             // The 3rd Tx should exceed queue capacity and fail.
1777
1778             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1779             expectMsgClass(duration, akka.actor.Status.Failure.class);
1780
1781             // canCommit 1st Tx.
1782
1783             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1784             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1785
1786             // canCommit the 2nd Tx - it should get queued.
1787
1788             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1789
1790             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1791
1792             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1793             expectMsgClass(duration, akka.actor.Status.Failure.class);
1794         }};
1795     }
1796
1797     @Test
1798     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1799         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1800
1801         new ShardTestKit(getSystem()) {{
1802             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1803                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1804                     "testTransactionCommitWithPriorExpiredCohortEntries");
1805
1806             waitUntilLeader(shard);
1807
1808             final FiniteDuration duration = duration("5 seconds");
1809
1810             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1811
1812             final String transactionID1 = "tx1";
1813             final MutableCompositeModification modification1 = new MutableCompositeModification();
1814             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1815                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1816
1817             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1818             expectMsgClass(duration, ReadyTransactionReply.class);
1819
1820             final String transactionID2 = "tx2";
1821             final MutableCompositeModification modification2 = new MutableCompositeModification();
1822             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1823                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1824
1825             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1826             expectMsgClass(duration, ReadyTransactionReply.class);
1827
1828             final String transactionID3 = "tx3";
1829             final MutableCompositeModification modification3 = new MutableCompositeModification();
1830             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1831                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1832
1833             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3, modification3), getRef());
1834             expectMsgClass(duration, ReadyTransactionReply.class);
1835
1836             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1837             // should expire from the queue and the last one should be processed.
1838
1839             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1840             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1841         }};
1842     }
1843
1844     @Test
1845     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
1846         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1847
1848         new ShardTestKit(getSystem()) {{
1849             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1850                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1851                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
1852
1853             waitUntilLeader(shard);
1854
1855             final FiniteDuration duration = duration("5 seconds");
1856
1857             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1858
1859             final String transactionID1 = "tx1";
1860             final MutableCompositeModification modification1 = new MutableCompositeModification();
1861             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1862                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1863
1864             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1865             expectMsgClass(duration, ReadyTransactionReply.class);
1866
1867             // CanCommit the first one so it's the current in-progress CohortEntry.
1868
1869             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1870             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1871
1872             // Ready the second Tx.
1873
1874             final String transactionID2 = "tx2";
1875             final MutableCompositeModification modification2 = new MutableCompositeModification();
1876             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1877                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1878
1879             shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1880             expectMsgClass(duration, ReadyTransactionReply.class);
1881
1882             // Ready the third Tx.
1883
1884             final String transactionID3 = "tx3";
1885             final DataTreeModification modification3 = dataStore.newModification();
1886             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1887                     .apply(modification3);
1888                 modification3.ready();
1889             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
1890
1891             shard.tell(readyMessage, getRef());
1892
1893             // Commit the first Tx. After completing, the second should expire from the queue and the third
1894             // Tx committed.
1895
1896             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1897             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1898
1899             // Expect commit reply from the third Tx.
1900
1901             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1902
1903             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1904             assertNotNull(TestModel.TEST2_PATH + " not found", node);
1905         }};
1906     }
1907
1908     @Test
1909     public void testCanCommitBeforeReadyFailure() throws Throwable {
1910         new ShardTestKit(getSystem()) {{
1911             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1912                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1913                     "testCanCommitBeforeReadyFailure");
1914
1915             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1916             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1917         }};
1918     }
1919
1920     @Test
1921     public void testAbortCurrentTransaction() throws Throwable {
1922         testAbortCurrentTransaction(true);
1923         testAbortCurrentTransaction(false);
1924     }
1925
1926     private void testAbortCurrentTransaction(final boolean readWrite) throws Throwable {
1927         new ShardTestKit(getSystem()) {{
1928             final TestActorRef<Shard> shard = actorFactory.createTestActor(
1929                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1930                     "testAbortCurrentTransaction-" + readWrite);
1931
1932             waitUntilLeader(shard);
1933
1934             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1935
1936             final String transactionID1 = "tx1";
1937             final MutableCompositeModification modification1 = new MutableCompositeModification();
1938             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1939             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1940             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1941
1942             final String transactionID2 = "tx2";
1943             final MutableCompositeModification modification2 = new MutableCompositeModification();
1944             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1945             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1946
1947             final FiniteDuration duration = duration("5 seconds");
1948             final Timeout timeout = new Timeout(duration);
1949
1950             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort1, transactionID1, modification1), getRef());
1951             expectMsgClass(duration, ReadyTransactionReply.class);
1952
1953             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort2, transactionID2, modification2), getRef());
1954             expectMsgClass(duration, ReadyTransactionReply.class);
1955
1956             // Send the CanCommitTransaction message for the first Tx.
1957
1958             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1959             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1960                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1961             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1962
1963             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1964             // processed after the first Tx completes.
1965
1966             final Future<Object> canCommitFuture = Patterns.ask(shard,
1967                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1968
1969             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1970             // Tx to proceed.
1971
1972             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1973             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1974
1975             // Wait for the 2nd Tx to complete the canCommit phase.
1976
1977             Await.ready(canCommitFuture, duration);
1978
1979             final InOrder inOrder = inOrder(cohort1, cohort2);
1980             inOrder.verify(cohort1).canCommit();
1981             inOrder.verify(cohort2).canCommit();
1982         }};
1983     }
1984
1985     @Test
1986     public void testAbortQueuedTransaction() throws Throwable {
1987         testAbortQueuedTransaction(true);
1988         testAbortQueuedTransaction(false);
1989     }
1990
1991     private void testAbortQueuedTransaction(final boolean readWrite) throws Throwable {
1992         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1993         new ShardTestKit(getSystem()) {{
1994             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
1995             @SuppressWarnings("serial")
1996             final Creator<Shard> creator = new Creator<Shard>() {
1997                 @Override
1998                 public Shard create() throws Exception {
1999                     return new Shard(newShardBuilder()) {
2000                         @Override
2001                         public void onReceiveCommand(final Object message) throws Exception {
2002                             super.onReceiveCommand(message);
2003                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2004                                 if(cleaupCheckLatch.get() != null) {
2005                                     cleaupCheckLatch.get().countDown();
2006                                 }
2007                             }
2008                         }
2009                     };
2010                 }
2011             };
2012
2013             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2014                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2015                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction-" + readWrite);
2016
2017             waitUntilLeader(shard);
2018
2019             final String transactionID = "tx1";
2020
2021             final MutableCompositeModification modification = new MutableCompositeModification();
2022             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2023             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2024
2025             final FiniteDuration duration = duration("5 seconds");
2026
2027             // Ready the tx.
2028
2029             shard.tell(prepareReadyTransactionMessage(readWrite, shard.underlyingActor(), cohort, transactionID, modification), getRef());
2030             expectMsgClass(duration, ReadyTransactionReply.class);
2031
2032             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2033
2034             // Send the AbortTransaction message.
2035
2036             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2037             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2038
2039             verify(cohort).abort();
2040
2041             // Verify the tx cohort is removed from queue at the cleanup check interval.
2042
2043             cleaupCheckLatch.set(new CountDownLatch(1));
2044             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2045                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2046
2047             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2048
2049             // Now send CanCommitTransaction - should fail.
2050
2051             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2052
2053             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2054             assertTrue("Failure type", failure instanceof IllegalStateException);
2055         }};
2056     }
2057
2058     @Test
2059     public void testCreateSnapshot() throws Exception {
2060         testCreateSnapshot(true, "testCreateSnapshot");
2061     }
2062
2063     @Test
2064     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2065         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2066     }
2067
2068     @SuppressWarnings("serial")
2069     private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2070
2071         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2072
2073         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2074         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2075             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2076                 super(delegate);
2077             }
2078
2079             @Override
2080             public void saveSnapshot(final Object o) {
2081                 savedSnapshot.set(o);
2082                 super.saveSnapshot(o);
2083             }
2084         }
2085
2086         dataStoreContextBuilder.persistent(persistent);
2087
2088         new ShardTestKit(getSystem()) {{
2089             class TestShard extends Shard {
2090
2091                 protected TestShard(AbstractBuilder<?, ?> builder) {
2092                     super(builder);
2093                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2094                 }
2095
2096                 @Override
2097                 public void handleCommand(final Object message) {
2098                     super.handleCommand(message);
2099
2100                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2101                         latch.get().countDown();
2102                     }
2103                 }
2104
2105                 @Override
2106                 public RaftActorContext getRaftActorContext() {
2107                     return super.getRaftActorContext();
2108                 }
2109             }
2110
2111             final Creator<Shard> creator = new Creator<Shard>() {
2112                 @Override
2113                 public Shard create() throws Exception {
2114                     return new TestShard(newShardBuilder());
2115                 }
2116             };
2117
2118             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2119                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2120
2121             waitUntilLeader(shard);
2122             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2123
2124             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2125
2126             // Trigger creation of a snapshot by ensuring
2127             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2128             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2129             awaitAndValidateSnapshot(expectedRoot);
2130
2131             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2132             awaitAndValidateSnapshot(expectedRoot);
2133         }
2134
2135         private void awaitAndValidateSnapshot(NormalizedNode<?,?> expectedRoot) throws InterruptedException {
2136             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2137
2138             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2139                     savedSnapshot.get() instanceof Snapshot);
2140
2141             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2142
2143             latch.set(new CountDownLatch(1));
2144             savedSnapshot.set(null);
2145         }
2146
2147         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2148
2149             final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2150             assertEquals("Root node", expectedRoot, actual);
2151
2152         }};
2153     }
2154
2155     /**
2156      * This test simply verifies that the applySnapShot logic will work
2157      * @throws ReadFailedException
2158      * @throws DataValidationFailedException
2159      */
2160     @Test
2161     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2162         final DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
2163         store.setSchemaContext(SCHEMA_CONTEXT);
2164
2165         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2166         putTransaction.write(TestModel.TEST_PATH,
2167             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2168         commitTransaction(store, putTransaction);
2169
2170
2171         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2172
2173         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2174
2175         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2176         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2177
2178         commitTransaction(store, writeTransaction);
2179
2180         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2181
2182         assertEquals(expected, actual);
2183     }
2184
2185     @Test
2186     public void testRecoveryApplicable(){
2187
2188         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2189                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2190
2191         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext).
2192                 schemaContext(SCHEMA_CONTEXT).props();
2193
2194         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2195                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2196
2197         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext).
2198                 schemaContext(SCHEMA_CONTEXT).props();
2199
2200         new ShardTestKit(getSystem()) {{
2201             final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
2202
2203             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2204
2205             final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
2206
2207             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2208         }};
2209     }
2210
2211     @Test
2212     public void testOnDatastoreContext() {
2213         new ShardTestKit(getSystem()) {{
2214             dataStoreContextBuilder.persistent(true);
2215
2216             final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(), "testOnDatastoreContext");
2217
2218             assertEquals("isRecoveryApplicable", true,
2219                     shard.underlyingActor().persistence().isRecoveryApplicable());
2220
2221             waitUntilLeader(shard);
2222
2223             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2224
2225             assertEquals("isRecoveryApplicable", false,
2226                 shard.underlyingActor().persistence().isRecoveryApplicable());
2227
2228             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2229
2230             assertEquals("isRecoveryApplicable", true,
2231                 shard.underlyingActor().persistence().isRecoveryApplicable());
2232         }};
2233     }
2234
2235     @Test
2236     public void testRegisterRoleChangeListener() throws Exception {
2237         new ShardTestKit(getSystem()) {
2238             {
2239                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2240                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2241                         "testRegisterRoleChangeListener");
2242
2243                 waitUntilLeader(shard);
2244
2245                 final TestActorRef<MessageCollectorActor> listener =
2246                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2247
2248                 shard.tell(new RegisterRoleChangeListener(), listener);
2249
2250                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2251
2252                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2253                     ShardLeaderStateChanged.class);
2254                 assertEquals("getLocalShardDataTree present", true,
2255                         leaderStateChanged.getLocalShardDataTree().isPresent());
2256                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2257                     leaderStateChanged.getLocalShardDataTree().get());
2258
2259                 MessageCollectorActor.clearMessages(listener);
2260
2261                 // Force a leader change
2262
2263                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2264
2265                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2266                         ShardLeaderStateChanged.class);
2267                 assertEquals("getLocalShardDataTree present", false,
2268                         leaderStateChanged.getLocalShardDataTree().isPresent());
2269             }
2270         };
2271     }
2272
2273     @Test
2274     public void testFollowerInitialSyncStatus() throws Exception {
2275         final TestActorRef<Shard> shard = actorFactory.createTestActor(
2276                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2277                 "testFollowerInitialSyncStatus");
2278
2279         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2280
2281         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2282
2283         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2284
2285         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2286     }
2287
2288     @Test
2289     public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
2290         new ShardTestKit(getSystem()) {{
2291             String testName = "testClusteredDataChangeListenerDelayedRegistration";
2292             dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
2293
2294             final MockDataChangeListener listener = new MockDataChangeListener(1);
2295             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2296                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2297
2298             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2299                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2300                     actorFactory.generateActorId(testName + "-shard"));
2301
2302             waitUntilNoLeader(shard);
2303
2304             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2305
2306             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2307             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2308                 RegisterChangeListenerReply.class);
2309             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2310
2311             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2312
2313             shard.tell(new ElectionTimeout(), ActorRef.noSender());
2314
2315             listener.waitForChangeEvents();
2316         }};
2317     }
2318
2319     @Test
2320     public void testClusteredDataChangeListenerRegistration() throws Exception {
2321         new ShardTestKit(getSystem()) {{
2322             String testName = "testClusteredDataChangeListenerRegistration";
2323             final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2324                     actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2325
2326             final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2327                     actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2328
2329             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2330                     Shard.builder().id(followerShardID).
2331                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2332                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2333                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2334                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2335
2336             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2337                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2338                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2339                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2340                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2341
2342             leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
2343             String leaderPath = waitUntilLeader(followerShard);
2344             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2345
2346             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2347             final MockDataChangeListener listener = new MockDataChangeListener(1);
2348             final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
2349                     actorFactory.generateActorId(testName + "-DataChangeListener"));
2350
2351             followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2352             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2353                 RegisterChangeListenerReply.class);
2354             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2355
2356             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2357
2358             listener.waitForChangeEvents();
2359         }};
2360     }
2361
2362     @Test
2363     public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
2364         new ShardTestKit(getSystem()) {{
2365             String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
2366             dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
2367
2368             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2369             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2370                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2371
2372             final TestActorRef<Shard> shard = actorFactory.createTestActor(
2373                     newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2374                     actorFactory.generateActorId(testName + "-shard"));
2375
2376             waitUntilNoLeader(shard);
2377
2378             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2379
2380             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2381             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2382                     RegisterDataTreeChangeListenerReply.class);
2383             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2384
2385             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2386
2387             shard.tell(new ElectionTimeout(), ActorRef.noSender());
2388
2389             listener.waitForChangeEvents();
2390         }};
2391     }
2392
2393     @Test
2394     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2395         new ShardTestKit(getSystem()) {{
2396             String testName = "testClusteredDataTreeChangeListenerRegistration";
2397             final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
2398                     actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
2399
2400             final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
2401                     actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
2402
2403             final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
2404                     Shard.builder().id(followerShardID).
2405                         datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
2406                         peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2407                             "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2408                     withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2409
2410             final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
2411                     Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
2412                         peerAddresses(Collections.singletonMap(followerShardID.toString(),
2413                             "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
2414                     withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2415
2416             leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
2417             String leaderPath = waitUntilLeader(followerShard);
2418             assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2419
2420             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2421             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2422             final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
2423                     actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2424
2425             followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2426             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2427                     RegisterDataTreeChangeListenerReply.class);
2428             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2429
2430             writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2431
2432             listener.waitForChangeEvents();
2433         }};
2434     }
2435
2436     @Test
2437     public void testServerRemoved() throws Exception {
2438         final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
2439
2440         final ActorRef shard = parent.underlyingActor().context().actorOf(
2441                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2442                 "testServerRemoved");
2443
2444         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2445
2446         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2447     }
2448 }