Bug 4105: Add CreateShard message in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
15
16 import akka.actor.ActorRef;
17 import akka.actor.ActorSelection;
18 import akka.actor.PoisonPill;
19 import akka.actor.Props;
20 import akka.actor.Status.Failure;
21 import akka.dispatch.Dispatchers;
22 import akka.dispatch.OnComplete;
23 import akka.japi.Creator;
24 import akka.pattern.Patterns;
25 import akka.persistence.SaveSnapshotSuccess;
26 import akka.testkit.TestActorRef;
27 import akka.util.Timeout;
28 import com.google.common.base.Function;
29 import com.google.common.base.Optional;
30 import com.google.common.util.concurrent.Futures;
31 import com.google.common.util.concurrent.ListenableFuture;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.io.IOException;
34 import java.util.Collections;
35 import java.util.HashSet;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.Test;
43 import org.mockito.InOrder;
44 import org.opendaylight.controller.cluster.DataPersistenceProvider;
45 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
46 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
47 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
49 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
50 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
51 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
58 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
61 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
62 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
64 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
65 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
66 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
67 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
68 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
69 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
70 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
71 import org.opendaylight.controller.cluster.datastore.modification.Modification;
72 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
73 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
74 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
75 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
76 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
77 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
78 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
79 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
80 import org.opendaylight.controller.cluster.raft.RaftActorContext;
81 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
82 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
83 import org.opendaylight.controller.cluster.raft.Snapshot;
84 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
85 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
86 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
87 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
88 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
89 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
90 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
91 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
92 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
93 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
94 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
95 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
96 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
97 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
98 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
99 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
100 import org.opendaylight.yangtools.yang.common.QName;
101 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
102 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
103 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
105 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
107 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
115 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
116 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
117 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
118 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
119 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
120 import scala.concurrent.Await;
121 import scala.concurrent.Future;
122 import scala.concurrent.duration.FiniteDuration;
123
124 public class ShardTest extends AbstractShardTest {
    // QName built from the cars test-model namespace, the "2014-03-13" revision string and the
    // local name "cars".
    private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");

    // Filler payload that the recovery tests store at journal sequence number 0; the text of the
    // string itself records why the filler entry is needed.
    private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
128
129     @Test
130     public void testRegisterChangeListener() throws Exception {
131         new ShardTestKit(getSystem()) {{
132             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
133                     newShardProps(),  "testRegisterChangeListener");
134
135             waitUntilLeader(shard);
136
137             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
138
139             final MockDataChangeListener listener = new MockDataChangeListener(1);
140             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
141                     "testRegisterChangeListener-DataChangeListener");
142
143             shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
144                     dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
145
146             final RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
147                     RegisterChangeListenerReply.class);
148             final String replyPath = reply.getListenerRegistrationPath().toString();
149             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
150                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
151
152             final YangInstanceIdentifier path = TestModel.TEST_PATH;
153             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
154
155             listener.waitForChangeEvents(path);
156
157             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
158             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
159         }};
160     }
161
162     @SuppressWarnings("serial")
163     @Test
164     public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
165         // This test tests the timing window in which a change listener is registered before the
166         // shard becomes the leader. We verify that the listener is registered and notified of the
167         // existing data when the shard becomes the leader.
168         new ShardTestKit(getSystem()) {{
169             // For this test, we want to send the RegisterChangeListener message after the shard
170             // has recovered from persistence and before it becomes the leader. So we subclass
171             // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
172             // we know that the shard has been initialized to a follower and has started the
173             // election process. The following 2 CountDownLatches are used to coordinate the
174             // ElectionTimeout with the sending of the RegisterChangeListener message.
175             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
176             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
177             final Creator<Shard> creator = new Creator<Shard>() {
178                 boolean firstElectionTimeout = true;
179
180                 @Override
181                 public Shard create() throws Exception {
182                     // Use a non persistent provider because this test actually invokes persist on the journal
183                     // this will cause all other messages to not be queued properly after that.
184                     // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
185                     // it does do a persist)
186                     return new Shard(shardID, Collections.<String,String>emptyMap(),
187                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
188                         @Override
189                         public void onReceiveCommand(final Object message) throws Exception {
190                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
191                                 // Got the first ElectionTimeout. We don't forward it to the
192                                 // base Shard yet until we've sent the RegisterChangeListener
193                                 // message. So we signal the onFirstElectionTimeout latch to tell
194                                 // the main thread to send the RegisterChangeListener message and
195                                 // start a thread to wait on the onChangeListenerRegistered latch,
196                                 // which the main thread signals after it has sent the message.
197                                 // After the onChangeListenerRegistered is triggered, we send the
198                                 // original ElectionTimeout message to proceed with the election.
199                                 firstElectionTimeout = false;
200                                 final ActorRef self = getSelf();
201                                 new Thread() {
202                                     @Override
203                                     public void run() {
204                                         Uninterruptibles.awaitUninterruptibly(
205                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
206                                         self.tell(message, self);
207                                     }
208                                 }.start();
209
210                                 onFirstElectionTimeout.countDown();
211                             } else {
212                                 super.onReceiveCommand(message);
213                             }
214                         }
215                     };
216                 }
217             };
218
219             final MockDataChangeListener listener = new MockDataChangeListener(1);
220             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
221                     "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
222
223             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
224                     Props.create(new DelegatingShardCreator(creator)),
225                     "testRegisterChangeListenerWhenNotLeaderInitially");
226
227             // Write initial data into the in-memory store.
228             final YangInstanceIdentifier path = TestModel.TEST_PATH;
229             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
230
231             // Wait until the shard receives the first ElectionTimeout message.
232             assertEquals("Got first ElectionTimeout", true,
233                     onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
234
235             // Now send the RegisterChangeListener and wait for the reply.
236             shard.tell(new RegisterChangeListener(path, dclActor,
237                     AsyncDataBroker.DataChangeScope.SUBTREE, false), getRef());
238
239             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
240                     RegisterChangeListenerReply.class);
241             assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
242
243             // Sanity check - verify the shard is not the leader yet.
244             shard.tell(new FindLeader(), getRef());
245             final FindLeaderReply findLeadeReply =
246                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
247             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
248
249             // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
250             // with the election process.
251             onChangeListenerRegistered.countDown();
252
253             // Wait for the shard to become the leader and notify our listener with the existing
254             // data in the store.
255             listener.waitForChangeEvents(path);
256
257             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
258             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
259         }};
260     }
261
262     @Test
263     public void testRegisterDataTreeChangeListener() throws Exception {
264         new ShardTestKit(getSystem()) {{
265             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
266                     newShardProps(), "testRegisterDataTreeChangeListener");
267
268             waitUntilLeader(shard);
269
270             shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
271
272             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
273             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
274                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
275
276             shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
277
278             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
279                     RegisterDataTreeChangeListenerReply.class);
280             final String replyPath = reply.getListenerRegistrationPath().toString();
281             assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
282                     "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
283
284             final YangInstanceIdentifier path = TestModel.TEST_PATH;
285             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
286
287             listener.waitForChangeEvents();
288
289             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
290             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
291         }};
292     }
293
294     @SuppressWarnings("serial")
295     @Test
296     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
297         new ShardTestKit(getSystem()) {{
298             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
299             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
300             final Creator<Shard> creator = new Creator<Shard>() {
301                 boolean firstElectionTimeout = true;
302
303                 @Override
304                 public Shard create() throws Exception {
305                     return new Shard(shardID, Collections.<String,String>emptyMap(),
306                             dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
307                         @Override
308                         public void onReceiveCommand(final Object message) throws Exception {
309                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
310                                 firstElectionTimeout = false;
311                                 final ActorRef self = getSelf();
312                                 new Thread() {
313                                     @Override
314                                     public void run() {
315                                         Uninterruptibles.awaitUninterruptibly(
316                                                 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
317                                         self.tell(message, self);
318                                     }
319                                 }.start();
320
321                                 onFirstElectionTimeout.countDown();
322                             } else {
323                                 super.onReceiveCommand(message);
324                             }
325                         }
326                     };
327                 }
328             };
329
330             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
331             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
332                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
333
334             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
335                     Props.create(new DelegatingShardCreator(creator)),
336                     "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
337
338             final YangInstanceIdentifier path = TestModel.TEST_PATH;
339             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
340
341             assertEquals("Got first ElectionTimeout", true,
342                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
343
344             shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
345             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
346                 RegisterDataTreeChangeListenerReply.class);
347             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
348
349             shard.tell(new FindLeader(), getRef());
350             final FindLeaderReply findLeadeReply =
351                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
352             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
353
354             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
355
356             onChangeListenerRegistered.countDown();
357
358             // TODO: investigate why we do not receive data chage events
359             listener.waitForChangeEvents();
360
361             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
362             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
363         }};
364     }
365
366     @Test
367     public void testCreateTransaction(){
368         new ShardTestKit(getSystem()) {{
369             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
370
371             waitUntilLeader(shard);
372
373             shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
374
375             shard.tell(new CreateTransaction("txn-1",
376                     TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
377
378             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
379                     CreateTransactionReply.class);
380
381             final String path = reply.getTransactionActorPath().toString();
382             assertTrue("Unexpected transaction path " + path,
383                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
384
385             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
386         }};
387     }
388
389     @Test
390     public void testCreateTransactionOnChain(){
391         new ShardTestKit(getSystem()) {{
392             final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
393
394             waitUntilLeader(shard);
395
396             shard.tell(new CreateTransaction("txn-1",
397                     TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
398                     getRef());
399
400             final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
401                     CreateTransactionReply.class);
402
403             final String path = reply.getTransactionActorPath().toString();
404             assertTrue("Unexpected transaction path " + path,
405                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
406
407             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
408         }};
409     }
410
411     @SuppressWarnings("serial")
412     @Test
413     public void testPeerAddressResolved() throws Exception {
414         new ShardTestKit(getSystem()) {{
415             final CountDownLatch recoveryComplete = new CountDownLatch(1);
416             class TestShard extends Shard {
417                 TestShard() {
418                     super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
419                             newDatastoreContext(), SCHEMA_CONTEXT);
420                 }
421
422                 Map<String, String> getPeerAddresses() {
423                     return getRaftActorContext().getPeerAddresses();
424                 }
425
426                 @Override
427                 protected void onRecoveryComplete() {
428                     try {
429                         super.onRecoveryComplete();
430                     } finally {
431                         recoveryComplete.countDown();
432                     }
433                 }
434             }
435
436             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
437                     Props.create(new DelegatingShardCreator(new Creator<Shard>() {
438                         @Override
439                         public TestShard create() throws Exception {
440                             return new TestShard();
441                         }
442                     })), "testPeerAddressResolved");
443
444             //waitUntilLeader(shard);
445             assertEquals("Recovery complete", true,
446                 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
447
448             final String address = "akka://foobar";
449             shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
450
451             assertEquals("getPeerAddresses", address,
452                 ((TestShard) shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
453
454             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
455         }};
456     }
457
458     @Test
459     public void testApplySnapshot() throws Exception {
460
461         ShardTestKit testkit = new ShardTestKit(getSystem());
462
463         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
464                 "testApplySnapshot");
465
466         testkit.waitUntilLeader(shard);
467
468         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
469         store.setSchemaContext(SCHEMA_CONTEXT);
470
471         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
472                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
473                     withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
474                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
475
476         writeToStore(store, TestModel.TEST_PATH, container);
477
478         final YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
479         final NormalizedNode<?,?> expected = readStore(store, root);
480
481         final Snapshot snapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(expected),
482                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
483
484         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
485
486         final NormalizedNode<?,?> actual = readStore(shard, root);
487
488         assertEquals("Root node", expected, actual);
489
490         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
491     }
492
493     @Test
494     public void testApplyState() throws Exception {
495
496         ShardTestKit testkit = new ShardTestKit(getSystem());
497
498         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
499
500         testkit.waitUntilLeader(shard);
501
502         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
503
504         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
505                 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
506
507         shard.underlyingActor().onReceiveCommand(applyState);
508
509         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
510         assertEquals("Applied state", node, actual);
511
512         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
513     }
514
515     @Test
516     public void testApplyStateWithCandidatePayload() throws Exception {
517
518         ShardTestKit testkit = new ShardTestKit(getSystem());
519
520         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
521
522         testkit.waitUntilLeader(shard);
523
524         final NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
525         final DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
526
527         final ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
528                 DataTreeCandidatePayload.create(candidate)));
529
530         shard.underlyingActor().onReceiveCommand(applyState);
531
532         final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
533         assertEquals("Applied state", node, actual);
534
535         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
536     }
537
538     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
539         final DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
540         testStore.setSchemaContext(SCHEMA_CONTEXT);
541
542         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
543
544         final NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
545
546         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
547             SerializationUtils.serializeNormalizedNode(root),
548             Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
549         return testStore;
550     }
551
552     private static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
553         source.validate(mod);
554         final DataTreeCandidate candidate = source.prepare(mod);
555         source.commit(candidate);
556         return DataTreeCandidatePayload.create(candidate);
557     }
558
559     @Test
560     public void testDataTreeCandidateRecovery() throws Exception {
561         // Set up the InMemorySnapshotStore.
562         final DataTree source = setupInMemorySnapshotStore();
563
564         final DataTreeModification writeMod = source.takeSnapshot().newModification();
565         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
566         writeMod.ready();
567         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
568
569         // Set up the InMemoryJournal.
570         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
571
572         final int nListEntries = 16;
573         final Set<Integer> listEntryKeys = new HashSet<>();
574
575         // Add some ModificationPayload entries
576         for (int i = 1; i <= nListEntries; i++) {
577             listEntryKeys.add(Integer.valueOf(i));
578
579             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
580                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
581
582             final DataTreeModification mod = source.takeSnapshot().newModification();
583             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
584             mod.ready();
585             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
586                 payloadForModification(source, mod)));
587         }
588
589         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
590             new ApplyJournalEntries(nListEntries));
591
592         testRecovery(listEntryKeys);
593     }
594
595     @Test
596     public void testModicationRecovery() throws Exception {
597
598         // Set up the InMemorySnapshotStore.
599         setupInMemorySnapshotStore();
600
601         // Set up the InMemoryJournal.
602
603         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
604
605         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
606             new WriteModification(TestModel.OUTER_LIST_PATH,
607                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
608
609         final int nListEntries = 16;
610         final Set<Integer> listEntryKeys = new HashSet<>();
611
612         // Add some ModificationPayload entries
613         for(int i = 1; i <= nListEntries; i++) {
614             listEntryKeys.add(Integer.valueOf(i));
615             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
616                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
617             final Modification mod = new MergeModification(path,
618                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
619             InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
620                     newModificationPayload(mod)));
621         }
622
623         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
624                 new ApplyJournalEntries(nListEntries));
625
626         testRecovery(listEntryKeys);
627     }
628
629     private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
630         final MutableCompositeModification compMod = new MutableCompositeModification();
631         for(final Modification mod: mods) {
632             compMod.addModification(mod);
633         }
634
635         return new ModificationPayload(compMod);
636     }
637
638     @Test
639     public void testConcurrentThreePhaseCommits() throws Throwable {
640         new ShardTestKit(getSystem()) {{
641             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
642                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
643                     "testConcurrentThreePhaseCommits");
644
645             waitUntilLeader(shard);
646
647          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
648
649             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
650
651             final String transactionID1 = "tx1";
652             final MutableCompositeModification modification1 = new MutableCompositeModification();
653             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
654                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
655
656             final String transactionID2 = "tx2";
657             final MutableCompositeModification modification2 = new MutableCompositeModification();
658             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
659                     TestModel.OUTER_LIST_PATH,
660                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
661                     modification2);
662
663             final String transactionID3 = "tx3";
664             final MutableCompositeModification modification3 = new MutableCompositeModification();
665             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
666                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
667                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
668                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
669                     modification3);
670
671             final long timeoutSec = 5;
672             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
673             final Timeout timeout = new Timeout(duration);
674
675             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
676             // by the ShardTransaction.
677
678             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
679                     cohort1, modification1, true, false), getRef());
680             final ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
681                     expectMsgClass(duration, ReadyTransactionReply.class));
682             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
683
684             // Send the CanCommitTransaction message for the first Tx.
685
686             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
687             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
688                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
689             assertEquals("Can commit", true, canCommitReply.getCanCommit());
690
691             // Send the ForwardedReadyTransaction for the next 2 Tx's.
692
693             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
694                     cohort2, modification2, true, false), getRef());
695             expectMsgClass(duration, ReadyTransactionReply.class);
696
697             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
698                     cohort3, modification3, true, false), getRef());
699             expectMsgClass(duration, ReadyTransactionReply.class);
700
701             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
702             // processed after the first Tx completes.
703
704             final Future<Object> canCommitFuture1 = Patterns.ask(shard,
705                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
706
707             final Future<Object> canCommitFuture2 = Patterns.ask(shard,
708                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
709
710             // Send the CommitTransaction message for the first Tx. After it completes, it should
711             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
712
713             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
714             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
715
716             // Wait for the next 2 Tx's to complete.
717
718             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
719             final CountDownLatch commitLatch = new CountDownLatch(2);
720
721             class OnFutureComplete extends OnComplete<Object> {
722                 private final Class<?> expRespType;
723
724                 OnFutureComplete(final Class<?> expRespType) {
725                     this.expRespType = expRespType;
726                 }
727
728                 @Override
729                 public void onComplete(final Throwable error, final Object resp) {
730                     if(error != null) {
731                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
732                     } else {
733                         try {
734                             assertEquals("Commit response type", expRespType, resp.getClass());
735                             onSuccess(resp);
736                         } catch (final Exception e) {
737                             caughtEx.set(e);
738                         }
739                     }
740                 }
741
742                 void onSuccess(final Object resp) throws Exception {
743                 }
744             }
745
746             class OnCommitFutureComplete extends OnFutureComplete {
747                 OnCommitFutureComplete() {
748                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
749                 }
750
751                 @Override
752                 public void onComplete(final Throwable error, final Object resp) {
753                     super.onComplete(error, resp);
754                     commitLatch.countDown();
755                 }
756             }
757
758             class OnCanCommitFutureComplete extends OnFutureComplete {
759                 private final String transactionID;
760
761                 OnCanCommitFutureComplete(final String transactionID) {
762                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
763                     this.transactionID = transactionID;
764                 }
765
766                 @Override
767                 void onSuccess(final Object resp) throws Exception {
768                     final CanCommitTransactionReply canCommitReply =
769                             CanCommitTransactionReply.fromSerializable(resp);
770                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
771
772                     final Future<Object> commitFuture = Patterns.ask(shard,
773                             new CommitTransaction(transactionID).toSerializable(), timeout);
774                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
775                 }
776             }
777
778             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
779                     getSystem().dispatcher());
780
781             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
782                     getSystem().dispatcher());
783
784             final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
785
786             if(caughtEx.get() != null) {
787                 throw caughtEx.get();
788             }
789
790             assertEquals("Commits complete", true, done);
791
792             final InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
793             inOrder.verify(cohort1).canCommit();
794             inOrder.verify(cohort1).preCommit();
795             inOrder.verify(cohort1).commit();
796             inOrder.verify(cohort2).canCommit();
797             inOrder.verify(cohort2).preCommit();
798             inOrder.verify(cohort2).commit();
799             inOrder.verify(cohort3).canCommit();
800             inOrder.verify(cohort3).preCommit();
801             inOrder.verify(cohort3).commit();
802
803             // Verify data in the data store.
804
805             verifyOuterListEntry(shard, 1);
806
807             verifyLastApplied(shard, 2);
808
809             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
810         }};
811     }
812
813     private static BatchedModifications newBatchedModifications(final String transactionID, final YangInstanceIdentifier path,
814             final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady, final int messagesSent) {
815         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady, messagesSent);
816     }
817
818     private static BatchedModifications newBatchedModifications(final String transactionID, final String transactionChainID,
819             final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final boolean ready, final boolean doCommitOnReady,
820             final int messagesSent) {
821         final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
822         batched.addModification(new WriteModification(path, data));
823         batched.setReady(ready);
824         batched.setDoCommitOnReady(doCommitOnReady);
825         batched.setTotalMessagesSent(messagesSent);
826         return batched;
827     }
828
829     @Test
830     public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
831         new ShardTestKit(getSystem()) {{
832             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
833                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
834                     "testBatchedModificationsWithNoCommitOnReady");
835
836             waitUntilLeader(shard);
837
838             final String transactionID = "tx";
839             final FiniteDuration duration = duration("5 seconds");
840
841             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
842             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
843                 @Override
844                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
845                     if(mockCohort.get() == null) {
846                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
847                     }
848
849                     return mockCohort.get();
850                 }
851             };
852
853             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
854
855             // Send a BatchedModifications to start a transaction.
856
857             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
858                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
859             expectMsgClass(duration, BatchedModificationsReply.class);
860
861             // Send a couple more BatchedModifications.
862
863             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
864                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
865             expectMsgClass(duration, BatchedModificationsReply.class);
866
867             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
868                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
869                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3), getRef());
870             expectMsgClass(duration, ReadyTransactionReply.class);
871
872             // Send the CanCommitTransaction message.
873
874             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
875             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
876                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
877             assertEquals("Can commit", true, canCommitReply.getCanCommit());
878
879             // Send the CanCommitTransaction message.
880
881             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
882             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
883
884             final InOrder inOrder = inOrder(mockCohort.get());
885             inOrder.verify(mockCohort.get()).canCommit();
886             inOrder.verify(mockCohort.get()).preCommit();
887             inOrder.verify(mockCohort.get()).commit();
888
889             // Verify data in the data store.
890
891             verifyOuterListEntry(shard, 1);
892
893             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
894         }};
895     }
896
897     @Test
898     public void testBatchedModificationsWithCommitOnReady() throws Throwable {
899         new ShardTestKit(getSystem()) {{
900             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
901                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
902                     "testBatchedModificationsWithCommitOnReady");
903
904             waitUntilLeader(shard);
905
906             final String transactionID = "tx";
907             final FiniteDuration duration = duration("5 seconds");
908
909             final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
910             final ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
911                 @Override
912                 public ShardDataTreeCohort decorate(final String txID, final ShardDataTreeCohort actual) {
913                     if(mockCohort.get() == null) {
914                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
915                     }
916
917                     return mockCohort.get();
918                 }
919             };
920
921             shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
922
923             // Send a BatchedModifications to start a transaction.
924
925             shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
926                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
927             expectMsgClass(duration, BatchedModificationsReply.class);
928
929             // Send a couple more BatchedModifications.
930
931             shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
932                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2), getRef());
933             expectMsgClass(duration, BatchedModificationsReply.class);
934
935             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
936                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
937                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3), getRef());
938
939             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
940
941             final InOrder inOrder = inOrder(mockCohort.get());
942             inOrder.verify(mockCohort.get()).canCommit();
943             inOrder.verify(mockCohort.get()).preCommit();
944             inOrder.verify(mockCohort.get()).commit();
945
946             // Verify data in the data store.
947
948             verifyOuterListEntry(shard, 1);
949
950             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
951         }};
952     }
953
954     @Test(expected=IllegalStateException.class)
955     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
956         new ShardTestKit(getSystem()) {{
957             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
958                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
959                     "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
960
961             waitUntilLeader(shard);
962
963             final String transactionID = "tx1";
964             final BatchedModifications batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
965             batched.setReady(true);
966             batched.setTotalMessagesSent(2);
967
968             shard.tell(batched, getRef());
969
970             final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
971
972             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
973
974             if(failure != null) {
975                 throw failure.cause();
976             }
977         }};
978     }
979
980     @Test
981     public void testBatchedModificationsWithOperationFailure() throws Throwable {
982         new ShardTestKit(getSystem()) {{
983             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
984                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
985                     "testBatchedModificationsWithOperationFailure");
986
987             waitUntilLeader(shard);
988
989             // Test merge with invalid data. An exception should occur when the merge is applied. Note that
990             // write will not validate the children for performance reasons.
991
992             String transactionID = "tx1";
993
994             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
995                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
996                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
997
998             BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, null);
999             batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
1000             shard.tell(batched, getRef());
1001             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1002
1003             Throwable cause = failure.cause();
1004
1005             batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION, null);
1006             batched.setReady(true);
1007             batched.setTotalMessagesSent(2);
1008
1009             shard.tell(batched, getRef());
1010
1011             failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1012             assertEquals("Failure cause", cause, failure.cause());
1013
1014             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1015         }};
1016     }
1017
1018     @SuppressWarnings("unchecked")
1019     private static void verifyOuterListEntry(final TestActorRef<Shard> shard, final Object expIDValue) throws Exception {
1020         final NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
1021         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
1022         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
1023                 outerList.getValue() instanceof Iterable);
1024         final Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
1025         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
1026                 entry instanceof MapEntryNode);
1027         final MapEntryNode mapEntry = (MapEntryNode)entry;
1028         final Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
1029                 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
1030         assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
1031         assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
1032     }
1033
1034     @Test
1035     public void testBatchedModificationsOnTransactionChain() throws Throwable {
1036         new ShardTestKit(getSystem()) {{
1037             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1038                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1039                     "testBatchedModificationsOnTransactionChain");
1040
1041             waitUntilLeader(shard);
1042
1043             final String transactionChainID = "txChain";
1044             final String transactionID1 = "tx1";
1045             final String transactionID2 = "tx2";
1046
1047             final FiniteDuration duration = duration("5 seconds");
1048
1049             // Send a BatchedModifications to start a chained write transaction and ready it.
1050
1051             final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1052             final YangInstanceIdentifier path = TestModel.TEST_PATH;
1053             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
1054                     containerNode, true, false, 1), getRef());
1055             expectMsgClass(duration, ReadyTransactionReply.class);
1056
1057             // Create a read Tx on the same chain.
1058
1059             shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
1060                     transactionChainID).toSerializable(), getRef());
1061
1062             final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
1063
1064             getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1065             final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1066             assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1067
1068             // Commit the write transaction.
1069
1070             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1071             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1072                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1073             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1074
1075             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1076             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1077
1078             // Verify data in the data store.
1079
1080             final NormalizedNode<?, ?> actualNode = readStore(shard, path);
1081             assertEquals("Stored node", containerNode, actualNode);
1082
1083             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1084         }};
1085     }
1086
1087     @Test
1088     public void testOnBatchedModificationsWhenNotLeader() {
1089         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1090         new ShardTestKit(getSystem()) {{
1091             final Creator<Shard> creator = new Creator<Shard>() {
1092                 private static final long serialVersionUID = 1L;
1093
1094                 @Override
1095                 public Shard create() throws Exception {
1096                     return new Shard(shardID, Collections.<String,String>emptyMap(),
1097                             newDatastoreContext(), SCHEMA_CONTEXT) {
1098                         @Override
1099                         protected boolean isLeader() {
1100                             return overrideLeaderCalls.get() ? false : super.isLeader();
1101                         }
1102
1103                         @Override
1104                         protected ActorSelection getLeader() {
1105                             return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1106                                 super.getLeader();
1107                         }
1108                     };
1109                 }
1110             };
1111
1112             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1113                     Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1114
1115             waitUntilLeader(shard);
1116
1117             overrideLeaderCalls.set(true);
1118
1119             final BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1120
1121             shard.tell(batched, ActorRef.noSender());
1122
1123             expectMsgEquals(batched);
1124
1125             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1126         }};
1127     }
1128
1129     @Test
1130     public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1131         new ShardTestKit(getSystem()) {{
1132             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1133                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1134                     "testForwardedReadyTransactionWithImmediateCommit");
1135
1136             waitUntilLeader(shard);
1137
1138             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1139
1140             final String transactionID = "tx1";
1141             final MutableCompositeModification modification = new MutableCompositeModification();
1142             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1143             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1144                     TestModel.TEST_PATH, containerNode, modification);
1145
1146             final FiniteDuration duration = duration("5 seconds");
1147
1148             // Simulate the ForwardedReadyTransaction messages that would be sent
1149             // by the ShardTransaction.
1150
1151             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1152                     cohort, modification, true, true), getRef());
1153
1154             expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1155
1156             final InOrder inOrder = inOrder(cohort);
1157             inOrder.verify(cohort).canCommit();
1158             inOrder.verify(cohort).preCommit();
1159             inOrder.verify(cohort).commit();
1160
1161             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1162             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1163
1164             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1165         }};
1166     }
1167
1168     @Test
1169     public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1170         new ShardTestKit(getSystem()) {{
1171             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1172                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1173                     "testReadyLocalTransactionWithImmediateCommit");
1174
1175             waitUntilLeader(shard);
1176
1177             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1178
1179             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1180
1181             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1182             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1183             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1184             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1185
1186             final String txId = "tx1";
1187             modification.ready();
1188             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1189
1190             shard.tell(readyMessage, getRef());
1191
1192             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1193
1194             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1195             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1196
1197             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1198         }};
1199     }
1200
1201     @Test
1202     public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1203         new ShardTestKit(getSystem()) {{
1204             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1205                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1206                     "testReadyLocalTransactionWithThreePhaseCommit");
1207
1208             waitUntilLeader(shard);
1209
1210             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1211
1212             final DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1213
1214             final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1215             new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1216             final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1217             new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1218
1219             final String txId = "tx1";
1220                 modification.ready();
1221             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1222
1223             shard.tell(readyMessage, getRef());
1224
1225             expectMsgClass(ReadyTransactionReply.class);
1226
1227             // Send the CanCommitTransaction message.
1228
1229             shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1230             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1231                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1232             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1233
1234             // Send the CanCommitTransaction message.
1235
1236             shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1237             expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1238
1239             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1240             assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1241
1242             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1243         }};
1244     }
1245
1246     @Test
1247     public void testCommitWithPersistenceDisabled() throws Throwable {
1248         dataStoreContextBuilder.persistent(false);
1249         new ShardTestKit(getSystem()) {{
1250             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1251                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1252                     "testCommitWithPersistenceDisabled");
1253
1254             waitUntilLeader(shard);
1255
1256             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1257
1258             // Setup a simulated transactions with a mock cohort.
1259
1260             final String transactionID = "tx";
1261             final MutableCompositeModification modification = new MutableCompositeModification();
1262             final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1263             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1264                 TestModel.TEST_PATH, containerNode, modification);
1265
1266             final FiniteDuration duration = duration("5 seconds");
1267
1268             // Simulate the ForwardedReadyTransaction messages that would be sent
1269             // by the ShardTransaction.
1270
1271             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1272                 cohort, modification, true, false), getRef());
1273             expectMsgClass(duration, ReadyTransactionReply.class);
1274
1275             // Send the CanCommitTransaction message.
1276
1277             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1278             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1279                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1280             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1281
1282             // Send the CanCommitTransaction message.
1283
1284             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1285             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1286
1287             final InOrder inOrder = inOrder(cohort);
1288             inOrder.verify(cohort).canCommit();
1289             inOrder.verify(cohort).preCommit();
1290             inOrder.verify(cohort).commit();
1291
1292             final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1293             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1294
1295             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1296         }};
1297     }
1298
1299     private static DataTreeCandidateTip mockCandidate(final String name) {
1300         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1301         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1302         doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1303         doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1304         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1305         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1306         return mockCandidate;
1307     }
1308
1309     private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1310         final DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1311         final DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1312         doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1313         doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1314         doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1315         return mockCandidate;
1316     }
1317
1318     @Test
1319     public void testCommitWhenTransactionHasNoModifications(){
1320         // Note that persistence is enabled which would normally result in the entry getting written to the journal
1321         // but here that need not happen
1322         new ShardTestKit(getSystem()) {
1323             {
1324                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1325                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1326                         "testCommitWhenTransactionHasNoModifications");
1327
1328                 waitUntilLeader(shard);
1329
1330                 final String transactionID = "tx1";
1331                 final MutableCompositeModification modification = new MutableCompositeModification();
1332                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1333                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1334                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1335                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1336                 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1337
1338                 final FiniteDuration duration = duration("5 seconds");
1339
1340                 // Simulate the ForwardedReadyTransaction messages that would be sent
1341                 // by the ShardTransaction.
1342
1343                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1344                         cohort, modification, true, false), getRef());
1345                 expectMsgClass(duration, ReadyTransactionReply.class);
1346
1347                 // Send the CanCommitTransaction message.
1348
1349                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1350                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1351                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1352                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1353
1354                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1355                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1356
1357                 final InOrder inOrder = inOrder(cohort);
1358                 inOrder.verify(cohort).canCommit();
1359                 inOrder.verify(cohort).preCommit();
1360                 inOrder.verify(cohort).commit();
1361
1362                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1363                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1364
1365                 // Use MBean for verification
1366                 // Committed transaction count should increase as usual
1367                 assertEquals(1,shardStats.getCommittedTransactionsCount());
1368
1369                 // Commit index should not advance because this does not go into the journal
1370                 assertEquals(-1, shardStats.getCommitIndex());
1371
1372                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1373
1374             }
1375         };
1376     }
1377
1378     @Test
1379     public void testCommitWhenTransactionHasModifications(){
1380         new ShardTestKit(getSystem()) {
1381             {
1382                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1383                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1384                         "testCommitWhenTransactionHasModifications");
1385
1386                 waitUntilLeader(shard);
1387
1388                 final String transactionID = "tx1";
1389                 final MutableCompositeModification modification = new MutableCompositeModification();
1390                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1391                 final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1392                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1393                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1394                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1395                 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1396
1397                 final FiniteDuration duration = duration("5 seconds");
1398
1399                 // Simulate the ForwardedReadyTransaction messages that would be sent
1400                 // by the ShardTransaction.
1401
1402                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1403                         cohort, modification, true, false), getRef());
1404                 expectMsgClass(duration, ReadyTransactionReply.class);
1405
1406                 // Send the CanCommitTransaction message.
1407
1408                 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1409                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1410                         expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1411                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1412
1413                 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1414                 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1415
1416                 final InOrder inOrder = inOrder(cohort);
1417                 inOrder.verify(cohort).canCommit();
1418                 inOrder.verify(cohort).preCommit();
1419                 inOrder.verify(cohort).commit();
1420
1421                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1422                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1423
1424                 // Use MBean for verification
1425                 // Committed transaction count should increase as usual
1426                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1427
1428                 // Commit index should advance as we do not have an empty modification
1429                 assertEquals(0, shardStats.getCommitIndex());
1430
1431                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1432
1433             }
1434         };
1435     }
1436
1437     @Test
1438     public void testCommitPhaseFailure() throws Throwable {
1439         new ShardTestKit(getSystem()) {{
1440             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1441                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1442                     "testCommitPhaseFailure");
1443
1444             waitUntilLeader(shard);
1445
1446             // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1447             // commit phase.
1448
1449             final String transactionID1 = "tx1";
1450             final MutableCompositeModification modification1 = new MutableCompositeModification();
1451             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1452             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1453             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1454             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1455             doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1456
1457             final String transactionID2 = "tx2";
1458             final MutableCompositeModification modification2 = new MutableCompositeModification();
1459             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1460             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1461
1462             final FiniteDuration duration = duration("5 seconds");
1463             final Timeout timeout = new Timeout(duration);
1464
1465             // Simulate the ForwardedReadyTransaction messages that would be sent
1466             // by the ShardTransaction.
1467
1468             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1469                     cohort1, modification1, true, false), getRef());
1470             expectMsgClass(duration, ReadyTransactionReply.class);
1471
1472             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1473                     cohort2, modification2, true, false), getRef());
1474             expectMsgClass(duration, ReadyTransactionReply.class);
1475
1476             // Send the CanCommitTransaction message for the first Tx.
1477
1478             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1479             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1480                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1481             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1482
1483             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1484             // processed after the first Tx completes.
1485
1486             final Future<Object> canCommitFuture = Patterns.ask(shard,
1487                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1488
1489             // Send the CommitTransaction message for the first Tx. This should send back an error
1490             // and trigger the 2nd Tx to proceed.
1491
1492             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1493             expectMsgClass(duration, akka.actor.Status.Failure.class);
1494
1495             // Wait for the 2nd Tx to complete the canCommit phase.
1496
1497             final CountDownLatch latch = new CountDownLatch(1);
1498             canCommitFuture.onComplete(new OnComplete<Object>() {
1499                 @Override
1500                 public void onComplete(final Throwable t, final Object resp) {
1501                     latch.countDown();
1502                 }
1503             }, getSystem().dispatcher());
1504
1505             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1506
1507             final InOrder inOrder = inOrder(cohort1, cohort2);
1508             inOrder.verify(cohort1).canCommit();
1509             inOrder.verify(cohort1).preCommit();
1510             inOrder.verify(cohort1).commit();
1511             inOrder.verify(cohort2).canCommit();
1512
1513             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1514         }};
1515     }
1516
1517     @Test
1518     public void testPreCommitPhaseFailure() throws Throwable {
1519         new ShardTestKit(getSystem()) {{
1520             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1521                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1522                     "testPreCommitPhaseFailure");
1523
1524             waitUntilLeader(shard);
1525
1526             final String transactionID1 = "tx1";
1527             final MutableCompositeModification modification1 = new MutableCompositeModification();
1528             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1529             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1530             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1531
1532             final String transactionID2 = "tx2";
1533             final MutableCompositeModification modification2 = new MutableCompositeModification();
1534             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1535             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1536
1537             final FiniteDuration duration = duration("5 seconds");
1538             final Timeout timeout = new Timeout(duration);
1539
1540             // Simulate the ForwardedReadyTransaction messages that would be sent
1541             // by the ShardTransaction.
1542
1543             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1544                     cohort1, modification1, true, false), getRef());
1545             expectMsgClass(duration, ReadyTransactionReply.class);
1546
1547             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1548                     cohort2, modification2, true, false), getRef());
1549             expectMsgClass(duration, ReadyTransactionReply.class);
1550
1551             // Send the CanCommitTransaction message for the first Tx.
1552
1553             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1554             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1555                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1556             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1557
1558             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1559             // processed after the first Tx completes.
1560
1561             final Future<Object> canCommitFuture = Patterns.ask(shard,
1562                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1563
1564             // Send the CommitTransaction message for the first Tx. This should send back an error
1565             // and trigger the 2nd Tx to proceed.
1566
1567             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1568             expectMsgClass(duration, akka.actor.Status.Failure.class);
1569
1570             // Wait for the 2nd Tx to complete the canCommit phase.
1571
1572             final CountDownLatch latch = new CountDownLatch(1);
1573             canCommitFuture.onComplete(new OnComplete<Object>() {
1574                 @Override
1575                 public void onComplete(final Throwable t, final Object resp) {
1576                     latch.countDown();
1577                 }
1578             }, getSystem().dispatcher());
1579
1580             assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1581
1582             final InOrder inOrder = inOrder(cohort1, cohort2);
1583             inOrder.verify(cohort1).canCommit();
1584             inOrder.verify(cohort1).preCommit();
1585             inOrder.verify(cohort2).canCommit();
1586
1587             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1588         }};
1589     }
1590
1591     @Test
1592     public void testCanCommitPhaseFailure() throws Throwable {
1593         new ShardTestKit(getSystem()) {{
1594             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1595                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1596                     "testCanCommitPhaseFailure");
1597
1598             waitUntilLeader(shard);
1599
1600             final FiniteDuration duration = duration("5 seconds");
1601
1602             final String transactionID1 = "tx1";
1603             final MutableCompositeModification modification = new MutableCompositeModification();
1604             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1605             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1606
1607             // Simulate the ForwardedReadyTransaction messages that would be sent
1608             // by the ShardTransaction.
1609
1610             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1611                     cohort, modification, true, false), getRef());
1612             expectMsgClass(duration, ReadyTransactionReply.class);
1613
1614             // Send the CanCommitTransaction message.
1615
1616             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1617             expectMsgClass(duration, akka.actor.Status.Failure.class);
1618
1619             // Send another can commit to ensure the failed one got cleaned up.
1620
1621             reset(cohort);
1622
1623             final String transactionID2 = "tx2";
1624             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1625
1626             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1627                     cohort, modification, true, false), getRef());
1628             expectMsgClass(duration, ReadyTransactionReply.class);
1629
1630             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1631             final CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1632                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1633             assertEquals("getCanCommit", true, reply.getCanCommit());
1634
1635             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1636         }};
1637     }
1638
1639     @Test
1640     public void testCanCommitPhaseFalseResponse() throws Throwable {
1641         new ShardTestKit(getSystem()) {{
1642             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1643                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1644                     "testCanCommitPhaseFalseResponse");
1645
1646             waitUntilLeader(shard);
1647
1648             final FiniteDuration duration = duration("5 seconds");
1649
1650             final String transactionID1 = "tx1";
1651             final MutableCompositeModification modification = new MutableCompositeModification();
1652             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1653             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1654
1655             // Simulate the ForwardedReadyTransaction messages that would be sent
1656             // by the ShardTransaction.
1657
1658             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1659                     cohort, modification, true, false), getRef());
1660             expectMsgClass(duration, ReadyTransactionReply.class);
1661
1662             // Send the CanCommitTransaction message.
1663
1664             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1665             CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1666                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1667             assertEquals("getCanCommit", false, reply.getCanCommit());
1668
1669             // Send another can commit to ensure the failed one got cleaned up.
1670
1671             reset(cohort);
1672
1673             final String transactionID2 = "tx2";
1674             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1675
1676             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1677                 cohort, modification, true, false), getRef());
1678             expectMsgClass(duration, ReadyTransactionReply.class);
1679
1680             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1681             reply = CanCommitTransactionReply.fromSerializable(
1682                     expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1683             assertEquals("getCanCommit", true, reply.getCanCommit());
1684
1685             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1686         }};
1687     }
1688
1689     @Test
1690     public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1691         new ShardTestKit(getSystem()) {{
1692             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1693                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1694                     "testImmediateCommitWithCanCommitPhaseFailure");
1695
1696             waitUntilLeader(shard);
1697
1698             final FiniteDuration duration = duration("5 seconds");
1699
1700             final String transactionID1 = "tx1";
1701             final MutableCompositeModification modification = new MutableCompositeModification();
1702             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1703             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1704
1705             // Simulate the ForwardedReadyTransaction messages that would be sent
1706             // by the ShardTransaction.
1707
1708             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1709                     cohort, modification, true, true), getRef());
1710
1711             expectMsgClass(duration, akka.actor.Status.Failure.class);
1712
1713             // Send another can commit to ensure the failed one got cleaned up.
1714
1715             reset(cohort);
1716
1717             final String transactionID2 = "tx2";
1718             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1719             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1720             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1721             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1722             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1723             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1724             doReturn(candidateRoot).when(candidate).getRootNode();
1725             doReturn(candidate).when(cohort).getCandidate();
1726
1727             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1728                     cohort, modification, true, true), getRef());
1729
1730             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1731
1732             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1733         }};
1734     }
1735
1736     @Test
1737     public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1738         new ShardTestKit(getSystem()) {{
1739             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1740                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1741                     "testImmediateCommitWithCanCommitPhaseFalseResponse");
1742
1743             waitUntilLeader(shard);
1744
1745             final FiniteDuration duration = duration("5 seconds");
1746
1747             final String transactionID = "tx1";
1748             final MutableCompositeModification modification = new MutableCompositeModification();
1749             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1750             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1751
1752             // Simulate the ForwardedReadyTransaction messages that would be sent
1753             // by the ShardTransaction.
1754
1755             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1756                     cohort, modification, true, true), getRef());
1757
1758             expectMsgClass(duration, akka.actor.Status.Failure.class);
1759
1760             // Send another can commit to ensure the failed one got cleaned up.
1761
1762             reset(cohort);
1763
1764             final String transactionID2 = "tx2";
1765             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1766             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1767             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1768             final DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1769             final DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1770             doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1771             doReturn(candidateRoot).when(candidate).getRootNode();
1772             doReturn(candidate).when(cohort).getCandidate();
1773
1774             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1775                     cohort, modification, true, true), getRef());
1776
1777             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1778
1779             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1780         }};
1781     }
1782
1783     @Test
1784     public void testAbortBeforeFinishCommit() throws Throwable {
1785         new ShardTestKit(getSystem()) {{
1786             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1787                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1788                     "testAbortBeforeFinishCommit");
1789
1790             waitUntilLeader(shard);
1791
1792             final FiniteDuration duration = duration("5 seconds");
1793             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1794
1795             final String transactionID = "tx1";
1796             final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1797                           new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1798                 @Override
1799                 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1800                     final ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1801
1802                     // Simulate an AbortTransaction message occurring during replication, after
1803                     // persisting and before finishing the commit to the in-memory store.
1804                     // We have no followers so due to optimizations in the RaftActor, it does not
1805                     // attempt replication and thus we can't send an AbortTransaction message b/c
1806                     // it would be processed too late after CommitTransaction completes. So we'll
1807                     // simulate an AbortTransaction message occurring during replication by calling
1808                     // the shard directly.
1809                     //
1810                     shard.underlyingActor().doAbortTransaction(transactionID, null);
1811
1812                     return preCommitFuture;
1813                 }
1814             };
1815
1816             final MutableCompositeModification modification = new MutableCompositeModification();
1817             final ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1818                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1819                     modification, preCommit);
1820
1821             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1822                 cohort, modification, true, false), getRef());
1823             expectMsgClass(duration, ReadyTransactionReply.class);
1824
1825             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1826             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1827                 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1828             assertEquals("Can commit", true, canCommitReply.getCanCommit());
1829
1830             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1831             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1832
1833             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1834
1835             // Since we're simulating an abort occurring during replication and before finish commit,
1836             // the data should still get written to the in-memory store since we've gotten past
1837             // canCommit and preCommit and persisted the data.
1838             assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1839
1840             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1841         }};
1842     }
1843
1844     @Test
1845     public void testTransactionCommitTimeout() throws Throwable {
1846         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1847
1848         new ShardTestKit(getSystem()) {{
1849             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1850                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1851                     "testTransactionCommitTimeout");
1852
1853             waitUntilLeader(shard);
1854
1855             final FiniteDuration duration = duration("5 seconds");
1856
1857             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1858
1859             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1860             writeToStore(shard, TestModel.OUTER_LIST_PATH,
1861                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1862
1863             // Create 1st Tx - will timeout
1864
1865             final String transactionID1 = "tx1";
1866             final MutableCompositeModification modification1 = new MutableCompositeModification();
1867             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1868                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1869                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1870                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1871                     modification1);
1872
1873             // Create 2nd Tx
1874
1875             final String transactionID2 = "tx3";
1876             final MutableCompositeModification modification2 = new MutableCompositeModification();
1877             final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1878                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1879             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1880                     listNodePath,
1881                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1882                     modification2);
1883
1884             // Ready the Tx's
1885
1886             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1887                     cohort1, modification1, true, false), getRef());
1888             expectMsgClass(duration, ReadyTransactionReply.class);
1889
1890             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1891                     cohort2, modification2, true, false), getRef());
1892             expectMsgClass(duration, ReadyTransactionReply.class);
1893
1894             // canCommit 1st Tx. We don't send the commit so it should timeout.
1895
1896             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1897             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1898
1899             // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1900
1901             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1902             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1903
1904             // Try to commit the 1st Tx - should fail as it's not the current Tx.
1905
1906             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1907             expectMsgClass(duration, akka.actor.Status.Failure.class);
1908
1909             // Commit the 2nd Tx.
1910
1911             shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1912             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1913
1914             final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1915             assertNotNull(listNodePath + " not found", node);
1916
1917             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1918         }};
1919     }
1920
1921     @Test
1922     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1923         dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1924
1925         new ShardTestKit(getSystem()) {{
1926             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1927                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1928                     "testTransactionCommitQueueCapacityExceeded");
1929
1930             waitUntilLeader(shard);
1931
1932             final FiniteDuration duration = duration("5 seconds");
1933
1934             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1935
1936             final String transactionID1 = "tx1";
1937             final MutableCompositeModification modification1 = new MutableCompositeModification();
1938             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1939                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1940
1941             final String transactionID2 = "tx2";
1942             final MutableCompositeModification modification2 = new MutableCompositeModification();
1943             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1944                     TestModel.OUTER_LIST_PATH,
1945                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1946                     modification2);
1947
1948             final String transactionID3 = "tx3";
1949             final MutableCompositeModification modification3 = new MutableCompositeModification();
1950             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1951                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1952
1953             // Ready the Tx's
1954
1955             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1956                     cohort1, modification1, true, false), getRef());
1957             expectMsgClass(duration, ReadyTransactionReply.class);
1958
1959             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1960                     cohort2, modification2, true, false), getRef());
1961             expectMsgClass(duration, ReadyTransactionReply.class);
1962
1963             // The 3rd Tx should exceed queue capacity and fail.
1964
1965             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1966                     cohort3, modification3, true, false), getRef());
1967             expectMsgClass(duration, akka.actor.Status.Failure.class);
1968
1969             // canCommit 1st Tx.
1970
1971             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1972             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1973
1974             // canCommit the 2nd Tx - it should get queued.
1975
1976             shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1977
1978             // canCommit the 3rd Tx - should exceed queue capacity and fail.
1979
1980             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1981             expectMsgClass(duration, akka.actor.Status.Failure.class);
1982
1983             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1984         }};
1985     }
1986
1987     @Test
1988     public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1989         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1990
1991         new ShardTestKit(getSystem()) {{
1992             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1993                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1994                     "testTransactionCommitWithPriorExpiredCohortEntries");
1995
1996             waitUntilLeader(shard);
1997
1998             final FiniteDuration duration = duration("5 seconds");
1999
2000             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2001
2002             final String transactionID1 = "tx1";
2003             final MutableCompositeModification modification1 = new MutableCompositeModification();
2004             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2005                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2006
2007             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2008                 cohort1, modification1, true, false), getRef());
2009             expectMsgClass(duration, ReadyTransactionReply.class);
2010
2011             final String transactionID2 = "tx2";
2012             final MutableCompositeModification modification2 = new MutableCompositeModification();
2013             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2014                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2015
2016             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2017                     cohort2, modification2, true, false), getRef());
2018             expectMsgClass(duration, ReadyTransactionReply.class);
2019
2020             final String transactionID3 = "tx3";
2021             final MutableCompositeModification modification3 = new MutableCompositeModification();
2022             final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
2023                     TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
2024
2025             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
2026                 cohort3, modification3, true, false), getRef());
2027             expectMsgClass(duration, ReadyTransactionReply.class);
2028
2029             // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
2030             // should expire from the queue and the last one should be processed.
2031
2032             shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
2033             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2034
2035             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2036         }};
2037     }
2038
2039     @Test
2040     public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
2041         dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
2042
2043         new ShardTestKit(getSystem()) {{
2044             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2045                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2046                     "testTransactionCommitWithSubsequentExpiredCohortEntry");
2047
2048             waitUntilLeader(shard);
2049
2050             final FiniteDuration duration = duration("5 seconds");
2051
2052             final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
2053
2054             final String transactionID1 = "tx1";
2055             final MutableCompositeModification modification1 = new MutableCompositeModification();
2056             final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
2057                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
2058
2059             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2060                     cohort1, modification1, true, false), getRef());
2061             expectMsgClass(duration, ReadyTransactionReply.class);
2062
2063             // CanCommit the first one so it's the current in-progress CohortEntry.
2064
2065             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2066             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2067
2068             // Ready the second Tx.
2069
2070             final String transactionID2 = "tx2";
2071             final MutableCompositeModification modification2 = new MutableCompositeModification();
2072             final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2073                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2074
2075             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2076                     cohort2, modification2, true, false), getRef());
2077             expectMsgClass(duration, ReadyTransactionReply.class);
2078
2079             // Ready the third Tx.
2080
2081             final String transactionID3 = "tx3";
2082             final DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2083             new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2084                     .apply(modification3);
2085                 modification3.ready();
2086             final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
2087
2088             shard.tell(readyMessage, getRef());
2089
2090             // Commit the first Tx. After completing, the second should expire from the queue and the third
2091             // Tx committed.
2092
2093             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2094             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2095
2096             // Expect commit reply from the third Tx.
2097
2098             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2099
2100             final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2101             assertNotNull(TestModel.TEST2_PATH + " not found", node);
2102
2103             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2104         }};
2105     }
2106
2107     @Test
2108     public void testCanCommitBeforeReadyFailure() throws Throwable {
2109         new ShardTestKit(getSystem()) {{
2110             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2111                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2112                     "testCanCommitBeforeReadyFailure");
2113
2114             shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2115             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2116
2117             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2118         }};
2119     }
2120
2121     @Test
2122     public void testAbortCurrentTransaction() throws Throwable {
2123         new ShardTestKit(getSystem()) {{
2124             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2125                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2126                     "testAbortCurrentTransaction");
2127
2128             waitUntilLeader(shard);
2129
2130             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2131
2132             final String transactionID1 = "tx1";
2133             final MutableCompositeModification modification1 = new MutableCompositeModification();
2134             final ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2135             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2136             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2137
2138             final String transactionID2 = "tx2";
2139             final MutableCompositeModification modification2 = new MutableCompositeModification();
2140             final ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2141             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2142
2143             final FiniteDuration duration = duration("5 seconds");
2144             final Timeout timeout = new Timeout(duration);
2145
2146             // Simulate the ForwardedReadyTransaction messages that would be sent
2147             // by the ShardTransaction.
2148
2149             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2150                     cohort1, modification1, true, false), getRef());
2151             expectMsgClass(duration, ReadyTransactionReply.class);
2152
2153             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2154                     cohort2, modification2, true, false), getRef());
2155             expectMsgClass(duration, ReadyTransactionReply.class);
2156
2157             // Send the CanCommitTransaction message for the first Tx.
2158
2159             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2160             final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2161                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2162             assertEquals("Can commit", true, canCommitReply.getCanCommit());
2163
2164             // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2165             // processed after the first Tx completes.
2166
2167             final Future<Object> canCommitFuture = Patterns.ask(shard,
2168                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2169
2170             // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2171             // Tx to proceed.
2172
2173             shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2174             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2175
2176             // Wait for the 2nd Tx to complete the canCommit phase.
2177
2178             Await.ready(canCommitFuture, duration);
2179
2180             final InOrder inOrder = inOrder(cohort1, cohort2);
2181             inOrder.verify(cohort1).canCommit();
2182             inOrder.verify(cohort2).canCommit();
2183
2184             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2185         }};
2186     }
2187
2188     @Test
2189     public void testAbortQueuedTransaction() throws Throwable {
2190         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
2191         new ShardTestKit(getSystem()) {{
2192             final AtomicReference<CountDownLatch> cleaupCheckLatch = new AtomicReference<>();
2193             @SuppressWarnings("serial")
2194             final Creator<Shard> creator = new Creator<Shard>() {
2195                 @Override
2196                 public Shard create() throws Exception {
2197                     return new Shard(shardID, Collections.<String,String>emptyMap(),
2198                             dataStoreContextBuilder.build(), SCHEMA_CONTEXT) {
2199                         @Override
2200                         public void onReceiveCommand(final Object message) throws Exception {
2201                             super.onReceiveCommand(message);
2202                             if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
2203                                 if(cleaupCheckLatch.get() != null) {
2204                                     cleaupCheckLatch.get().countDown();
2205                                 }
2206                             }
2207                         }
2208                     };
2209                 }
2210             };
2211
2212             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2213                     Props.create(new DelegatingShardCreator(creator)).withDispatcher(
2214                             Dispatchers.DefaultDispatcherId()), "testAbortQueuedTransaction");
2215
2216             waitUntilLeader(shard);
2217
2218             final String transactionID = "tx1";
2219
2220             final MutableCompositeModification modification = new MutableCompositeModification();
2221             final ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort");
2222             doReturn(Futures.immediateFuture(null)).when(cohort).abort();
2223
2224             final FiniteDuration duration = duration("5 seconds");
2225
2226             // Ready the tx.
2227
2228             shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
2229                     cohort, modification, true, false), getRef());
2230             expectMsgClass(duration, ReadyTransactionReply.class);
2231
2232             assertEquals("getPendingTxCommitQueueSize", 1, shard.underlyingActor().getPendingTxCommitQueueSize());
2233
2234             // Send the AbortTransaction message.
2235
2236             shard.tell(new AbortTransaction(transactionID).toSerializable(), getRef());
2237             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2238
2239             verify(cohort).abort();
2240
2241             // Verify the tx cohort is removed from queue at the cleanup check interval.
2242
2243             cleaupCheckLatch.set(new CountDownLatch(1));
2244             assertEquals("TX_COMMIT_TIMEOUT_CHECK_MESSAGE received", true,
2245                     cleaupCheckLatch.get().await(5, TimeUnit.SECONDS));
2246
2247             assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
2248
2249             // Now send CanCommitTransaction - should fail.
2250
2251             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
2252
2253             Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
2254             assertTrue("Failure type", failure instanceof IllegalStateException);
2255
2256             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2257         }};
2258     }
2259
2260     @Test
2261     public void testCreateSnapshot() throws Exception {
2262         testCreateSnapshot(true, "testCreateSnapshot");
2263     }
2264
2265     @Test
2266     public void testCreateSnapshotWithNonPersistentData() throws Exception {
2267         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2268     }
2269
2270     @SuppressWarnings("serial")
2271     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2272
2273         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2274
2275         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2276         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2277             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
2278                 super(delegate);
2279             }
2280
2281             @Override
2282             public void saveSnapshot(final Object o) {
2283                 savedSnapshot.set(o);
2284                 super.saveSnapshot(o);
2285             }
2286         }
2287
2288         dataStoreContextBuilder.persistent(persistent);
2289
2290         new ShardTestKit(getSystem()) {{
2291             class TestShard extends Shard {
2292
2293                 protected TestShard(final ShardIdentifier name, final Map<String, String> peerAddresses,
2294                                     final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
2295                     super(name, peerAddresses, datastoreContext, schemaContext);
2296                     setPersistence(new TestPersistentDataProvider(super.persistence()));
2297                 }
2298
2299                 @Override
2300                 public void handleCommand(final Object message) {
2301                     super.handleCommand(message);
2302
2303                     if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2304                         latch.get().countDown();
2305                     }
2306                 }
2307
2308                 @Override
2309                 public RaftActorContext getRaftActorContext() {
2310                     return super.getRaftActorContext();
2311                 }
2312             }
2313
2314             final Creator<Shard> creator = new Creator<Shard>() {
2315                 @Override
2316                 public Shard create() throws Exception {
2317                     return new TestShard(shardID, Collections.<String,String>emptyMap(),
2318                             newDatastoreContext(), SCHEMA_CONTEXT);
2319                 }
2320             };
2321
2322             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2323                     Props.create(new DelegatingShardCreator(creator)), shardActorName);
2324
2325             waitUntilLeader(shard);
2326
2327             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2328
2329             final NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2330
2331             // Trigger creation of a snapshot by ensuring
2332             final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2333             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2334
2335             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2336
2337             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2338                     savedSnapshot.get() instanceof Snapshot);
2339
2340             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2341
2342             latch.set(new CountDownLatch(1));
2343             savedSnapshot.set(null);
2344
2345             raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2346
2347             assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2348
2349             assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2350                     savedSnapshot.get() instanceof Snapshot);
2351
2352             verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2353
2354             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2355         }
2356
2357         private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?,?> expectedRoot) {
2358
2359             final NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2360             assertEquals("Root node", expectedRoot, actual);
2361
2362         }};
2363     }
2364
2365     /**
2366      * This test simply verifies that the applySnapShot logic will work
2367      * @throws ReadFailedException
2368      * @throws DataValidationFailedException
2369      */
2370     @Test
2371     public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2372         final DataTree store = InMemoryDataTreeFactory.getInstance().create();
2373         store.setSchemaContext(SCHEMA_CONTEXT);
2374
2375         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
2376         putTransaction.write(TestModel.TEST_PATH,
2377             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2378         commitTransaction(store, putTransaction);
2379
2380
2381         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2382
2383         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2384
2385         writeTransaction.delete(YangInstanceIdentifier.builder().build());
2386         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2387
2388         commitTransaction(store, writeTransaction);
2389
2390         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2391
2392         assertEquals(expected, actual);
2393     }
2394
2395     @Test
2396     public void testRecoveryApplicable(){
2397
2398         final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2399                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2400
2401         final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2402                 persistentContext, SCHEMA_CONTEXT);
2403
2404         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2405                 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2406
2407         final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2408             nonPersistentContext, SCHEMA_CONTEXT);
2409
2410         new ShardTestKit(getSystem()) {{
2411             final TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2412                     persistentProps, "testPersistence1");
2413
2414             assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2415
2416             shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2417
2418             final TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2419                     nonPersistentProps, "testPersistence2");
2420
2421             assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2422
2423             shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2424
2425         }};
2426
2427     }
2428
2429     @Test
2430     public void testOnDatastoreContext() {
2431         new ShardTestKit(getSystem()) {{
2432             dataStoreContextBuilder.persistent(true);
2433
2434             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2435
2436             assertEquals("isRecoveryApplicable", true,
2437                     shard.underlyingActor().persistence().isRecoveryApplicable());
2438
2439             waitUntilLeader(shard);
2440
2441             shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2442
2443             assertEquals("isRecoveryApplicable", false,
2444                 shard.underlyingActor().persistence().isRecoveryApplicable());
2445
2446             shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2447
2448             assertEquals("isRecoveryApplicable", true,
2449                 shard.underlyingActor().persistence().isRecoveryApplicable());
2450
2451             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2452         }};
2453     }
2454
2455     @Test
2456     public void testRegisterRoleChangeListener() throws Exception {
2457         new ShardTestKit(getSystem()) {
2458             {
2459                 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2460                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2461                         "testRegisterRoleChangeListener");
2462
2463                 waitUntilLeader(shard);
2464
2465                 final TestActorRef<MessageCollectorActor> listener =
2466                         TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2467
2468                 shard.tell(new RegisterRoleChangeListener(), listener);
2469
2470                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2471
2472                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2473                     ShardLeaderStateChanged.class);
2474                 assertEquals("getLocalShardDataTree present", true,
2475                         leaderStateChanged.getLocalShardDataTree().isPresent());
2476                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2477                     leaderStateChanged.getLocalShardDataTree().get());
2478
2479                 MessageCollectorActor.clearMessages(listener);
2480
2481                 // Force a leader change
2482
2483                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2484
2485                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2486                         ShardLeaderStateChanged.class);
2487                 assertEquals("getLocalShardDataTree present", false,
2488                         leaderStateChanged.getLocalShardDataTree().isPresent());
2489
2490                 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2491             }
2492         };
2493     }
2494
2495     @Test
2496     public void testFollowerInitialSyncStatus() throws Exception {
2497         final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2498                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2499                 "testFollowerInitialSyncStatus");
2500
2501         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2502
2503         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2504
2505         shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2506
2507         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2508
2509         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2510     }
2511
2512     private static void commitTransaction(final DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2513         modification.ready();
2514         store.validate(modification);
2515         store.commit(store.prepare(modification));
2516     }
2517
2518     @Test
2519     public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
2520         new ShardTestKit(getSystem()) {{
2521             final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
2522             final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
2523             final Creator<Shard> creator = new Creator<Shard>() {
2524                 boolean firstElectionTimeout = true;
2525
2526                 @Override
2527                 public Shard create() throws Exception {
2528                     return new Shard(shardID, Collections.<String,String>emptyMap(),
2529                         dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
2530                         @Override
2531                         public void onReceiveCommand(final Object message) throws Exception {
2532                             if(message instanceof ElectionTimeout && firstElectionTimeout) {
2533                                 firstElectionTimeout = false;
2534                                 final ActorRef self = getSelf();
2535                                 new Thread() {
2536                                     @Override
2537                                     public void run() {
2538                                         Uninterruptibles.awaitUninterruptibly(
2539                                             onChangeListenerRegistered, 5, TimeUnit.SECONDS);
2540                                         self.tell(message, self);
2541                                     }
2542                                 }.start();
2543
2544                                 onFirstElectionTimeout.countDown();
2545                             } else {
2546                                 super.onReceiveCommand(message);
2547                             }
2548                         }
2549                     };
2550                 }
2551             };
2552
2553             final MockDataChangeListener listener = new MockDataChangeListener(1);
2554             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2555                 "testDataChangeListenerOnFollower-DataChangeListener");
2556
2557             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2558                 Props.create(new DelegatingShardCreator(creator)),
2559                 "testDataChangeListenerOnFollower");
2560
2561             assertEquals("Got first ElectionTimeout", true,
2562                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
2563
2564             shard.tell(new FindLeader(), getRef());
2565             final FindLeaderReply findLeadeReply =
2566                 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2567             assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
2568
2569             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2570
2571             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2572             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2573                 RegisterChangeListenerReply.class);
2574             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2575
2576             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2577
2578             onChangeListenerRegistered.countDown();
2579
2580             listener.waitForChangeEvents();
2581
2582             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2583             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2584         }};
2585     }
2586
2587     @Test
2588     public void testClusteredDataChangeListernerRegistration() throws Exception {
2589         new ShardTestKit(getSystem()) {{
2590             final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
2591                 .shardName("inventory").type("config").build();
2592
2593             final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
2594                 .shardName("inventory").type("config").build();
2595             final Creator<Shard> followerShardCreator = new Creator<Shard>() {
2596
2597                 @Override
2598                 public Shard create() throws Exception {
2599                     return new Shard(member1ShardID, Collections.singletonMap(member2ShardID.toString(),
2600                         "akka://test/user/" + member2ShardID.toString()),
2601                         dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
2602                         @Override
2603                         public void onReceiveCommand(final Object message) throws Exception {
2604
2605                             if(!(message instanceof ElectionTimeout)) {
2606                                 super.onReceiveCommand(message);
2607                             }
2608                         }
2609                     };
2610                 }
2611             };
2612
2613             final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
2614
2615                 @Override
2616                 public Shard create() throws Exception {
2617                     return new Shard(member2ShardID, Collections.singletonMap(member1ShardID.toString(),
2618                         "akka://test/user/" + member1ShardID.toString()),
2619                         dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) { };
2620                 }
2621             };
2622
2623
2624             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2625                 Props.create(new DelegatingShardCreator(followerShardCreator)),
2626                 member1ShardID.toString());
2627
2628             final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
2629                 Props.create(new DelegatingShardCreator(leaderShardCreator)),
2630                 member2ShardID.toString());
2631             // Sleep to let election happen
2632             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
2633
2634             shard.tell(new FindLeader(), getRef());
2635             final FindLeaderReply findLeaderReply =
2636                     expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
2637             assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
2638
2639             final YangInstanceIdentifier path = TestModel.TEST_PATH;
2640             final MockDataChangeListener listener = new MockDataChangeListener(1);
2641             final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
2642                 "testDataChangeListenerOnFollower-DataChangeListener");
2643
2644             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
2645             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
2646                 RegisterChangeListenerReply.class);
2647             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
2648
2649             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2650
2651             listener.waitForChangeEvents();
2652
2653             dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
2654             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2655         }};
2656     }
2657 }