CDS: Changes to Tx abort in Shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
15 import akka.actor.ActorRef;
16 import akka.actor.ActorSelection;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.Status.Failure;
20 import akka.dispatch.Dispatchers;
21 import akka.dispatch.OnComplete;
22 import akka.japi.Creator;
23 import akka.pattern.Patterns;
24 import akka.persistence.SaveSnapshotSuccess;
25 import akka.testkit.TestActorRef;
26 import akka.util.Timeout;
27 import com.google.common.base.Function;
28 import com.google.common.base.Optional;
29 import com.google.common.util.concurrent.Futures;
30 import com.google.common.util.concurrent.ListenableFuture;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.HashSet;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.Test;
42 import org.mockito.InOrder;
43 import org.opendaylight.controller.cluster.DataPersistenceProvider;
44 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
45 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
46 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
47 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
50 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
65 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
66 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
67 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
68 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
69 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
70 import org.opendaylight.controller.cluster.datastore.modification.Modification;
71 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
72 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
73 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
74 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
75 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
76 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
77 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
78 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
79 import org.opendaylight.controller.cluster.raft.RaftActorContext;
80 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
81 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
82 import org.opendaylight.controller.cluster.raft.Snapshot;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
84 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
85 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
86 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
88 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
89 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
90 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
91 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
92 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
93 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
94 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
95 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
96 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
97 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
98 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
99 import org.opendaylight.yangtools.yang.common.QName;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
101 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
102 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
103 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
104 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
115 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
116 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
117 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
118 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
119 import scala.concurrent.Await;
120 import scala.concurrent.Future;
121 import scala.concurrent.duration.FiniteDuration;
122
123 public class ShardTest extends AbstractShardTest {
124     private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
125
126     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
127
128     @Test
129     public void testRegisterChangeListener() throws Exception {
130         new ShardTestKit(getSystem()) {{
131             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
132                     newShardProps(),  "testRegisterChangeListener");
133
134             waitUntilLeader(shard);
135
136             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
137
138             final MockDataChangeListener listener = new MockDataChangeListener(1);
139             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
140                     "testRegisterChangeListener-DataChangeListener");
141
142             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
143                     dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
144
145             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
146                     RegisterChangeListenerReply.class);
147             final String replyPath = reply.getListenerRegistrationPath().toString();
148             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
149                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
150
151             final YangInstanceIdentifier path = TestModel.TEST_PATH;
152             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
153
154             listener.waitForChangeEvents(path);
155
156             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
157             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
158         }};
159     }
160
161     @SuppressWarnings("serial")
162     @Test
163     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
164         // This test tests the timing window in which a change listener is registered before the
165         // shard becomes the leader. We verify that the listener is registered and notified of the
166         // existing data when the shard becomes the leader.
167         new ShardTestKit(getSystem()) {{
168             // For this test, we want to send the RegisterChangeListener message after the shard
169             // has recovered from persistence and before it becomes the leader. So we subclass
170             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
171             // we know that the shard has been initialized to a follower and has started the
172             // election process. The following 2 CountDownLatches are used to coordinate the
173             // ElectionTimeout with the sending of the RegisterChangeListener message.
174             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
175             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
176             final Creator<Shard> creator = new Creator<Shard>() {
177                 boolean firstElectionTimeout = true;
178
179                 @Override
180                 public Shard create() throws Exception {
181                     // Use a non persistent provider because this test actually invokes persist on the journal
182                     // this will cause all other messages to not be queued properly after that.
183                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
184                     // it does do a persist)
185                     return new Shard(shardID, Collections.<String,String>emptyMap(),
186                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
187                         @Override
188                         public void onReceiveCommand(final Object message) throws Exception {
189                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
190                                 // Got the first ElectionTimeout. We don't forward it to the
191                                 // base Shard yet until we've sent the RegisterChangeListener
192                                 // message. So we signal the onFirstElectionTimeout latch to tell
193                                 // the main thread to send the RegisterChangeListener message and
194                                 // start a thread to wait on the onChangeListenerRegistered latch,
195                                 // which the main thread signals after it has sent the message.
196                                 // After the onChangeListenerRegistered is triggered, we send the
197                                 // original ElectionTimeout message to proceed with the election.
198                                 firstElectionTimeout = false;
199                                 final ActorRef self = getSelf();
200                                 new Thread() {
201                                     @Override
202                                     public void run() {
203                                         Uninterruptibles.awaitUninterruptibly(
204                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
205                                         self.tell(message, self);
206                                     }
207                                 }.start();
208
209                                 onFirstElectionTimeout.countDown();
210                             } else {
211                                 super.onReceiveCommand(message);
212                             }
213                         }
214                     };
215                 }
216             };
217
218             final MockDataChangeListener listener = new MockDataChangeListener(1);
219             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
220                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
221
222             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
223                     Props.create(new DelegatingShardCreator(creator)),
224                     "testRegisterChangeListenerWhenNotLeaderInitially");
225
226             // Write initial data into the in-memory store.
227             final YangInstanceIdentifier path = TestModel.TEST_PATH;
228             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
229
230             // Wait until the shard receives the first ElectionTimeout message.
231             assertEquals("Got first ElectionTimeout", true,
232                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
233
234             // Now send the RegisterChangeListener and wait for the reply.
235             shard.tell(new RegisterChangeListener(path, dclActor,
236                     AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
237
238             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
239                     RegisterChangeListenerReply.class);
240             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
241
242             // Sanity check - verify the shard is not the leader yet.
243             shard.tell(new FindLeader(), getRef());
244             final FindLeaderReply findLeadeReply =
245                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
246             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
247
248             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
249             // with the election process.
250             onChangeListenerRegistered.countDown();
251
252             // Wait for the shard to become the leader and notify our listener with the existing
253             // data in the store.
254             listener.waitForChangeEvents(path);
255
256             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
257             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
258         }};
259     }
260
261     @Test
262     public void testRegisterDataTreeChangeListener() throws Exception {
263         new ShardTestKit(getSystem()) {{
264             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
265                     newShardProps(), "testRegisterDataTreeChangeListener");
266
267             waitUntilLeader(shard);
268
269             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
270
271             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
272             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
273                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
274
275             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
276
277             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
278                     RegisterDataTreeChangeListenerReply.class);
279             final String replyPath = reply.getListenerRegistrationPath().toString();
280             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
281                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
282
283             final YangInstanceIdentifier path = TestModel.TEST_PATH;
284             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
285
286             listener.waitForChangeEvents();
287
288             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
289             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
290         }};
291     }
292
293     @SuppressWarnings("serial")
294     @Test
295     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
296         new ShardTestKit(getSystem()) {{
297             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
298             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
299             final Creator<Shard> creator = new Creator<Shard>() {
300                 boolean firstElectionTimeout = true;
301
302                 @Override
303                 public Shard create() throws Exception {
304                     return new Shard(shardID, Collections.<String,String>emptyMap(),
305                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
306                         @Override
307                         public void onReceiveCommand(final Object message) throws Exception {
308                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
309                                 firstElectionTimeout = false;
310                                 final ActorRef self = getSelf();
311                                 new Thread() {
312                                     @Override
313                                     public void run() {
314                                         Uninterruptibles.awaitUninterruptibly(
315                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
316                                         self.tell(message, self);
317                                     }
318                                 }.start();
319
320                                 onFirstElectionTimeout.countDown();
321                             } else {
322                                 super.onReceiveCommand(message);
323                             }
324                         }
325                     };
326                 }
327             };
328
329             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
330             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
331                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
332
333             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
334                     Props.create(new DelegatingShardCreator(creator)),
335                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
336
337             final YangInstanceIdentifier path = TestModel.TEST_PATH;
338             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
339
340             assertEquals("Got first ElectionTimeout", true,
341                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
342
343             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
344             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
345                     RegisterDataTreeChangeListenerReply.class);
346             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
347
348             shard.tell(new FindLeader(), getRef());
349             final FindLeaderReply findLeadeReply =
350                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
351             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
352
353             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
354
355             onChangeListenerRegistered.countDown();
356
357             // TODO: investigate why we do not receive data chage events
358             listener.waitForChangeEvents();
359
360             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
361             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
362         }};
363     }
364
365     @Test
366     public void testCreateTransaction(){
367         new ShardTestKit(getSystem()) {{
368             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
369
370             waitUntilLeader(shard);
371
372             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
373
374             shard.tell(new CreateTransaction("txn-1",
375                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
376
377             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
378                     CreateTransactionReply.class);
379
380             final String path = reply.getTransactionActorPath().toString();
381             assertTrue("Unexpected transaction path " + path,
382                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
383
384             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
385         }};
386     }
387
388     @Test
389     public void testCreateTransactionOnChain(){
390         new ShardTestKit(getSystem()) {{
391             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
392
393             waitUntilLeader(shard);
394
395             shard.tell(new CreateTransaction("txn-1",
396                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
397                     getRef());
398
399             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
400                     CreateTransactionReply.class);
401
402             final String path = reply.getTransactionActorPath().toString();
403             assertTrue("Unexpected transaction path " + path,
404                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
405
406             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
407         }};
408     }
409
410     @SuppressWarnings("serial")
411     @Test
412     public void testPeerAddressResolved() throws Exception {
413         new ShardTestKit(getSystem()) {{
414             final CountDownLatch recoveryComplete = new CountDownLatch(1);
415             class TestShard extends Shard {
416                 TestShard() {
417                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
418                             newDatastoreContext(), SCHEMA_CONTEXT);
419                 }
420
421                 Map<String, String> getPeerAddresses() {
422                     return getRaftActorContext().getPeerAddresses();
423                 }
424
425                 @Override
426                 protected void onRecoveryComplete() {
427                     try {
428                         super.onRecoveryComplete();
429                     } finally {
430                         recoveryComplete.countDown();
431                     }
432                 }
433             }
434
435             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
436                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
437                         @Override
438                         public TestShard create() throws Exception {
439                             return new TestShard();
440                         }
441                     })), "testPeerAddressResolved");
442
443             //waitUntilLeader(shard);
444             assertEquals("Recovery complete", true,
445                     Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
446
447             final String address = "akka://foobar";
448             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
449
450             assertEquals("getPeerAddresses", address,
451                     ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
452
453             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
454         }};
455     }
456
457     @Test
458     public void testApplySnapshot() throws Exception {
459
460         ShardTestKit testkit = new ShardTestKit(getSystem());
461
462         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
463                 "testApplySnapshot");
464
465         testkit.waitUntilLeader(shard);
466
467         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
468         store.setSchemaContext(SCHEMA_CONTEXT);
469
470         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
471                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
472                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
473                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
474
475         writeToStore(store, TestModel.TEST_PATH, container);
476
477         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
478         final NormalizedNode<?,?> expected = readStore(store, root);
479
480         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
481                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
482
483         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
484
485         final NormalizedNode<?,?> actual = readStore(shard, root);
486
487         assertEquals("Root node", expected, actual);
488
489         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
490     }
491
492     @Test
493     public void testApplyState() throws Exception {
494
495         ShardTestKit testkit = new ShardTestKit(getSystem());
496
497         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
498
499         testkit.waitUntilLeader(shard);
500
501         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
502
503         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
504                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
505
506         shard.underlyingActor().onReceiveCommand(applyState);
507
508         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
509         assertEquals("Applied state", node, actual);
510
511         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
512     }
513
514     @Test
515     public void testApplyStateWithCandidatePayload() throws Exception {
516
517         ShardTestKit testkit = new ShardTestKit(getSystem());
518
519         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
520
521         testkit.waitUntilLeader(shard);
522
523         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
524         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
525
526         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
527                 DataTreeCandidatePayload.create(candidate)));
528
529         shard.underlyingActor().onReceiveCommand(applyState);
530
531         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
532         assertEquals("Applied state", node, actual);
533
534         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
535     }
536
537     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
538         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
539         testStore.setSchemaContext(SCHEMA_CONTEXT);
540
541         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
542
543         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
544
545         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
546                 SerializationUtils.serializeNormalizedNode(root),
547                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
548         return testStore;
549     }
550
551     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
552         source.validate(mod);
553         final DataTreeCandidate candidate = source.prepare(mod);
554         source.commit(candidate);
555         return DataTreeCandidatePayload.create(candidate);
556     }
557
558     @Test
559     public void testDataTreeCandidateRecovery() throws Exception {
560         // Set up the InMemorySnapshotStore.
561         final DataTree source = setupInMemorySnapshotStore();
562
563         final DataTreeModification writeMod = source.takeSnapshot().newModification();
564         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
565         writeMod.ready();
566         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
567
568         // Set up the InMemoryJournal.
569         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
570
571         final int nListEntries = 16;
572         final Set<Integer> listEntryKeys = new HashSet<>();
573
574         // Add some ModificationPayload entries
575         for (int i = 1; i <= nListEntries; i++) {
576             listEntryKeys.add(Integer.valueOf(i));
577
578             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
579                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
580
581             final DataTreeModification mod = source.takeSnapshot().newModification();
582             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
583             mod.ready();
584             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
585                 payloadForModification(source, mod)));
586         }
587
588         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
589                 new ApplyJournalEntries(nListEntries));
590
591         testRecovery(listEntryKeys);
592     }
593
594     @Test
595     public void testModicationRecovery() throws Exception {
596
597         // Set up the InMemorySnapshotStore.
598         setupInMemorySnapshotStore();
599
600         // Set up the InMemoryJournal.
601
602         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
603
604         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
605                   new WriteModification(TestModel.OUTER_LIST_PATH,
606                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
607
608         final int nListEntries = 16;
609         final Set<Integer> listEntryKeys = new HashSet<>();
610
611         // Add some ModificationPayload entries
612         for(int i = 1; i <= nListEntries; i++) {
613             listEntryKeys.add(Integer.valueOf(i));
614             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
615                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
616             final Modification mod = new MergeModification(path,
617                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
618             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
619                     newModificationPayload(mod)));
620         }
621
622         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
623                 new ApplyJournalEntries(nListEntries));
624
625         testRecovery(listEntryKeys);
626     }
627
628     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
629         final MutableCompositeModification compMod = new MutableCompositeModification();
630         for(final Modification mod: mods) {
631             compMod.addModification(mod);
632         }
633
634         return new ModificationPayload(compMod);
635     }
636
637     @Test
638     public void testConcurrentThreePhaseCommits() throws Throwable {
639         new ShardTestKit(getSystem()) {{
640             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
641                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
642                     "testConcurrentThreePhaseCommits");
643
644             waitUntilLeader(shard);
645
646          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
647
648             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
649
650             final String transactionID1 = "tx1";
651             final MutableCompositeModification modification1 = new MutableCompositeModification();
652             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
653                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
654
655             final String transactionID2 = "tx2";
656             final MutableCompositeModification modification2 = new MutableCompositeModification();
657             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
658                     TestModel.OUTER_LIST_PATH,
659                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
660                     modification2);
661
662             final String transactionID3 = "tx3";
663             final MutableCompositeModification modification3 = new MutableCompositeModification();
664             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
665                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
666                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
667                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
668                     modification3);
669
670             final long timeoutSec = 5;
671             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
672             final Timeout timeout = new Timeout(duration);
673
674             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
675             // by the ShardTransaction.
676
677             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
678                     cohort1, modification1, true, false), getRef());
679             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
680                     expectMsgClass(duration, ReadyTransactionReply.class));
681             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
682
683             // Send the CanCommitTransaction message for the first Tx.
684
685             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
686             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
687                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
688             assertEquals("Can commit", true, canCommitReply.getCanCommit());
689
690             // Send the ForwardedReadyTransaction for the next 2 Tx's.
691
692             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
693                     cohort2, modification2, true, false), getRef());
694             expectMsgClass(duration, ReadyTransactionReply.class);
695
696             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
697                     cohort3, modification3, true, false), getRef());
698             expectMsgClass(duration, ReadyTransactionReply.class);
699
700             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
701             // processed after the first Tx completes.
702
703             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
704                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
705
706             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
707                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
708
709             // Send the CommitTransaction message for the first Tx. After it completes, it should
710             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
711
712             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
713             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
714
715             // Wait for the next 2 Tx's to complete.
716
717             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
718             final CountDownLatch commitLatch = new CountDownLatch(2);
719
720             class OnFutureComplete extends OnComplete<Object> {
721                 private final Class<?> expRespType;
722
723                 OnFutureComplete(final Class<?> expRespType) {
724                     this.expRespType = expRespType;
725                 }
726
727                 @Override
728                 public void onComplete(final Throwable error, final Object resp) {
729                     if(error != null) {
730                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
731                     } else {
732                         try {
733                             assertEquals("Commit response type", expRespType, resp.getClass());
734                             onSuccess(resp);
735                         } catch (final Exception e) {
736                             caughtEx.set(e);
737                         }
738                     }
739                 }
740
741                 void onSuccess(final Object resp) throws Exception {
742                 }
743             }
744
745             class OnCommitFutureComplete extends OnFutureComplete {
746                 OnCommitFutureComplete() {
747                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
748                 }
749
750                 @Override
751                 public void onComplete(final Throwable error, final Object resp) {
752                     super.onComplete(error, resp);
753                     commitLatch.countDown();
754                 }
755             }
756
757             class OnCanCommitFutureComplete extends OnFutureComplete {
758                 private final String transactionID;
759
760                 OnCanCommitFutureComplete(final String transactionID) {
761                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
762                     this.transactionID = transactionID;
763                 }
764
765                 @Override
766                 void onSuccess(final Object resp) throws Exception {
767                     final CanCommitTransactionReply canCommitReply =
768                             CanCommitTransactionReply.fromSerializable(resp);
769                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
770
771                     final Future<Object> commitFuture = Patterns.ask(shard,
772                             new CommitTransaction(transactionID).toSerializable(), timeout);
773                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
774                 }
775             }
776
777             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
778                     getSystem().dispatcher());
779
780             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
781                     getSystem().dispatcher());
782
783             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
784
785             if(caughtEx.get() != null) {
786                 throw caughtEx.get();
787             }
788
789             assertEquals("Commits complete", true, done);
790
791             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
792             inOrder.verify(cohort1).canCommit();
793             inOrder.verify(cohort1).preCommit();
794             inOrder.verify(cohort1).commit();
795             inOrder.verify(cohort2).canCommit();
796             inOrder.verify(cohort2).preCommit();
797             inOrder.verify(cohort2).commit();
798             inOrder.verify(cohort3).canCommit();
799             inOrder.verify(cohort3).preCommit();
800             inOrder.verify(cohort3).commit();
801
802             // Verify data in the data store.
803
804             verifyOuterListEntry(shard, 1);
805
806             verifyLastApplied(shard, 2);
807
808             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
809         }};
810     }
811
812     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
813             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
814         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
815     }
816
817     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
818             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
819             final int messagesSent) {
820         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
821         batched.addModification(new WriteModification(path, data));
822         batched.setReady(ready);
823         batched.setDoCommitOnReady(doCommitOnReady);
824         batched.setTotalMessagesSent(messagesSent);
825         return batched;
826     }
827
828     @Test
829     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
830         new ShardTestKit(getSystem()) {{
831             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
832                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
833                     "testBatchedModificationsWithNoCommitOnReady");
834
835             waitUntilLeader(shard);
836
837             final String transactionID = "tx";
838             final FiniteDuration duration = duration("5 seconds");
839
840             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
841             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
842                 @Override
843                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
844                     if(mockCohort.get() == null) {
845                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
846                     }
847
848                     return mockCohort.get();
849                 }
850             };
851
852             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
853
854             // Send a BatchedModifications to start a transaction.
855
856             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
857                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
858             expectMsgClass(duration, BatchedModificationsReply.class);
859
860             // Send a couple more BatchedModifications.
861
862             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
863                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
864             expectMsgClass(duration, BatchedModificationsReply.class);
865
866             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
867                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
868                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
869             expectMsgClass(duration, ReadyTransactionReply.class);
870
871             // Send the CanCommitTransaction message.
872
873             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
874             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
875                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
876             assertEquals("Can commit", true, canCommitReply.getCanCommit());
877
878             // Send the CanCommitTransaction message.
879
880             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
881             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
882
883             final InOrder inOrder = inOrder(mockCohort.get());
884             inOrder.verify(mockCohort.get()).canCommit();
885             inOrder.verify(mockCohort.get()).preCommit();
886             inOrder.verify(mockCohort.get()).commit();
887
888             // Verify data in the data store.
889
890             verifyOuterListEntry(shard, 1);
891
892             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
893         }};
894     }
895
896     @Test
897     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
898         new ShardTestKit(getSystem()) {{
899             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
900                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
901                     "testBatchedModificationsWithCommitOnReady");
902
903             waitUntilLeader(shard);
904
905             final String transactionID = "tx";
906             final FiniteDuration duration = duration("5 seconds");
907
908             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
909             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
910                 @Override
911                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
912                     if(mockCohort.get() == null) {
913                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
914                     }
915
916                     return mockCohort.get();
917                 }
918             };
919
920             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
921
922             // Send a BatchedModifications to start a transaction.
923
924             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
925                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
926             expectMsgClass(duration, BatchedModificationsReply.class);
927
928             // Send a couple more BatchedModifications.
929
930             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
931                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
932             expectMsgClass(duration, BatchedModificationsReply.class);
933
934             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
935                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
936                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
937
938             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
939
940             final InOrder inOrder = inOrder(mockCohort.get());
941             inOrder.verify(mockCohort.get()).canCommit();
942             inOrder.verify(mockCohort.get()).preCommit();
943             inOrder.verify(mockCohort.get()).commit();
944
945             // Verify data in the data store.
946
947             verifyOuterListEntry(shard, 1);
948
949             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
950         }};
951     }
952
953     @Test(expected=IllegalStateException.class)
954     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
955         new ShardTestKit(getSystem()) {{
956             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
957                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
958                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
959
960             waitUntilLeader(shard);
961
962             final String transactionID = "tx1";
963             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
964             batched.setReady(true);
965             batched.setTotalMessagesSent(2);
966
967             shard.tell(batched, getRef());
968
969             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
970
971             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
972
973             if(failure != null) {
974                 throw failure.cause();
975             }
976         }};
977     }
978
979     @Test
980     public void testBatchedModificationsWithOperationFailure() throws Throwable {
981         new ShardTestKit(getSystem()) {{
982             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
983                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
984                     "testBatchedModificationsWithOperationFailure");
985
986             waitUntilLeader(shard);
987
988             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
989             // write will not validate the children for performance reasons.
990
991             String transactionID = "tx1";
992
993             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
994                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
995                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
996
997             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
998             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
999             shard.tell(batched, getRef());
1000             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1001
1002             Throwable cause = failure.cause();
1003
1004             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1005             batched.setReady(true);
1006             batched.setTotalMessagesSent(2);
1007
1008             shard.tell(batched, getRef());
1009
1010             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1011             assertEquals("Failure cause", cause, failure.cause());
1012
1013             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1014         }};
1015     }
1016
1017     @SuppressWarnings("unchecked")
1018     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1019         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1020         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1021         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1022                 outerList.getValue() instanceof Iterable);
1023         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1024         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1025                 entry instanceof MapEntryNode);
1026         final MapEntryNode mapEntry = (MapEntryNode)entry;
1027         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1028                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1029         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1030         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1031     }
1032
1033     @Test
1034     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1035         new ShardTestKit(getSystem()) {{
1036             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1037                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1038                     "testBatchedModificationsOnTransactionChain");
1039
1040             waitUntilLeader(shard);
1041
1042             final String transactionChainID = "txChain";
1043             final String transactionID1 = "tx1";
1044             final String transactionID2 = "tx2";
1045
1046             final FiniteDuration duration = duration("5 seconds");
1047
1048             // Send a BatchedModifications to start a chained write transaction and ready it.
1049
1050             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1051             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1052             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1053                     containerNode, true, false, 1), getRef());
1054             expectMsgClass(duration, ReadyTransactionReply.class);
1055
1056             // Create a read Tx on the same chain.
1057
1058             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1059                     transactionChainID).toSerializable(), getRef());
1060
1061             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1062
1063             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1064             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1065             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1066
1067             // Commit the write transaction.
1068
1069             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1070             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1071                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1072             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1073
1074             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1075             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1076
1077             // Verify data in the data store.
1078
1079             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1080             assertEquals("Stored node", containerNode, actualNode);
1081
1082             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1083         }};
1084     }
1085
1086     @Test
1087     public void testOnBatchedModificationsWhenNotLeader() {
1088         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1089         new ShardTestKit(getSystem()) {{
1090             final Creator<Shard> creator = new Creator<Shard>() {
1091                 private static final long serialVersionUID = 1L;
1092
1093                 @Override
1094                 public Shard create() throws Exception {
1095                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1096                             newDatastoreContext(), SCHEMA_CONTEXT) {
1097                         @Override
1098                         protected boolean isLeader() {
1099                             return overrideLeaderCalls.get() ? false : super.isLeader();
1100                         }
1101
1102                         @Override
1103                         protected ActorSelection getLeader() {
1104                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1105                                 super.getLeader();
1106                         }
1107                     };
1108                 }
1109             };
1110
1111             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1112                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1113
1114             waitUntilLeader(shard);
1115
1116             overrideLeaderCalls.set(true);
1117
1118             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1119
1120             shard.tell(batched, ActorRef.noSender());
1121
1122             expectMsgEquals(batched);
1123
1124             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1125         }};
1126     }
1127
1128     @Test
1129     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1130         new ShardTestKit(getSystem()) {{
1131             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1132                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1133                     "testForwardedReadyTransactionWithImmediateCommit");
1134
1135             waitUntilLeader(shard);
1136
1137             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1138
1139             final String transactionID = "tx1";
1140             final MutableCompositeModification modification = new MutableCompositeModification();
1141             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1142             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1143                     TestModel.TEST_PATH, containerNode, modification);
1144
1145             final FiniteDuration duration = duration("5 seconds");
1146
1147             // Simulate the ForwardedReadyTransaction messages that would be sent
1148             // by the ShardTransaction.
1149
1150             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1151                     cohort, modification, true, true), getRef());
1152
1153             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1154
1155             final InOrder inOrder = inOrder(cohort);
1156             inOrder.verify(cohort).canCommit();
1157             inOrder.verify(cohort).preCommit();
1158             inOrder.verify(cohort).commit();
1159
1160             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1161             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1162
1163             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1164         }};
1165     }
1166
1167     @Test
1168     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1169         new ShardTestKit(getSystem()) {{
1170             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1171                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1172                     "testReadyLocalTransactionWithImmediateCommit");
1173
1174             waitUntilLeader(shard);
1175
1176             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1177
1178             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1179
1180             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1181             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1182             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1183             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1184
1185             final String txId = "tx1";
1186             modification.ready();
1187             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1188
1189             shard.tell(readyMessage, getRef());
1190
1191             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1192
1193             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1194             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1195
1196             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1197         }};
1198     }
1199
1200     @Test
1201     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1202         new ShardTestKit(getSystem()) {{
1203             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1204                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1205                     "testReadyLocalTransactionWithThreePhaseCommit");
1206
1207             waitUntilLeader(shard);
1208
1209             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1210
1211             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1212
1213             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1214             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1215             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1216             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1217
1218             final String txId = "tx1";
1219                 modification.ready();
1220             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1221
1222             shard.tell(readyMessage, getRef());
1223
1224             expectMsgClass(ReadyTransactionReply.class);
1225
1226             // Send the CanCommitTransaction message.
1227
1228             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1229             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1230                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1231             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1232
1233             // Send the CanCommitTransaction message.
1234
1235             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1236             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1237
1238             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1239             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1240
1241             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1242         }};
1243     }
1244
1245     @Test
1246     public void testCommitWithPersistenceDisabled() throws Throwable {
1247         dataStoreContextBuilder.persistent(false);
1248         new ShardTestKit(getSystem()) {{
1249             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1250                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1251                     "testCommitWithPersistenceDisabled");
1252
1253             waitUntilLeader(shard);
1254
1255             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1256
1257             // Setup a simulated transactions with a mock cohort.
1258
1259             final String transactionID = "tx";
1260             final MutableCompositeModification modification = new MutableCompositeModification();
1261             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1262             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1263                     TestModel.TEST_PATH, containerNode, modification);
1264
1265             final FiniteDuration duration = duration("5 seconds");
1266
1267             // Simulate the ForwardedReadyTransaction messages that would be sent
1268             // by the ShardTransaction.
1269
1270             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1271                     cohort, modification, true, false), getRef());
1272             expectMsgClass(duration, ReadyTransactionReply.class);
1273
1274             // Send the CanCommitTransaction message.
1275
1276             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1277             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1278                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1279             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1280
1281             // Send the CanCommitTransaction message.
1282
1283             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1284             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1285
1286             final InOrder inOrder = inOrder(cohort);
1287             inOrder.verify(cohort).canCommit();
1288             inOrder.verify(cohort).preCommit();
1289             inOrder.verify(cohort).commit();
1290
1291             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1292             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1293
1294             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1295         }};
1296     }
1297
1298     private static DataTreeCandidateTip mockCandidate(final String name) {
1299         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1300         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1301         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1302         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1303         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1304         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1305         return mockCandidate;
1306     }
1307
1308     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1309         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1310         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1311         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1312         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1313         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1314         return mockCandidate;
1315     }
1316
1317     @Test
1318     public void testCommitWhenTransactionHasNoModifications(){
1319         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1320         // but here that need not happen
1321         new ShardTestKit(getSystem()) {
1322             {
1323                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1324                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1325                         "testCommitWhenTransactionHasNoModifications");
1326
1327                 waitUntilLeader(shard);
1328
1329                 final String transactionID = "tx1";
1330                 final MutableCompositeModification modification = new MutableCompositeModification();
1331                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1332                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1333                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1334                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1335                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1336
1337                 final FiniteDuration duration = duration("5 seconds");
1338
1339                 // Simulate the ForwardedReadyTransaction messages that would be sent
1340                 // by the ShardTransaction.
1341
1342                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1343                         cohort, modification, true, false), getRef());
1344                 expectMsgClass(duration, ReadyTransactionReply.class);
1345
1346                 // Send the CanCommitTransaction message.
1347
1348                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1349                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1350                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1351                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1352
1353                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1354                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1355
1356                 final InOrder inOrder = inOrder(cohort);
1357                 inOrder.verify(cohort).canCommit();
1358                 inOrder.verify(cohort).preCommit();
1359                 inOrder.verify(cohort).commit();
1360
1361                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1362                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1363
1364                 // Use MBean for verification
1365                 // Committed transaction count should increase as usual
1366                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1367
1368                 // Commit index should not advance because this does not go into the journal
1369                 assertEquals(-1, shardStats.getCommitIndex());
1370
1371                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1372
1373             }
1374         };
1375     }
1376
1377     @Test
1378     public void testCommitWhenTransactionHasModifications(){
1379         new ShardTestKit(getSystem()) {
1380             {
1381                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1382                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1383                         "testCommitWhenTransactionHasModifications");
1384
1385                 waitUntilLeader(shard);
1386
1387                 final String transactionID = "tx1";
1388                 final MutableCompositeModification modification = new MutableCompositeModification();
1389                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1390                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1391                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1392                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1393                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1394                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1395
1396                 final FiniteDuration duration = duration("5 seconds");
1397
1398                 // Simulate the ForwardedReadyTransaction messages that would be sent
1399                 // by the ShardTransaction.
1400
1401                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1402                         cohort, modification, true, false), getRef());
1403                 expectMsgClass(duration, ReadyTransactionReply.class);
1404
1405                 // Send the CanCommitTransaction message.
1406
1407                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1408                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1409                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1410                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1411
1412                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1413                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1414
1415                 final InOrder inOrder = inOrder(cohort);
1416                 inOrder.verify(cohort).canCommit();
1417                 inOrder.verify(cohort).preCommit();
1418                 inOrder.verify(cohort).commit();
1419
1420                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1421                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1422
1423                 // Use MBean for verification
1424                 // Committed transaction count should increase as usual
1425                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1426
1427                 // Commit index should advance as we do not have an empty modification
1428                 assertEquals(0, shardStats.getCommitIndex());
1429
1430                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1431
1432             }
1433         };
1434     }
1435
1436     @Test
1437     public void testCommitPhaseFailure() throws Throwable {
1438         new ShardTestKit(getSystem()) {{
1439             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1440                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1441                     "testCommitPhaseFailure");
1442
1443             waitUntilLeader(shard);
1444
1445             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1446             // commit phase.
1447
1448             final String transactionID1 = "tx1";
1449             final MutableCompositeModification modification1 = new MutableCompositeModification();
1450             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1451             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1452             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1453             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1454             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1455
1456             final String transactionID2 = "tx2";
1457             final MutableCompositeModification modification2 = new MutableCompositeModification();
1458             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1459             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1460
1461             final FiniteDuration duration = duration("5 seconds");
1462             final Timeout timeout = new Timeout(duration);
1463
1464             // Simulate the ForwardedReadyTransaction messages that would be sent
1465             // by the ShardTransaction.
1466
1467             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1468                     cohort1, modification1, true, false), getRef());
1469             expectMsgClass(duration, ReadyTransactionReply.class);
1470
1471             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1472                     cohort2, modification2, true, false), getRef());
1473             expectMsgClass(duration, ReadyTransactionReply.class);
1474
1475             // Send the CanCommitTransaction message for the first Tx.
1476
1477             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1478             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1479                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1480             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1481
1482             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1483             // processed after the first Tx completes.
1484
1485             final Future<Object> canCommitFuture = Patterns.ask(shard,
1486                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1487
1488             // Send the CommitTransaction message for the first Tx. This should send back an error
1489             // and trigger the 2nd Tx to proceed.
1490
1491             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1492             expectMsgClass(duration, akka.actor.Status.Failure.class);
1493
1494             // Wait for the 2nd Tx to complete the canCommit phase.
1495
1496             final CountDownLatch latch = new CountDownLatch(1);
1497             canCommitFuture.onComplete(new OnComplete<Object>() {
1498                 @Override
1499                 public void onComplete(final Throwable t, final Object resp) {
1500                     latch.countDown();
1501                 }
1502             }, getSystem().dispatcher());
1503
1504             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1505
1506             final InOrder inOrder = inOrder(cohort1, cohort2);
1507             inOrder.verify(cohort1).canCommit();
1508             inOrder.verify(cohort1).preCommit();
1509             inOrder.verify(cohort1).commit();
1510             inOrder.verify(cohort2).canCommit();
1511
1512             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1513         }};
1514     }
1515
1516     @Test
1517     public void testPreCommitPhaseFailure() throws Throwable {
1518         new ShardTestKit(getSystem()) {{
1519             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1520                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1521                     "testPreCommitPhaseFailure");
1522
1523             waitUntilLeader(shard);
1524
1525             final String transactionID1 = "tx1";
1526             final MutableCompositeModification modification1 = new MutableCompositeModification();
1527             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1528             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1529             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1530
1531             final String transactionID2 = "tx2";
1532             final MutableCompositeModification modification2 = new MutableCompositeModification();
1533             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1534             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1535
1536             final FiniteDuration duration = duration("5 seconds");
1537             final Timeout timeout = new Timeout(duration);
1538
1539             // Simulate the ForwardedReadyTransaction messages that would be sent
1540             // by the ShardTransaction.
1541
1542             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1543                     cohort1, modification1, true, false), getRef());
1544             expectMsgClass(duration, ReadyTransactionReply.class);
1545
1546             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1547                     cohort2, modification2, true, false), getRef());
1548             expectMsgClass(duration, ReadyTransactionReply.class);
1549
1550             // Send the CanCommitTransaction message for the first Tx.
1551
1552             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1553             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1554                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1555             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1556
1557             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1558             // processed after the first Tx completes.
1559
1560             final Future<Object> canCommitFuture = Patterns.ask(shard,
1561                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1562
1563             // Send the CommitTransaction message for the first Tx. This should send back an error
1564             // and trigger the 2nd Tx to proceed.
1565
1566             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1567             expectMsgClass(duration, akka.actor.Status.Failure.class);
1568
1569             // Wait for the 2nd Tx to complete the canCommit phase.
1570
1571             final CountDownLatch latch = new CountDownLatch(1);
1572             canCommitFuture.onComplete(new OnComplete<Object>() {
1573                 @Override
1574                 public void onComplete(final Throwable t, final Object resp) {
1575                     latch.countDown();
1576                 }
1577             }, getSystem().dispatcher());
1578
1579             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1580
1581             final InOrder inOrder = inOrder(cohort1, cohort2);
1582             inOrder.verify(cohort1).canCommit();
1583             inOrder.verify(cohort1).preCommit();
1584             inOrder.verify(cohort2).canCommit();
1585
1586             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1587         }};
1588     }
1589
1590     @Test
1591     public void testCanCommitPhaseFailure() throws Throwable {
1592         new ShardTestKit(getSystem()) {{
1593             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1594                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1595                     "testCanCommitPhaseFailure");
1596
1597             waitUntilLeader(shard);
1598
1599             final FiniteDuration duration = duration("5 seconds");
1600
1601             final String transactionID1 = "tx1";
1602             final MutableCompositeModification modification = new MutableCompositeModification();
1603             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1604             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1605
1606             // Simulate the ForwardedReadyTransaction messages that would be sent
1607             // by the ShardTransaction.
1608
1609             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1610                     cohort, modification, true, false), getRef());
1611             expectMsgClass(duration, ReadyTransactionReply.class);
1612
1613             // Send the CanCommitTransaction message.
1614
1615             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1616             expectMsgClass(duration, akka.actor.Status.Failure.class);
1617
1618             // Send another can commit to ensure the failed one got cleaned up.
1619
1620             reset(cohort);
1621
1622             final String transactionID2 = "tx2";
1623             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1624
1625             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1626                     cohort, modification, true, false), getRef());
1627             expectMsgClass(duration, ReadyTransactionReply.class);
1628
1629             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1630             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1631                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1632             assertEquals("getCanCommit", true, reply.getCanCommit());
1633
1634             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1635         }};
1636     }
1637
1638     @Test
1639     public void testCanCommitPhaseFalseResponse() throws Throwable {
1640         new ShardTestKit(getSystem()) {{
1641             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1642                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1643                     "testCanCommitPhaseFalseResponse");
1644
1645             waitUntilLeader(shard);
1646
1647             final FiniteDuration duration = duration("5 seconds");
1648
1649             final String transactionID1 = "tx1";
1650             final MutableCompositeModification modification = new MutableCompositeModification();
1651             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1652             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1653
1654             // Simulate the ForwardedReadyTransaction messages that would be sent
1655             // by the ShardTransaction.
1656
1657             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1658                     cohort, modification, true, false), getRef());
1659             expectMsgClass(duration, ReadyTransactionReply.class);
1660
1661             // Send the CanCommitTransaction message.
1662
1663             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1664             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1665                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1666             assertEquals("getCanCommit", false, reply.getCanCommit());
1667
1668             // Send another can commit to ensure the failed one got cleaned up.
1669
1670             reset(cohort);
1671
1672             final String transactionID2 = "tx2";
1673             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1674
1675             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1676                     cohort, modification, true, false), getRef());
1677             expectMsgClass(duration, ReadyTransactionReply.class);
1678
1679             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1680             reply = CanCommitTransactionReply.fromSerializable(
1681                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1682             assertEquals("getCanCommit", true, reply.getCanCommit());
1683
1684             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1685         }};
1686     }
1687
1688     @Test
1689     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1690         new ShardTestKit(getSystem()) {{
1691             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1692                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1693                     "testImmediateCommitWithCanCommitPhaseFailure");
1694
1695             waitUntilLeader(shard);
1696
1697             final FiniteDuration duration = duration("5 seconds");
1698
1699             final String transactionID1 = "tx1";
1700             final MutableCompositeModification modification = new MutableCompositeModification();
1701             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1702             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1703
1704             // Simulate the ForwardedReadyTransaction messages that would be sent
1705             // by the ShardTransaction.
1706
1707             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1708                     cohort, modification, true, true), getRef());
1709
1710             expectMsgClass(duration, akka.actor.Status.Failure.class);
1711
1712             // Send another can commit to ensure the failed one got cleaned up.
1713
1714             reset(cohort);
1715
1716             final String transactionID2 = "tx2";
1717             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1718             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1719             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1720             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1721             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1722             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1723             doReturn(candidateRoot).when(candidate).getRootNode();
1724             doReturn(candidate).when(cohort).getCandidate();
1725
1726             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1727                     cohort, modification, true, true), getRef());
1728
1729             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1730
1731             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1732         }};
1733     }
1734
1735     @Test
1736     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1737         new ShardTestKit(getSystem()) {{
1738             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1739                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1740                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1741
1742             waitUntilLeader(shard);
1743
1744             final FiniteDuration duration = duration("5 seconds");
1745
1746             final String transactionID = "tx1";
1747             final MutableCompositeModification modification = new MutableCompositeModification();
1748             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1749             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1750
1751             // Simulate the ForwardedReadyTransaction messages that would be sent
1752             // by the ShardTransaction.
1753
1754             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1755                     cohort, modification, true, true), getRef());
1756
1757             expectMsgClass(duration, akka.actor.Status.Failure.class);
1758
1759             // Send another can commit to ensure the failed one got cleaned up.
1760
1761             reset(cohort);
1762
1763             final String transactionID2 = "tx2";
1764             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1765             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1766             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1767             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1768             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1769             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1770             doReturn(candidateRoot).when(candidate).getRootNode();
1771             doReturn(candidate).when(cohort).getCandidate();
1772
1773             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1774                     cohort, modification, true, true), getRef());
1775
1776             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1777
1778             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1779         }};
1780     }
1781
1782     @Test
1783     public void testAbortBeforeFinishCommit() throws Throwable {
1784         new ShardTestKit(getSystem()) {{
1785             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1786                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1787                     "testAbortBeforeFinishCommit");
1788
1789             waitUntilLeader(shard);
1790
1791             final FiniteDuration duration = duration("5 seconds");
1792             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1793
1794             final String transactionID = "tx1";
1795             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1796                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1797                 @Override
1798                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1799                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1800
1801                     // Simulate an AbortTransaction message occurring during replication, after
1802                     // persisting and before finishing the commit to the in-memory store.
1803                     // We have no followers so due to optimizations in the RaftActor, it does not
1804                     // attempt replication and thus we can't send an AbortTransaction message b/c
1805                     // it would be processed too late after CommitTransaction completes. So we'll
1806                     // simulate an AbortTransaction message occurring during replication by calling
1807                     // the shard directly.
1808                     //
1809                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1810
1811                     return preCommitFuture;
1812                 }
1813             };
1814
1815             final MutableCompositeModification modification = new MutableCompositeModification();
1816             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1817                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1818                     modification, preCommit);
1819
1820             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1821                     cohort, modification, true, false), getRef());
1822             expectMsgClass(duration, ReadyTransactionReply.class);
1823
1824             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1825             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1826                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1827             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1828
1829             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1830             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1831
1832             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1833
1834             // Since we're simulating an abort occurring during replication and before finish commit,
1835             // the data should still get written to the in-memory store since we've gotten past
1836             // canCommit and preCommit and persisted the data.
1837             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1838
1839             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1840         }};
1841     }
1842
1843     @Test
1844     public void testTransactionCommitTimeout() throws Throwable {
1845         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1846
1847         new ShardTestKit(getSystem()) {{
1848             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1849                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1850                     "testTransactionCommitTimeout");
1851
1852             waitUntilLeader(shard);
1853
1854             final FiniteDuration duration = duration("5 seconds");
1855
1856             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1857
1858             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1859             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1860                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1861
1862             // Create 1st Tx - will timeout
1863
1864             final String transactionID1 = "tx1";
1865             final MutableCompositeModification modification1 = new MutableCompositeModification();
1866             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1867                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1868                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1869                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1870                     modification1);
1871
1872             // Create 2nd Tx
1873
1874             final String transactionID2 = "tx3";
1875             final MutableCompositeModification modification2 = new MutableCompositeModification();
1876             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1877                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1878             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1879                     listNodePath,
1880                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1881                     modification2);
1882
1883             // Ready the Tx's
1884
1885             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1886                     cohort1, modification1, true, false), getRef());
1887             expectMsgClass(duration, ReadyTransactionReply.class);
1888
1889             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1890                     cohort2, modification2, true, false), getRef());
1891             expectMsgClass(duration, ReadyTransactionReply.class);
1892
1893             // canCommit 1st Tx. We don't send the commit so it should timeout.
1894
1895             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1896             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1897
1898             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1899
1900             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1901             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1902
1903             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1904
1905             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1906             expectMsgClass(duration, akka.actor.Status.Failure.class);
1907
1908             // Commit the 2nd Tx.
1909
1910             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1911             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1912
1913             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1914             assertNotNull(listNodePath + " not found", node);
1915
1916             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1917         }};
1918     }
1919
1920     @Test
1921     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1922         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1923
1924         new ShardTestKit(getSystem()) {{
1925             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1926                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1927                     "testTransactionCommitQueueCapacityExceeded");
1928
1929             waitUntilLeader(shard);
1930
1931             final FiniteDuration duration = duration("5 seconds");
1932
1933             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1934
1935             final String transactionID1 = "tx1";
1936             final MutableCompositeModification modification1 = new MutableCompositeModification();
1937             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1938                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1939
1940             final String transactionID2 = "tx2";
1941             final MutableCompositeModification modification2 = new MutableCompositeModification();
1942             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1943                     TestModel.OUTER_LIST_PATH,
1944                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1945                     modification2);
1946
1947             final String transactionID3 = "tx3";
1948             final MutableCompositeModification modification3 = new MutableCompositeModification();
1949             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1950                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1951
1952             // Ready the Tx's
1953
1954             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1955                     cohort1, modification1, true, false), getRef());
1956             expectMsgClass(duration, ReadyTransactionReply.class);
1957
1958             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1959                     cohort2, modification2, true, false), getRef());
1960             expectMsgClass(duration, ReadyTransactionReply.class);
1961
1962             // The 3rd Tx should exceed queue capacity and fail.
1963
1964             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1965                     cohort3, modification3, true, false), getRef());
1966             expectMsgClass(duration, akka.actor.Status.Failure.class);
1967
1968             // canCommit 1st Tx.
1969
1970             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1971             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1972
1973             // canCommit the 2nd Tx - it should get queued.
1974
1975             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1976
1977             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1978
1979             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1980             expectMsgClass(duration, akka.actor.Status.Failure.class);
1981
1982             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1983         }};
1984     }
1985
1986     @Test
1987     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1988         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1989
1990         new ShardTestKit(getSystem()) {{
1991             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1992                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1993                     "testTransactionCommitWithPriorExpiredCohortEntries");
1994
1995             waitUntilLeader(shard);
1996
1997             final FiniteDuration duration = duration("5 seconds");
1998
1999             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2000
2001             final String transactionID1 = "tx1";
2002             final MutableCompositeModification modification1 = new MutableCompositeModification();
2003             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2004                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2005
2006             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2007                     cohort1, modification1, true, false), getRef());
2008             expectMsgClass(duration, ReadyTransactionReply.class);
2009
2010             final String transactionID2 = "tx2";
2011             final MutableCompositeModification modification2 = new MutableCompositeModification();
2012             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2013                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2014
2015             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2016                     cohort2, modification2, true, false), getRef());
2017             expectMsgClass(duration, ReadyTransactionReply.class);
2018
2019             final String transactionID3 = "tx3";
2020             final MutableCompositeModification modification3 = new MutableCompositeModification();
2021             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2022                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2023
2024             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2025                     cohort3, modification3, true, false), getRef());
2026             expectMsgClass(duration, ReadyTransactionReply.class);
2027
2028             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2029             // should expire from the queue and the last one should be processed.
2030
2031             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2032             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2033
2034             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2035         }};
2036     }
2037
2038     @Test
2039     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2040         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2041
2042         new ShardTestKit(getSystem()) {{
2043             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2044                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2045                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2046
2047             waitUntilLeader(shard);
2048
2049             final FiniteDuration duration = duration("5 seconds");
2050
2051             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2052
2053             final String transactionID1 = "tx1";
2054             final MutableCompositeModification modification1 = new MutableCompositeModification();
2055             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2056                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2057
2058             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2059                     cohort1, modification1, true, false), getRef());
2060             expectMsgClass(duration, ReadyTransactionReply.class);
2061
2062             // CanCommit the first one so it's the current in-progress CohortEntry.
2063
2064             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2065             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2066
2067             // Ready the second Tx.
2068
2069             final String transactionID2 = "tx2";
2070             final MutableCompositeModification modification2 = new MutableCompositeModification();
2071             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2072                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2073
2074             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2075                     cohort2, modification2, true, false), getRef());
2076             expectMsgClass(duration, ReadyTransactionReply.class);
2077
2078             // Ready the third Tx.
2079
2080             final String transactionID3 = "tx3";
2081             final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2082             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2083                     .apply(modification3);
2084                 modification3.ready();
2085             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2086
2087             shard.tell(readyMessage, getRef());
2088
2089             // Commit the first Tx. After completing, the second should expire from the queue and the third
2090             // Tx committed.
2091
2092             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2093             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2094
2095             // Expect commit reply from the third Tx.
2096
2097             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2098
2099             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2100             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2101
2102             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2103         }};
2104     }
2105
2106     @Test
2107     public void testCanCommitBeforeReadyFailure() throws Throwable {
2108         new ShardTestKit(getSystem()) {{
2109             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2110                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2111                     "testCanCommitBeforeReadyFailure");
2112
2113             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2114             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2115
2116             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2117         }};
2118     }
2119
2120     @Test
2121     public void testAbortCurrentTransaction() throws Throwable {
2122         new ShardTestKit(getSystem()) {{
2123             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2124                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2125                     "testAbortCurrentTransaction");
2126
2127             waitUntilLeader(shard);
2128
2129             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2130
2131             final String transactionID1 = "tx1";
2132             final MutableCompositeModification modification1 = new MutableCompositeModification();
2133             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2134             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2135             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2136
2137             final String transactionID2 = "tx2";
2138             final MutableCompositeModification modification2 = new MutableCompositeModification();
2139             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2140             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2141
2142             final FiniteDuration duration = duration("5 seconds");
2143             final Timeout timeout = new Timeout(duration);
2144
2145             // Simulate the ForwardedReadyTransaction messages that would be sent
2146             // by the ShardTransaction.
2147
2148             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2149                     cohort1, modification1, true, false), getRef());
2150             expectMsgClass(duration, ReadyTransactionReply.class);
2151
2152             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2153                     cohort2, modification2, true, false), getRef());
2154             expectMsgClass(duration, ReadyTransactionReply.class);
2155
2156             // Send the CanCommitTransaction message for the first Tx.
2157
2158             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2159             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2160                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2161             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2162
2163             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2164             // processed after the first Tx completes.
2165
2166             final Future<Object> canCommitFuture = Patterns.ask(shard,
2167                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2168
2169             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2170             // Tx to proceed.
2171
2172             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2173             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2174
2175             // Wait for the 2nd Tx to complete the canCommit phase.
2176
2177             Await.ready(canCommitFuture, duration);
2178
2179             final InOrder inOrder = inOrder(cohort1, cohort2);
2180             inOrder.verify(cohort1).canCommit();
2181             inOrder.verify(cohort2).canCommit();
2182
2183             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2184         }};
2185     }
2186
2187     @Test
2188     public void testAbortQueuedTransaction() throws Throwable {
2189         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2190         new ShardTestKit(getSystem()) {{
2191             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2192             @SuppressWarnings("serial")
2193             final Creator<Shard> creator = new Creator<Shard>() {
2194                 @Override
2195                 public Shard create() throws Exception {
2196                     return new Shard(shardID, Collections.<String,String>emptyMap(),
2197                             dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
2198                         @Override
2199                         public void onReceiveCommand(final Object message) throws Exception {
2200                             super.onReceiveCommand(message);
2201                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2202                                 if(cleaupCheckLatch.get() != null) {
2203                                     cleaupCheckLatch.get().countDown();
2204                                 }
2205                             }
2206                         }
2207                     };
2208                 }
2209             };
2210
2211             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2212                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2213                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
2214
2215             waitUntilLeader(shard);
2216
2217             final String transactionID = "tx1";
2218
2219             final MutableCompositeModification modification = new MutableCompositeModification();
2220             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2221             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2222
2223             final FiniteDuration duration = duration("5 seconds");
2224
2225             // Ready the tx.
2226
2227             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
2228                     cohort, modification, true, false), getRef());
2229             expectMsgClass(duration, ReadyTransactionReply.class);
2230
2231             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2232
2233             // Send the AbortTransaction message.
2234
2235             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2236             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2237
2238             verify(cohort).abort();
2239
2240             // Verify the tx cohort is removed from queue at the cleanup check interval.
2241
2242             cleaupCheckLatch.set(new CountDownLatch(1));
2243             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2244                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2245
2246             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2247
2248             // Now send CanCommitTransaction - should fail.
2249
2250             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2251
2252             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2253             assertTrue("Failure type", failure instanceof IllegalStateException);
2254
2255             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2256         }};
2257     }
2258
2259     @Test
2260     public void testCreateSnapshot() throws Exception {
2261         testCreateSnapshot(true, "testCreateSnapshot");
2262     }
2263
2264     @Test
2265     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2266         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2267     }
2268
2269     @SuppressWarnings("serial")
2270     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2271
2272         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2273
2274         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2275         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2276             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2277                 super(delegate);
2278             }
2279
2280             @Override
2281             public void saveSnapshot(final Object o) {
2282                 savedSnapshot.set(o);
2283                 super.saveSnapshot(o);
2284             }
2285         }
2286
2287         dataStoreContextBuilder.persistent(persistent);
2288
2289         new ShardTestKit(getSystem()) {{
2290             class TestShard extends Shard {
2291
2292                 protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
2293                                     final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
2294                     super(name, peerAddresses, datastoreContext, schemaContext);
2295                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2296                 }
2297
2298                 @Override
2299                 public void handleCommand(final Object message) {
2300                     super.handleCommand(message);
2301
2302                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2303                         latch.get().countDown();
2304                     }
2305                 }
2306
2307                 @Override
2308                 public RaftActorContext getRaftActorContext() {
2309                     return super.getRaftActorContext();
2310                 }
2311             }
2312
2313             final Creator<Shard> creator = new Creator<Shard>() {
2314                 @Override
2315                 public Shard create() throws Exception {
2316                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
2317                             newDatastoreContext(), SCHEMA_CONTEXT);
2318                 }
2319             };
2320
2321             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2322                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2323
2324             waitUntilLeader(shard);
2325
2326             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2327
2328             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2329
2330             // Trigger creation of a snapshot by ensuring
2331             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2332             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2333
2334             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2335
2336             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2337                     savedSnapshot.get() instanceof Snapshot);
2338
2339             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2340
2341             latch.set(new CountDownLatch(1));
2342             savedSnapshot.set(null);
2343
2344             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2345
2346             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2347
2348             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2349                     savedSnapshot.get() instanceof Snapshot);
2350
2351             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2352
2353             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2354         }
2355
2356         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2357
2358             final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2359             assertEquals("Root node", expectedRoot, actual);
2360
2361         }};
2362     }
2363
2364     /**
2365      * This test simply verifies that the applySnapShot logic will work
2366      * @throws ReadFailedException
2367      * @throws DataValidationFailedException
2368      */
2369     @Test
2370     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2371         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2372         store.setSchemaContext(SCHEMA_CONTEXT);
2373
2374         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2375         putTransaction.write(TestModel.TEST_PATH,
2376             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2377         commitTransaction(store, putTransaction);
2378
2379
2380         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2381
2382         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2383
2384         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2385         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2386
2387         commitTransaction(store, writeTransaction);
2388
2389         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2390
2391         assertEquals(expected, actual);
2392     }
2393
2394     @Test
2395     public void testRecoveryApplicable(){
2396
2397         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2398                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2399
2400         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2401                 persistentContext, SCHEMA_CONTEXT);
2402
2403         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2404                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2405
2406         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2407                 nonPersistentContext, SCHEMA_CONTEXT);
2408
2409         new ShardTestKit(getSystem()) {{
2410             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2411                     persistentProps, "testPersistence1");
2412
2413             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2414
2415             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2416
2417             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2418                     nonPersistentProps, "testPersistence2");
2419
2420             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2421
2422             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2423
2424         }};
2425
2426     }
2427
2428     @Test
2429     public void testOnDatastoreContext() {
2430         new ShardTestKit(getSystem()) {{
2431             dataStoreContextBuilder.persistent(true);
2432
2433             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2434
2435             assertEquals("isRecoveryApplicable", true,
2436                     shard.underlyingActor().persistence().isRecoveryApplicable());
2437
2438             waitUntilLeader(shard);
2439
2440             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2441
2442             assertEquals("isRecoveryApplicable", false,
2443                     shard.underlyingActor().persistence().isRecoveryApplicable());
2444
2445             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2446
2447             assertEquals("isRecoveryApplicable", true,
2448                     shard.underlyingActor().persistence().isRecoveryApplicable());
2449
2450             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2451         }};
2452     }
2453
2454     @Test
2455     public void testRegisterRoleChangeListener() throws Exception {
2456         new ShardTestKit(getSystem()) {
2457             {
2458                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2459                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2460                         "testRegisterRoleChangeListener");
2461
2462                 waitUntilLeader(shard);
2463
2464                 final TestActorRef<MessageCollectorActor> listener =
2465                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2466
2467                 shard.tell(new RegisterRoleChangeListener(), listener);
2468
2469                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2470
2471                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2472                         ShardLeaderStateChanged.class);
2473                 assertEquals("getLocalShardDataTree present", true,
2474                         leaderStateChanged.getLocalShardDataTree().isPresent());
2475                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2476                         leaderStateChanged.getLocalShardDataTree().get());
2477
2478                 MessageCollectorActor.clearMessages(listener);
2479
2480                 // Force a leader change
2481
2482                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2483
2484                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2485                         ShardLeaderStateChanged.class);
2486                 assertEquals("getLocalShardDataTree present", false,
2487                         leaderStateChanged.getLocalShardDataTree().isPresent());
2488
2489                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2490             }
2491         };
2492     }
2493
2494     @Test
2495     public void testFollowerInitialSyncStatus() throws Exception {
2496         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2497                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2498                 "testFollowerInitialSyncStatus");
2499
2500         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2501
2502         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2503
2504         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2505
2506         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2507
2508         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2509     }
2510
2511     private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2512         modification.ready();
2513         store.validate(modification);
2514         store.commit(store.prepare(modification));
2515     }
2516 }