Remove unused exceptions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.Matchers.any;
18 import static org.mockito.Mockito.doThrow;
19 import static org.mockito.Mockito.inOrder;
20 import static org.mockito.Mockito.mock;
21 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
22
23 import akka.actor.ActorRef;
24 import akka.actor.ActorSelection;
25 import akka.actor.Props;
26 import akka.actor.Status.Failure;
27 import akka.dispatch.Dispatchers;
28 import akka.dispatch.OnComplete;
29 import akka.japi.Creator;
30 import akka.pattern.Patterns;
31 import akka.persistence.SaveSnapshotSuccess;
32 import akka.testkit.TestActorRef;
33 import akka.util.Timeout;
34 import com.google.common.base.Stopwatch;
35 import com.google.common.base.Throwables;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import java.util.Collections;
38 import java.util.HashSet;
39 import java.util.Map;
40 import java.util.Optional;
41 import java.util.Set;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicReference;
46 import org.junit.Test;
47 import org.mockito.InOrder;
48 import org.opendaylight.controller.cluster.DataPersistenceProvider;
49 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
50 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
51 import org.opendaylight.controller.cluster.access.concepts.MemberName;
52 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
53 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
56 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
57 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
60 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
61 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
62 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistration;
63 import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeNotificationListenerRegistrationReply;
64 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
65 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
66 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
67 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
68 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
69 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
70 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
71 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
72 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
73 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
74 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeNotificationListenerReply;
75 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
76 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
77 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
78 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
79 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
80 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
81 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
82 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
83 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
84 import org.opendaylight.controller.cluster.raft.RaftActorContext;
85 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
86 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
87 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
88 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
89 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
90 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
91 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
92 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
93 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
94 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
95 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
96 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
97 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
98 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
99 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
100 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
101 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
102 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
103 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
104 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
105 import org.opendaylight.yangtools.concepts.Identifier;
106 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
107 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
108 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
114 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
115 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
116 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
117 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
118 import scala.concurrent.Await;
119 import scala.concurrent.Future;
120 import scala.concurrent.duration.FiniteDuration;
121
122 public class ShardTest extends AbstractShardTest {
123     private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in "
124             + "InMemorySnapshotStore and journal recovery seq number will start from 1";
125
126     @Test
127     public void testRegisterDataTreeChangeListener() throws Exception {
128         new ShardTestKit(getSystem()) {
129             {
130                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
131                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
132                         "testRegisterDataTreeChangeListener");
133
134                 waitUntilLeader(shard);
135
136                 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
137
138                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
139                 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
140                         TestModel.TEST_PATH), "testRegisterDataTreeChangeListener-DataTreeChangeListener");
141
142                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
143
144                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("3 seconds"),
145                         RegisterDataTreeNotificationListenerReply.class);
146                 final String replyPath = reply.getListenerRegistrationPath().toString();
147                 assertTrue("Incorrect reply path: " + replyPath,
148                         replyPath.matches("akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
149
150                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
151                 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
152
153                 listener.waitForChangeEvents();
154             }
155         };
156     }
157
158     @SuppressWarnings("serial")
159     @Test
160     public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
161         final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
162         final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
163         final Creator<Shard> creator = new Creator<Shard>() {
164             boolean firstElectionTimeout = true;
165
166             @Override
167             public Shard create() {
168                 return new Shard(newShardBuilder()) {
169                     @Override
170                     public void handleCommand(final Object message) {
171                         if (message instanceof ElectionTimeout && firstElectionTimeout) {
172                             firstElectionTimeout = false;
173                             final ActorRef self = getSelf();
174                             new Thread(() -> {
175                                 Uninterruptibles.awaitUninterruptibly(
176                                         onChangeListenerRegistered, 5, TimeUnit.SECONDS);
177                                 self.tell(message, self);
178                             }).start();
179
180                             onFirstElectionTimeout.countDown();
181                         } else {
182                             super.handleCommand(message);
183                         }
184                     }
185                 };
186             }
187         };
188
189         setupInMemorySnapshotStore();
190
191         final YangInstanceIdentifier path = TestModel.TEST_PATH;
192         final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
193         final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
194                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
195
196         final TestActorRef<Shard> shard = actorFactory.createTestActor(
197                 Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
198                 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
199
200         new ShardTestKit(getSystem()) {
201             {
202                 assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
203
204                 shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
205                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
206                         RegisterDataTreeNotificationListenerReply.class);
207                 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
208
209                 shard.tell(FindLeader.INSTANCE, getRef());
210                 final FindLeaderReply findLeadeReply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
211                 assertFalse("Expected the shard not to be the leader", findLeadeReply.getLeaderActor().isPresent());
212
213                 onChangeListenerRegistered.countDown();
214
215                 // TODO: investigate why we do not receive data chage events
216                 listener.waitForChangeEvents();
217             }
218         };
219     }
220
221     @Test
222     public void testCreateTransaction() {
223         new ShardTestKit(getSystem()) {
224             {
225                 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransaction");
226
227                 waitUntilLeader(shard);
228
229                 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
230
231                 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
232                         DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
233
234                 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
235                         CreateTransactionReply.class);
236
237                 final String path = reply.getTransactionPath().toString();
238                 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
239                         "akka://test/user/testCreateTransaction/shard-%s-%s:ShardTransactionTest@0:",
240                             shardID.getShardName(), shardID.getMemberName().getName())));
241             }
242         };
243     }
244
245     @Test
246     public void testCreateTransactionOnChain() {
247         new ShardTestKit(getSystem()) {
248             {
249                 final ActorRef shard = actorFactory.createActor(newShardProps(), "testCreateTransactionOnChain");
250
251                 waitUntilLeader(shard);
252
253                 shard.tell(new CreateTransaction(nextTransactionId(), TransactionType.READ_ONLY.ordinal(),
254                         DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
255
256                 final CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
257                         CreateTransactionReply.class);
258
259                 final String path = reply.getTransactionPath().toString();
260                 assertTrue("Unexpected transaction path " + path, path.startsWith(String.format(
261                         "akka://test/user/testCreateTransactionOnChain/shard-%s-%s:ShardTransactionTest@0:",
262                         shardID.getShardName(), shardID.getMemberName().getName())));
263             }
264         };
265     }
266
267     @Test
268     public void testPeerAddressResolved() {
269         new ShardTestKit(getSystem()) {
270             {
271                 final ShardIdentifier peerID = ShardIdentifier.create("inventory", MemberName.forName("member-2"),
272                         "config");
273                 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardBuilder()
274                         .peerAddresses(Collections.<String, String>singletonMap(peerID.toString(), null))
275                         .props().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPeerAddressResolved");
276
277                 final String address = "akka://foobar";
278                 shard.tell(new PeerAddressResolved(peerID.toString(), address), ActorRef.noSender());
279
280                 shard.tell(GetOnDemandRaftState.INSTANCE, getRef());
281                 final OnDemandRaftState state = expectMsgClass(OnDemandRaftState.class);
282                 assertEquals("getPeerAddress", address, state.getPeerAddresses().get(peerID.toString()));
283             }
284         };
285     }
286
287     @Test
288     public void testApplySnapshot() throws Exception {
289
290         final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps()
291                 .withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplySnapshot");
292
293         ShardTestKit.waitUntilLeader(shard);
294
295         final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
296             SCHEMA_CONTEXT);
297
298         final ContainerNode container = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
299                 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
300                     .withChild(ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).addChild(
301                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)).build()).build();
302
303         writeToStore(store, TestModel.TEST_PATH, container);
304
305         final YangInstanceIdentifier root = YangInstanceIdentifier.EMPTY;
306         final NormalizedNode<?,?> expected = readStore(store, root);
307
308         final Snapshot snapshot = Snapshot.create(
309                 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(expected)),
310                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4, -1, null, null);
311
312         shard.tell(new ApplySnapshot(snapshot), ActorRef.noSender());
313
314         final Stopwatch sw = Stopwatch.createStarted();
315         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
316             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
317
318             try {
319                 assertEquals("Root node", expected, readStore(shard, root));
320                 return;
321             } catch (final AssertionError e) {
322                 // try again
323             }
324         }
325
326         fail("Snapshot was not applied");
327     }
328
329     @Test
330     public void testApplyState() throws Exception {
331         final TestActorRef<Shard> shard = actorFactory.createTestActor(
332                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testApplyState");
333
334         ShardTestKit.waitUntilLeader(shard);
335
336         final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
337             SCHEMA_CONTEXT);
338
339         final DataTreeModification writeMod = store.takeSnapshot().newModification();
340         final ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
341         writeMod.write(TestModel.TEST_PATH, node);
342         writeMod.ready();
343
344         final TransactionIdentifier tx = nextTransactionId();
345         shard.underlyingActor().applyState(null, null, payloadForModification(store, writeMod, tx));
346
347         final Stopwatch sw = Stopwatch.createStarted();
348         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
349             Uninterruptibles.sleepUninterruptibly(75, TimeUnit.MILLISECONDS);
350
351             final NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
352             if (actual != null) {
353                 assertEquals("Applied state", node, actual);
354                 return;
355             }
356         }
357
358         fail("State was not applied");
359     }
360
361     @Test
362     public void testDataTreeCandidateRecovery() throws Exception {
363         // Set up the InMemorySnapshotStore.
364         final DataTree source = setupInMemorySnapshotStore();
365
366         final DataTreeModification writeMod = source.takeSnapshot().newModification();
367         writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
368         writeMod.ready();
369         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
370
371         // Set up the InMemoryJournal.
372         InMemoryJournal.addEntry(shardID.toString(), 1, new SimpleReplicatedLogEntry(0, 1,
373             payloadForModification(source, writeMod, nextTransactionId())));
374
375         final int nListEntries = 16;
376         final Set<Integer> listEntryKeys = new HashSet<>();
377
378         // Add some ModificationPayload entries
379         for (int i = 1; i <= nListEntries; i++) {
380             listEntryKeys.add(Integer.valueOf(i));
381
382             final YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
383                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
384
385             final DataTreeModification mod = source.takeSnapshot().newModification();
386             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
387             mod.ready();
388
389             InMemoryJournal.addEntry(shardID.toString(), i + 1, new SimpleReplicatedLogEntry(i, 1,
390                 payloadForModification(source, mod, nextTransactionId())));
391         }
392
393         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
394             new ApplyJournalEntries(nListEntries));
395
396         testRecovery(listEntryKeys);
397     }
398
399     @Test
400     @SuppressWarnings("checkstyle:IllegalCatch")
401     public void testConcurrentThreePhaseCommits() throws Exception {
402         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
403         final CountDownLatch commitLatch = new CountDownLatch(2);
404
405         final long timeoutSec = 5;
406         final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
407         final Timeout timeout = new Timeout(duration);
408
409         final TestActorRef<Shard> shard = actorFactory.createTestActor(
410                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
411                 "testConcurrentThreePhaseCommits");
412
413         class OnFutureComplete extends OnComplete<Object> {
414             private final Class<?> expRespType;
415
416             OnFutureComplete(final Class<?> expRespType) {
417                 this.expRespType = expRespType;
418             }
419
420             @Override
421             public void onComplete(final Throwable error, final Object resp) {
422                 if (error != null) {
423                     caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
424                 } else {
425                     try {
426                         assertEquals("Commit response type", expRespType, resp.getClass());
427                         onSuccess(resp);
428                     } catch (final Exception e) {
429                         caughtEx.set(e);
430                     }
431                 }
432             }
433
434             void onSuccess(final Object resp) {
435             }
436         }
437
438         class OnCommitFutureComplete extends OnFutureComplete {
439             OnCommitFutureComplete() {
440                 super(CommitTransactionReply.class);
441             }
442
443             @Override
444             public void onComplete(final Throwable error, final Object resp) {
445                 super.onComplete(error, resp);
446                 commitLatch.countDown();
447             }
448         }
449
450         class OnCanCommitFutureComplete extends OnFutureComplete {
451             private final TransactionIdentifier transactionID;
452
453             OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
454                 super(CanCommitTransactionReply.class);
455                 this.transactionID = transactionID;
456             }
457
458             @Override
459             void onSuccess(final Object resp) {
460                 final CanCommitTransactionReply canCommitReply =
461                         CanCommitTransactionReply.fromSerializable(resp);
462                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
463
464                 final Future<Object> commitFuture = Patterns.ask(shard,
465                         new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
466                 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
467             }
468         }
469
470         new ShardTestKit(getSystem()) {
471             {
472                 waitUntilLeader(shard);
473
474                 final TransactionIdentifier transactionID1 = nextTransactionId();
475                 final TransactionIdentifier transactionID2 = nextTransactionId();
476                 final TransactionIdentifier transactionID3 = nextTransactionId();
477
478                 final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
479                         shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
480                 final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
481                 final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
482                 final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
483
484                 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
485                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
486                 final ReadyTransactionReply readyReply = ReadyTransactionReply
487                         .fromSerializable(expectMsgClass(duration, ReadyTransactionReply.class));
488                 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
489                 // Send the CanCommitTransaction message for the first Tx.
490
491                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
492                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
493                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
494                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
495
496                 // Ready 2 more Tx's.
497
498                 shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
499                         ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
500                 expectMsgClass(duration, ReadyTransactionReply.class);
501
502                 shard.tell(
503                         prepareBatchedModifications(transactionID3,
504                                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
505                                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
506                                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
507                         getRef());
508                 expectMsgClass(duration, ReadyTransactionReply.class);
509
510                 // Send the CanCommitTransaction message for the next 2 Tx's.
511                 // These should get queued and
512                 // processed after the first Tx completes.
513
514                 final Future<Object> canCommitFuture1 = Patterns.ask(shard,
515                         new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
516
517                 final Future<Object> canCommitFuture2 = Patterns.ask(shard,
518                         new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
519
520                 // Send the CommitTransaction message for the first Tx. After it
521                 // completes, it should
522                 // trigger the 2nd Tx to proceed which should in turn then
523                 // trigger the 3rd.
524
525                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
526                 expectMsgClass(duration, CommitTransactionReply.class);
527
528                 // Wait for the next 2 Tx's to complete.
529
530                 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher());
531
532                 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher());
533
534                 final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
535
536                 final Throwable t = caughtEx.get();
537                 if (t != null) {
538                     Throwables.propagateIfPossible(t, Exception.class);
539                     throw new RuntimeException(t);
540                 }
541
542                 assertEquals("Commits complete", true, done);
543
544 //                final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
545 //                        cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
546 //                        cohort3.getPreCommit(), cohort3.getCommit());
547 //                inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
548 //                inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
549 //                inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
550 //                inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
551 //                inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
552 //                inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
553 //                inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
554 //                inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
555 //                inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
556
557                 // Verify data in the data store.
558
559                 verifyOuterListEntry(shard, 1);
560
561                 verifyLastApplied(shard, 5);
562             }
563         };
564     }
565
566     @Test
567     public void testBatchedModificationsWithNoCommitOnReady() {
568         new ShardTestKit(getSystem()) {
569             {
570                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
571                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
572                         "testBatchedModificationsWithNoCommitOnReady");
573
574                 waitUntilLeader(shard);
575
576                 final TransactionIdentifier transactionID = nextTransactionId();
577                 final FiniteDuration duration = duration("5 seconds");
578
579                 // Send a BatchedModifications to start a transaction.
580
581                 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
582                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
583                 expectMsgClass(duration, BatchedModificationsReply.class);
584
585                 // Send a couple more BatchedModifications.
586
587                 shard.tell(
588                         newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
589                                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
590                         getRef());
591                 expectMsgClass(duration, BatchedModificationsReply.class);
592
593                 shard.tell(newBatchedModifications(transactionID,
594                         YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
595                                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
596                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false, 3),
597                         getRef());
598                 expectMsgClass(duration, ReadyTransactionReply.class);
599
600                 // Send the CanCommitTransaction message.
601
602                 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
603                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
604                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
605                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
606
607                 // Send the CommitTransaction message.
608
609                 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
610                 expectMsgClass(duration, CommitTransactionReply.class);
611
612                 // Verify data in the data store.
613
614                 verifyOuterListEntry(shard, 1);
615             }
616         };
617     }
618
619     @Test
620     public void testBatchedModificationsWithCommitOnReady() {
621         new ShardTestKit(getSystem()) {
622             {
623                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
624                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
625                         "testBatchedModificationsWithCommitOnReady");
626
627                 waitUntilLeader(shard);
628
629                 final TransactionIdentifier transactionID = nextTransactionId();
630                 final FiniteDuration duration = duration("5 seconds");
631
632                 // Send a BatchedModifications to start a transaction.
633
634                 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
635                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false, 1), getRef());
636                 expectMsgClass(duration, BatchedModificationsReply.class);
637
638                 // Send a couple more BatchedModifications.
639
640                 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
641                                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false, 2),
642                         getRef());
643                 expectMsgClass(duration, BatchedModificationsReply.class);
644
645                 shard.tell(newBatchedModifications(transactionID,
646                         YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
647                                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
648                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true, 3),
649                         getRef());
650
651                 expectMsgClass(duration, CommitTransactionReply.class);
652
653                 // Verify data in the data store.
654
655                 verifyOuterListEntry(shard, 1);
656             }
657         };
658     }
659
660     @Test(expected = IllegalStateException.class)
661     public void testBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
662         new ShardTestKit(getSystem()) {
663             {
664                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
665                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
666                         "testBatchedModificationsReadyWithIncorrectTotalMessageCount");
667
668                 waitUntilLeader(shard);
669
670                 final TransactionIdentifier transactionID = nextTransactionId();
671                 final BatchedModifications batched = new BatchedModifications(transactionID,
672                         DataStoreVersions.CURRENT_VERSION);
673                 batched.setReady();
674                 batched.setTotalMessagesSent(2);
675
676                 shard.tell(batched, getRef());
677
678                 final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
679
680                 if (failure != null) {
681                     Throwables.propagateIfPossible(failure.cause(), Exception.class);
682                     throw new RuntimeException(failure.cause());
683                 }
684             }
685         };
686     }
687
688     @Test
689     public void testBatchedModificationsWithOperationFailure() {
690         new ShardTestKit(getSystem()) {
691             {
692                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
693                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
694                         "testBatchedModificationsWithOperationFailure");
695
696                 waitUntilLeader(shard);
697
698                 // Test merge with invalid data. An exception should occur when
699                 // the merge is applied. Note that
700                 // write will not validate the children for performance reasons.
701
702                 final TransactionIdentifier transactionID = nextTransactionId();
703
704                 final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
705                         .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
706                         .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
707
708                 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
709                 batched.addModification(new MergeModification(TestModel.TEST_PATH, invalidData));
710                 shard.tell(batched, getRef());
711                 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
712
713                 final Throwable cause = failure.cause();
714
715                 batched = new BatchedModifications(transactionID, DataStoreVersions.CURRENT_VERSION);
716                 batched.setReady();
717                 batched.setTotalMessagesSent(2);
718
719                 shard.tell(batched, getRef());
720
721                 failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
722                 assertEquals("Failure cause", cause, failure.cause());
723             }
724         };
725     }
726
727     @Test
728     public void testBatchedModificationsOnTransactionChain() {
729         new ShardTestKit(getSystem()) {
730             {
731                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
732                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
733                         "testBatchedModificationsOnTransactionChain");
734
735                 waitUntilLeader(shard);
736
737                 final LocalHistoryIdentifier historyId = nextHistoryId();
738                 final TransactionIdentifier transactionID1 = new TransactionIdentifier(historyId, 0);
739                 final TransactionIdentifier transactionID2 = new TransactionIdentifier(historyId, 1);
740
741                 final FiniteDuration duration = duration("5 seconds");
742
743                 // Send a BatchedModifications to start a chained write
744                 // transaction and ready it.
745
746                 final ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
747                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
748                 shard.tell(newBatchedModifications(transactionID1, path, containerNode, true, false, 1), getRef());
749                 expectMsgClass(duration, ReadyTransactionReply.class);
750
751                 // Create a read Tx on the same chain.
752
753                 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal(),
754                         DataStoreVersions.CURRENT_VERSION).toSerializable(), getRef());
755
756                 final CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"),
757                         CreateTransactionReply.class);
758
759                 getSystem().actorSelection(createReply.getTransactionPath())
760                         .tell(new ReadData(path, DataStoreVersions.CURRENT_VERSION), getRef());
761                 final ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
762                 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
763
764                 // Commit the write transaction.
765
766                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
767                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
768                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
769                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
770
771                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
772                 expectMsgClass(duration, CommitTransactionReply.class);
773
774                 // Verify data in the data store.
775
776                 final NormalizedNode<?, ?> actualNode = readStore(shard, path);
777                 assertEquals("Stored node", containerNode, actualNode);
778             }
779         };
780     }
781
782     @Test
783     public void testOnBatchedModificationsWhenNotLeader() {
784         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
785         new ShardTestKit(getSystem()) {
786             {
787                 final Creator<Shard> creator = new Creator<Shard>() {
788                     private static final long serialVersionUID = 1L;
789
790                     @Override
791                     public Shard create() {
792                         return new Shard(newShardBuilder()) {
793                             @Override
794                             protected boolean isLeader() {
795                                 return overrideLeaderCalls.get() ? false : super.isLeader();
796                             }
797
798                             @Override
799                             public ActorSelection getLeader() {
800                                 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path())
801                                         : super.getLeader();
802                             }
803                         };
804                     }
805                 };
806
807                 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
808                         .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
809                         "testOnBatchedModificationsWhenNotLeader");
810
811                 waitUntilLeader(shard);
812
813                 overrideLeaderCalls.set(true);
814
815                 final BatchedModifications batched = new BatchedModifications(nextTransactionId(),
816                         DataStoreVersions.CURRENT_VERSION);
817
818                 shard.tell(batched, ActorRef.noSender());
819
820                 expectMsgEquals(batched);
821             }
822         };
823     }
824
825     @Test
826     public void testTransactionMessagesWithNoLeader() {
827         new ShardTestKit(getSystem()) {
828             {
829                 dataStoreContextBuilder.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
830                         .shardHeartbeatIntervalInMillis(50).shardElectionTimeoutFactor(1);
831                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
832                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
833                         "testTransactionMessagesWithNoLeader");
834
835                 waitUntilNoLeader(shard);
836
837                 final TransactionIdentifier txId = nextTransactionId();
838                 shard.tell(new BatchedModifications(txId, DataStoreVersions.CURRENT_VERSION), getRef());
839                 Failure failure = expectMsgClass(Failure.class);
840                 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
841
842                 shard.tell(prepareForwardedReadyTransaction(shard, txId, TestModel.TEST_PATH,
843                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
844                 failure = expectMsgClass(Failure.class);
845                 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
846
847                 shard.tell(new ReadyLocalTransaction(txId, mock(DataTreeModification.class), true, Optional.empty()),
848                         getRef());
849                 failure = expectMsgClass(Failure.class);
850                 assertEquals("Failure cause type", NoShardLeaderException.class, failure.cause().getClass());
851             }
852         };
853     }
854
855     @Test
856     public void testReadyWithReadWriteImmediateCommit() {
857         testReadyWithImmediateCommit(true);
858     }
859
860     @Test
861     public void testReadyWithWriteOnlyImmediateCommit() {
862         testReadyWithImmediateCommit(false);
863     }
864
865     private void testReadyWithImmediateCommit(final boolean readWrite) {
866         new ShardTestKit(getSystem()) {
867             {
868                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
869                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
870                         "testReadyWithImmediateCommit-" + readWrite);
871
872                 waitUntilLeader(shard);
873
874                 final TransactionIdentifier transactionID = nextTransactionId();
875                 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
876                 if (readWrite) {
877                     shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
878                             containerNode, true), getRef());
879                 } else {
880                     shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true),
881                             getRef());
882                 }
883
884                 expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
885
886                 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
887                 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
888             }
889         };
890     }
891
892     @Test
893     public void testReadyLocalTransactionWithImmediateCommit() {
894         new ShardTestKit(getSystem()) {
895             {
896                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
897                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
898                         "testReadyLocalTransactionWithImmediateCommit");
899
900                 waitUntilLeader(shard);
901
902                 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
903
904                 final DataTreeModification modification = dataStore.newModification();
905
906                 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
907                 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
908                 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
909                 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
910
911                 final TransactionIdentifier txId = nextTransactionId();
912                 modification.ready();
913                 final ReadyLocalTransaction readyMessage =
914                         new ReadyLocalTransaction(txId, modification, true, Optional.empty());
915
916                 shard.tell(readyMessage, getRef());
917
918                 expectMsgClass(CommitTransactionReply.class);
919
920                 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
921                 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
922             }
923         };
924     }
925
926     @Test
927     public void testReadyLocalTransactionWithThreePhaseCommit() {
928         new ShardTestKit(getSystem()) {
929             {
930                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
931                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
932                         "testReadyLocalTransactionWithThreePhaseCommit");
933
934                 waitUntilLeader(shard);
935
936                 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
937
938                 final DataTreeModification modification = dataStore.newModification();
939
940                 final ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
941                 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
942                 final MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
943                 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
944
945                 final TransactionIdentifier txId = nextTransactionId();
946                 modification.ready();
947                 final ReadyLocalTransaction readyMessage =
948                         new ReadyLocalTransaction(txId, modification, false, Optional.empty());
949
950                 shard.tell(readyMessage, getRef());
951
952                 expectMsgClass(ReadyTransactionReply.class);
953
954                 // Send the CanCommitTransaction message.
955
956                 shard.tell(new CanCommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
957                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
958                         .fromSerializable(expectMsgClass(CanCommitTransactionReply.class));
959                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
960
961                 // Send the CanCommitTransaction message.
962
963                 shard.tell(new CommitTransaction(txId, CURRENT_VERSION).toSerializable(), getRef());
964                 expectMsgClass(CommitTransactionReply.class);
965
966                 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
967                 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
968             }
969         };
970     }
971
972     @Test
973     public void testReadWriteCommitWithPersistenceDisabled() {
974         dataStoreContextBuilder.persistent(false);
975         new ShardTestKit(getSystem()) {
976             {
977                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
978                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
979                         "testCommitWithPersistenceDisabled");
980
981                 waitUntilLeader(shard);
982
983                 // Setup a simulated transactions with a mock cohort.
984
985                 final FiniteDuration duration = duration("5 seconds");
986
987                 final TransactionIdentifier transactionID = nextTransactionId();
988                 final NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
989                 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, false),
990                         getRef());
991                 expectMsgClass(duration, ReadyTransactionReply.class);
992
993                 // Send the CanCommitTransaction message.
994
995                 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
996                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
997                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
998                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
999
1000                 // Send the CanCommitTransaction message.
1001
1002                 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1003                 expectMsgClass(duration, CommitTransactionReply.class);
1004
1005                 final NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1006                 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1007             }
1008         };
1009     }
1010
1011     @Test
1012     public void testReadWriteCommitWhenTransactionHasModifications() throws Exception {
1013         testCommitWhenTransactionHasModifications(true);
1014     }
1015
1016     @Test
1017     public void testWriteOnlyCommitWhenTransactionHasModifications() throws Exception {
1018         testCommitWhenTransactionHasModifications(false);
1019     }
1020
1021     private void testCommitWhenTransactionHasModifications(final boolean readWrite) throws Exception {
1022         new ShardTestKit(getSystem()) {
1023             {
1024                 final DataTree dataTree = createDelegatingMockDataTree();
1025                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1026                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1027                         "testCommitWhenTransactionHasModifications-" + readWrite);
1028
1029                 waitUntilLeader(shard);
1030
1031                 final FiniteDuration duration = duration("5 seconds");
1032                 final TransactionIdentifier transactionID = nextTransactionId();
1033
1034                 if (readWrite) {
1035                     shard.tell(prepareForwardedReadyTransaction(shard, transactionID, TestModel.TEST_PATH,
1036                             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1037                 } else {
1038                     shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1039                             ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1040                 }
1041
1042                 expectMsgClass(duration, ReadyTransactionReply.class);
1043
1044                 // Send the CanCommitTransaction message.
1045
1046                 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1047                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1048                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1049                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1050
1051                 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1052                 expectMsgClass(duration, CommitTransactionReply.class);
1053
1054                 final InOrder inOrder = inOrder(dataTree);
1055                 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1056                 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1057                 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1058
1059                 // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
1060                 // the journal
1061                 Thread.sleep(HEARTBEAT_MILLIS * 2);
1062
1063                 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1064                 final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1065
1066                 // Use MBean for verification
1067                 // Committed transaction count should increase as usual
1068                 assertEquals(1, shardStats.getCommittedTransactionsCount());
1069
1070                 // Commit index should advance as we do not have an empty
1071                 // modification
1072                 assertEquals(1, shardStats.getCommitIndex());
1073             }
1074         };
1075     }
1076
1077     @Test
1078     public void testCommitPhaseFailure() throws Exception {
1079         new ShardTestKit(getSystem()) {
1080             {
1081                 final DataTree dataTree = createDelegatingMockDataTree();
1082                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1083                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1084                         "testCommitPhaseFailure");
1085
1086                 waitUntilLeader(shard);
1087
1088                 final FiniteDuration duration = duration("5 seconds");
1089                 final Timeout timeout = new Timeout(duration);
1090
1091                 // Setup 2 simulated transactions with mock cohorts. The first
1092                 // one fails in the
1093                 // commit phase.
1094
1095                 doThrow(new RuntimeException("mock commit failure")).when(dataTree)
1096                         .commit(any(DataTreeCandidate.class));
1097
1098                 final TransactionIdentifier transactionID1 = nextTransactionId();
1099                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1100                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1101                 expectMsgClass(duration, ReadyTransactionReply.class);
1102
1103                 final TransactionIdentifier transactionID2 = nextTransactionId();
1104                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1105                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1106                 expectMsgClass(duration, ReadyTransactionReply.class);
1107
1108                 // Send the CanCommitTransaction message for the first Tx.
1109
1110                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1111                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1112                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1113                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1114
1115                 // Send the CanCommitTransaction message for the 2nd Tx. This
1116                 // should get queued and
1117                 // processed after the first Tx completes.
1118
1119                 final Future<Object> canCommitFuture = Patterns.ask(shard,
1120                         new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1121
1122                 // Send the CommitTransaction message for the first Tx. This
1123                 // should send back an error
1124                 // and trigger the 2nd Tx to proceed.
1125
1126                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1127                 expectMsgClass(duration, akka.actor.Status.Failure.class);
1128
1129                 // Wait for the 2nd Tx to complete the canCommit phase.
1130
1131                 final CountDownLatch latch = new CountDownLatch(1);
1132                 canCommitFuture.onComplete(new OnComplete<Object>() {
1133                     @Override
1134                     public void onComplete(final Throwable failure, final Object resp) {
1135                         latch.countDown();
1136                     }
1137                 }, getSystem().dispatcher());
1138
1139                 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1140
1141                 final InOrder inOrder = inOrder(dataTree);
1142                 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1143                 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1144
1145                 // FIXME: this invocation is done on the result of validate(). To test it, we need to make sure mock
1146                 //        validate performs wrapping and we capture that mock
1147                 // inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1148
1149                 inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
1150             }
1151         };
1152     }
1153
1154     @Test
1155     public void testPreCommitPhaseFailure() throws Exception {
1156         new ShardTestKit(getSystem()) {
1157             {
1158                 final DataTree dataTree = createDelegatingMockDataTree();
1159                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1160                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1161                         "testPreCommitPhaseFailure");
1162
1163                 waitUntilLeader(shard);
1164
1165                 final FiniteDuration duration = duration("5 seconds");
1166                 final Timeout timeout = new Timeout(duration);
1167
1168                 doThrow(new RuntimeException("mock preCommit failure")).when(dataTree)
1169                         .prepare(any(DataTreeModification.class));
1170
1171                 final TransactionIdentifier transactionID1 = nextTransactionId();
1172                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1173                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1174                 expectMsgClass(duration, ReadyTransactionReply.class);
1175
1176                 final TransactionIdentifier transactionID2 = nextTransactionId();
1177                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1178                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1179                 expectMsgClass(duration, ReadyTransactionReply.class);
1180
1181                 // Send the CanCommitTransaction message for the first Tx.
1182
1183                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1184                 final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1185                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1186                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1187
1188                 // Send the CanCommitTransaction message for the 2nd Tx. This
1189                 // should get queued and
1190                 // processed after the first Tx completes.
1191
1192                 final Future<Object> canCommitFuture = Patterns.ask(shard,
1193                         new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1194
1195                 // Send the CommitTransaction message for the first Tx. This
1196                 // should send back an error
1197                 // and trigger the 2nd Tx to proceed.
1198
1199                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1200                 expectMsgClass(duration, akka.actor.Status.Failure.class);
1201
1202                 // Wait for the 2nd Tx to complete the canCommit phase.
1203
1204                 final CountDownLatch latch = new CountDownLatch(1);
1205                 canCommitFuture.onComplete(new OnComplete<Object>() {
1206                     @Override
1207                     public void onComplete(final Throwable failure, final Object resp) {
1208                         latch.countDown();
1209                     }
1210                 }, getSystem().dispatcher());
1211
1212                 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1213
1214                 final InOrder inOrder = inOrder(dataTree);
1215                 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1216                 inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
1217                 inOrder.verify(dataTree).validate(any(DataTreeModification.class));
1218             }
1219         };
1220     }
1221
1222     @Test
1223     public void testCanCommitPhaseFailure() throws Exception {
1224         new ShardTestKit(getSystem()) {
1225             {
1226                 final DataTree dataTree = createDelegatingMockDataTree();
1227                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1228                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1229                         "testCanCommitPhaseFailure");
1230
1231                 waitUntilLeader(shard);
1232
1233                 final FiniteDuration duration = duration("5 seconds");
1234                 final TransactionIdentifier transactionID1 = nextTransactionId();
1235
1236                 doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
1237                         .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1238
1239                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1240                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1241                 expectMsgClass(duration, ReadyTransactionReply.class);
1242
1243                 // Send the CanCommitTransaction message.
1244
1245                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1246                 expectMsgClass(duration, akka.actor.Status.Failure.class);
1247
1248                 // Send another can commit to ensure the failed one got cleaned
1249                 // up.
1250
1251                 final TransactionIdentifier transactionID2 = nextTransactionId();
1252                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1253                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1254                 expectMsgClass(duration, ReadyTransactionReply.class);
1255
1256                 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1257                 final CanCommitTransactionReply reply = CanCommitTransactionReply
1258                         .fromSerializable(expectMsgClass(CanCommitTransactionReply.class));
1259                 assertEquals("getCanCommit", true, reply.getCanCommit());
1260             }
1261         };
1262     }
1263
1264     @Test
1265     public void testImmediateCommitWithCanCommitPhaseFailure() throws Exception {
1266         testImmediateCommitWithCanCommitPhaseFailure(true);
1267         testImmediateCommitWithCanCommitPhaseFailure(false);
1268     }
1269
1270     private void testImmediateCommitWithCanCommitPhaseFailure(final boolean readWrite) throws Exception {
1271         new ShardTestKit(getSystem()) {
1272             {
1273                 final DataTree dataTree = createDelegatingMockDataTree();
1274                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1275                         newShardBuilder().dataTree(dataTree).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
1276                         "testImmediateCommitWithCanCommitPhaseFailure-" + readWrite);
1277
1278                 waitUntilLeader(shard);
1279
1280                 doThrow(new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock canCommit failure"))
1281                         .doNothing().when(dataTree).validate(any(DataTreeModification.class));
1282
1283                 final FiniteDuration duration = duration("5 seconds");
1284
1285                 final TransactionIdentifier transactionID1 = nextTransactionId();
1286
1287                 if (readWrite) {
1288                     shard.tell(prepareForwardedReadyTransaction(shard, transactionID1, TestModel.TEST_PATH,
1289                             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1290                 } else {
1291                     shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1292                             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1293                 }
1294
1295                 expectMsgClass(duration, akka.actor.Status.Failure.class);
1296
1297                 // Send another can commit to ensure the failed one got cleaned
1298                 // up.
1299
1300                 final TransactionIdentifier transactionID2 = nextTransactionId();
1301                 if (readWrite) {
1302                     shard.tell(prepareForwardedReadyTransaction(shard, transactionID2, TestModel.TEST_PATH,
1303                             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1304                 } else {
1305                     shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1306                             ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
1307                 }
1308
1309                 expectMsgClass(duration, CommitTransactionReply.class);
1310             }
1311         };
1312     }
1313
1314     @Test
1315     public void testAbortWithCommitPending() {
1316         new ShardTestKit(getSystem()) {
1317             {
1318                 final Creator<Shard> creator = () -> new Shard(newShardBuilder()) {
1319                     @Override
1320                     void persistPayload(final Identifier id, final Payload payload,
1321                             final boolean batchHint) {
1322                         // Simulate an AbortTransaction message occurring during
1323                         // replication, after
1324                         // persisting and before finishing the commit to the
1325                         // in-memory store.
1326
1327                         doAbortTransaction(id, null);
1328                         super.persistPayload(id, payload, batchHint);
1329                     }
1330                 };
1331
1332                 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
1333                         .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1334                         "testAbortWithCommitPending");
1335
1336                 waitUntilLeader(shard);
1337
1338                 final FiniteDuration duration = duration("5 seconds");
1339
1340                 final TransactionIdentifier transactionID = nextTransactionId();
1341
1342                 shard.tell(prepareBatchedModifications(transactionID, TestModel.TEST_PATH,
1343                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1344                 expectMsgClass(duration, ReadyTransactionReply.class);
1345
1346                 shard.tell(new CanCommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1347                 expectMsgClass(duration, CanCommitTransactionReply.class);
1348
1349                 shard.tell(new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), getRef());
1350                 expectMsgClass(duration, CommitTransactionReply.class);
1351
1352                 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1353
1354                 // Since we're simulating an abort occurring during replication
1355                 // and before finish commit,
1356                 // the data should still get written to the in-memory store
1357                 // since we've gotten past
1358                 // canCommit and preCommit and persisted the data.
1359                 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1360             }
1361         };
1362     }
1363
1364     @Test
1365     public void testTransactionCommitTimeout() throws Exception {
1366         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1367         new ShardTestKit(getSystem()) {
1368             {
1369                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1370                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1371                         "testTransactionCommitTimeout");
1372
1373                 waitUntilLeader(shard);
1374
1375                 final FiniteDuration duration = duration("5 seconds");
1376
1377                 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1378                 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1379                         ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1380
1381                 // Ready 2 Tx's - the first will timeout
1382
1383                 final TransactionIdentifier transactionID1 = nextTransactionId();
1384                 shard.tell(
1385                         prepareBatchedModifications(transactionID1,
1386                                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1387                                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1388                                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
1389                         getRef());
1390                 expectMsgClass(duration, ReadyTransactionReply.class);
1391
1392                 final TransactionIdentifier transactionID2 = nextTransactionId();
1393                 final YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1394                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1395                 shard.tell(
1396                         prepareBatchedModifications(transactionID2, listNodePath,
1397                                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false),
1398                         getRef());
1399                 expectMsgClass(duration, ReadyTransactionReply.class);
1400
1401                 // canCommit 1st Tx. We don't send the commit so it should
1402                 // timeout.
1403
1404                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1405                 expectMsgClass(duration, CanCommitTransactionReply.class);
1406
1407                 // canCommit the 2nd Tx - it should complete after the 1st Tx
1408                 // times out.
1409
1410                 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1411                 expectMsgClass(duration, CanCommitTransactionReply.class);
1412
1413                 // Try to commit the 1st Tx - should fail as it's not the
1414                 // current Tx.
1415
1416                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1417                 expectMsgClass(duration, akka.actor.Status.Failure.class);
1418
1419                 // Commit the 2nd Tx.
1420
1421                 shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1422                 expectMsgClass(duration, CommitTransactionReply.class);
1423
1424                 final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1425                 assertNotNull(listNodePath + " not found", node);
1426             }
1427         };
1428     }
1429
1430 //    @Test
1431 //    @Ignore
1432 //    public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1433 //        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1434 //
1435 //        new ShardTestKit(getSystem()) {{
1436 //            final TestActorRef<Shard> shard = actorFactory.createTestActor(
1437 //                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1438 //                    "testTransactionCommitQueueCapacityExceeded");
1439 //
1440 //            waitUntilLeader(shard);
1441 //
1442 //            final FiniteDuration duration = duration("5 seconds");
1443 //
1444 //            final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1445 //
1446 //            final TransactionIdentifier transactionID1 = nextTransactionId();
1447 //            final MutableCompositeModification modification1 = new MutableCompositeModification();
1448 //            final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1449 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
1450 //                    modification1);
1451 //
1452 //            final TransactionIdentifier transactionID2 = nextTransactionId();
1453 //            final MutableCompositeModification modification2 = new MutableCompositeModification();
1454 //            final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1455 //                    TestModel.OUTER_LIST_PATH,
1456 //                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
1457 //                    modification2);
1458 //
1459 //            final TransactionIdentifier transactionID3 = nextTransactionId();
1460 //            final MutableCompositeModification modification3 = new MutableCompositeModification();
1461 //            final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1462 //                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
1463 //                    modification3);
1464 //
1465 //            // Ready the Tx's
1466 //
1467 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
1468 //                    modification1), getRef());
1469 //            expectMsgClass(duration, ReadyTransactionReply.class);
1470 //
1471 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
1472 //                    modification2), getRef());
1473 //            expectMsgClass(duration, ReadyTransactionReply.class);
1474 //
1475 //            // The 3rd Tx should exceed queue capacity and fail.
1476 //
1477 //            shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
1478 //                    modification3), getRef());
1479 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1480 //
1481 //            // canCommit 1st Tx.
1482 //
1483 //            shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1484 //            expectMsgClass(duration, CanCommitTransactionReply.class);
1485 //
1486 //            // canCommit the 2nd Tx - it should get queued.
1487 //
1488 //            shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1489 //
1490 //            // canCommit the 3rd Tx - should exceed queue capacity and fail.
1491 //
1492 //            shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1493 //            expectMsgClass(duration, akka.actor.Status.Failure.class);
1494 //        }};
1495 //    }
1496
1497     @Test
1498     public void testTransactionCommitWithPriorExpiredCohortEntries() {
1499         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1500         new ShardTestKit(getSystem()) {
1501             {
1502                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1503                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1504                         "testTransactionCommitWithPriorExpiredCohortEntries");
1505
1506                 waitUntilLeader(shard);
1507
1508                 final FiniteDuration duration = duration("5 seconds");
1509
1510                 final TransactionIdentifier transactionID1 = nextTransactionId();
1511                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1512                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1513                 expectMsgClass(duration, ReadyTransactionReply.class);
1514
1515                 final TransactionIdentifier transactionID2 = nextTransactionId();
1516                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1517                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1518                 expectMsgClass(duration, ReadyTransactionReply.class);
1519
1520                 final TransactionIdentifier transactionID3 = nextTransactionId();
1521                 shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
1522                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1523                 expectMsgClass(duration, ReadyTransactionReply.class);
1524
1525                 // All Tx's are readied. We'll send canCommit for the last one
1526                 // but not the others. The others
1527                 // should expire from the queue and the last one should be
1528                 // processed.
1529
1530                 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1531                 expectMsgClass(duration, CanCommitTransactionReply.class);
1532             }
1533         };
1534     }
1535
1536     @Test
1537     public void testTransactionCommitWithSubsequentExpiredCohortEntry() {
1538         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1539         new ShardTestKit(getSystem()) {
1540             {
1541                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1542                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1543                         "testTransactionCommitWithSubsequentExpiredCohortEntry");
1544
1545                 waitUntilLeader(shard);
1546
1547                 final FiniteDuration duration = duration("5 seconds");
1548
1549                 final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1550
1551                 final TransactionIdentifier transactionID1 = nextTransactionId();
1552                 shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
1553                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1554                 expectMsgClass(duration, ReadyTransactionReply.class);
1555
1556                 // CanCommit the first Tx so it's the current in-progress Tx.
1557
1558                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1559                 expectMsgClass(duration, CanCommitTransactionReply.class);
1560
1561                 // Ready the second Tx.
1562
1563                 final TransactionIdentifier transactionID2 = nextTransactionId();
1564                 shard.tell(prepareBatchedModifications(transactionID2, TestModel.TEST_PATH,
1565                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
1566                 expectMsgClass(duration, ReadyTransactionReply.class);
1567
1568                 // Ready the third Tx.
1569
1570                 final TransactionIdentifier transactionID3 = nextTransactionId();
1571                 final DataTreeModification modification3 = dataStore.newModification();
1572                 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1573                         .apply(modification3);
1574                 modification3.ready();
1575                 final ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3,
1576                         true, Optional.empty());
1577                 shard.tell(readyMessage, getRef());
1578
1579                 // Commit the first Tx. After completing, the second should
1580                 // expire from the queue and the third
1581                 // Tx committed.
1582
1583                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1584                 expectMsgClass(duration, CommitTransactionReply.class);
1585
1586                 // Expect commit reply from the third Tx.
1587
1588                 expectMsgClass(duration, CommitTransactionReply.class);
1589
1590                 final NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
1591                 assertNotNull(TestModel.TEST2_PATH + " not found", node);
1592             }
1593         };
1594     }
1595
1596     @Test
1597     public void testCanCommitBeforeReadyFailure() {
1598         new ShardTestKit(getSystem()) {
1599             {
1600                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1601                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1602                         "testCanCommitBeforeReadyFailure");
1603
1604                 shard.tell(new CanCommitTransaction(nextTransactionId(), CURRENT_VERSION).toSerializable(), getRef());
1605                 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1606             }
1607         };
1608     }
1609
1610     @Test
1611     public void testAbortAfterCanCommit() throws Exception {
1612         new ShardTestKit(getSystem()) {
1613             {
1614                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1615                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterCanCommit");
1616
1617                 waitUntilLeader(shard);
1618
1619                 final FiniteDuration duration = duration("5 seconds");
1620                 final Timeout timeout = new Timeout(duration);
1621
1622                 // Ready 2 transactions - the first one will be aborted.
1623
1624                 final TransactionIdentifier transactionID1 = nextTransactionId();
1625                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1626                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1627                 expectMsgClass(duration, ReadyTransactionReply.class);
1628
1629                 final TransactionIdentifier transactionID2 = nextTransactionId();
1630                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1631                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1632                 expectMsgClass(duration, ReadyTransactionReply.class);
1633
1634                 // Send the CanCommitTransaction message for the first Tx.
1635
1636                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1637                 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
1638                         .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
1639                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1640
1641                 // Send the CanCommitTransaction message for the 2nd Tx. This
1642                 // should get queued and
1643                 // processed after the first Tx completes.
1644
1645                 final Future<Object> canCommitFuture = Patterns.ask(shard,
1646                         new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
1647
1648                 // Send the AbortTransaction message for the first Tx. This
1649                 // should trigger the 2nd
1650                 // Tx to proceed.
1651
1652                 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1653                 expectMsgClass(duration, AbortTransactionReply.class);
1654
1655                 // Wait for the 2nd Tx to complete the canCommit phase.
1656
1657                 canCommitReply = (CanCommitTransactionReply) Await.result(canCommitFuture, duration);
1658                 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1659             }
1660         };
1661     }
1662
1663     @Test
1664     public void testAbortAfterReady() {
1665         dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1666         new ShardTestKit(getSystem()) {
1667             {
1668                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1669                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1670
1671                 waitUntilLeader(shard);
1672
1673                 final FiniteDuration duration = duration("5 seconds");
1674
1675                 // Ready a tx.
1676
1677                 final TransactionIdentifier transactionID1 = nextTransactionId();
1678                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1679                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1680                 expectMsgClass(duration, ReadyTransactionReply.class);
1681
1682                 // Send the AbortTransaction message.
1683
1684                 shard.tell(new AbortTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1685                 expectMsgClass(duration, AbortTransactionReply.class);
1686
1687                 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1688
1689                 // Now send CanCommitTransaction - should fail.
1690
1691                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1692                 final Throwable failure = expectMsgClass(duration, akka.actor.Status.Failure.class).cause();
1693                 assertTrue("Failure type", failure instanceof IllegalStateException);
1694
1695                 // Ready and CanCommit another and verify success.
1696
1697                 final TransactionIdentifier transactionID2 = nextTransactionId();
1698                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1699                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1700                 expectMsgClass(duration, ReadyTransactionReply.class);
1701
1702                 shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1703                 expectMsgClass(duration, CanCommitTransactionReply.class);
1704             }
1705         };
1706     }
1707
1708     @Test
1709     public void testAbortQueuedTransaction() {
1710         new ShardTestKit(getSystem()) {
1711             {
1712                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1713                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testAbortAfterReady");
1714
1715                 waitUntilLeader(shard);
1716
1717                 final FiniteDuration duration = duration("5 seconds");
1718
1719                 // Ready 3 tx's.
1720
1721                 final TransactionIdentifier transactionID1 = nextTransactionId();
1722                 shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
1723                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1724                 expectMsgClass(duration, ReadyTransactionReply.class);
1725
1726                 final TransactionIdentifier transactionID2 = nextTransactionId();
1727                 shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
1728                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), true, false, 1), getRef());
1729                 expectMsgClass(duration, ReadyTransactionReply.class);
1730
1731                 final TransactionIdentifier transactionID3 = nextTransactionId();
1732                 shard.tell(
1733                         newBatchedModifications(transactionID3, TestModel.OUTER_LIST_PATH,
1734                                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true, false, 1),
1735                         getRef());
1736                 expectMsgClass(duration, ReadyTransactionReply.class);
1737
1738                 // Abort the second tx while it's queued.
1739
1740                 shard.tell(new AbortTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
1741                 expectMsgClass(duration, AbortTransactionReply.class);
1742
1743                 // Commit the other 2.
1744
1745                 shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1746                 expectMsgClass(duration, CanCommitTransactionReply.class);
1747
1748                 shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
1749                 expectMsgClass(duration, CommitTransactionReply.class);
1750
1751                 shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1752                 expectMsgClass(duration, CanCommitTransactionReply.class);
1753
1754                 shard.tell(new CommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
1755                 expectMsgClass(duration, CommitTransactionReply.class);
1756
1757                 assertEquals("getPendingTxCommitQueueSize", 0, shard.underlyingActor().getPendingTxCommitQueueSize());
1758             }
1759         };
1760     }
1761
1762     @Test
1763     public void testCreateSnapshotWithNonPersistentData() throws Exception {
1764         testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1765     }
1766
1767     @Test
1768     public void testCreateSnapshot() throws Exception {
1769         testCreateSnapshot(true, "testCreateSnapshot");
1770     }
1771
1772     private void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception {
1773         final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1774
1775         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1776         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
1777             TestPersistentDataProvider(final DataPersistenceProvider delegate) {
1778                 super(delegate);
1779             }
1780
1781             @Override
1782             public void saveSnapshot(final Object obj) {
1783                 savedSnapshot.set(obj);
1784                 super.saveSnapshot(obj);
1785             }
1786         }
1787
1788         dataStoreContextBuilder.persistent(persistent);
1789
1790         class TestShard extends Shard {
1791
1792             protected TestShard(final AbstractBuilder<?, ?> builder) {
1793                 super(builder);
1794                 setPersistence(new TestPersistentDataProvider(super.persistence()));
1795             }
1796
1797             @Override
1798             public void handleCommand(final Object message) {
1799                 super.handleCommand(message);
1800
1801                 // XXX:  commit_snapshot equality check references RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT
1802                 if (message instanceof SaveSnapshotSuccess || "commit_snapshot".equals(message.toString())) {
1803                     latch.get().countDown();
1804                 }
1805             }
1806
1807             @Override
1808             public RaftActorContext getRaftActorContext() {
1809                 return super.getRaftActorContext();
1810             }
1811         }
1812
1813         new ShardTestKit(getSystem()) {
1814             {
1815                 final Creator<Shard> creator = () -> new TestShard(newShardBuilder());
1816
1817                 final TestActorRef<Shard> shard = actorFactory.createTestActor(Props
1818                         .create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
1819                         shardActorName);
1820
1821                 waitUntilLeader(shard);
1822                 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1823
1824                 final NormalizedNode<?, ?> expectedRoot = readStore(shard, YangInstanceIdentifier.EMPTY);
1825
1826                 // Trigger creation of a snapshot by ensuring
1827                 final RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
1828                 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1829                 awaitAndValidateSnapshot(expectedRoot);
1830
1831                 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
1832                 awaitAndValidateSnapshot(expectedRoot);
1833             }
1834
1835             private void awaitAndValidateSnapshot(final NormalizedNode<?, ?> expectedRoot)
1836                     throws InterruptedException {
1837                 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1838
1839                 assertTrue("Invalid saved snapshot " + savedSnapshot.get(), savedSnapshot.get() instanceof Snapshot);
1840
1841                 verifySnapshot((Snapshot) savedSnapshot.get(), expectedRoot);
1842
1843                 latch.set(new CountDownLatch(1));
1844                 savedSnapshot.set(null);
1845             }
1846
1847             private void verifySnapshot(final Snapshot snapshot, final NormalizedNode<?, ?> expectedRoot) {
1848                 final NormalizedNode<?, ?> actual = ((ShardSnapshotState)snapshot.getState()).getSnapshot()
1849                         .getRootNode().get();
1850                 assertEquals("Root node", expectedRoot, actual);
1851             }
1852         };
1853     }
1854
1855     /**
1856      * This test simply verifies that the applySnapShot logic will work.
1857      */
1858     @Test
1859     public void testInMemoryDataTreeRestore() throws DataValidationFailedException {
1860         final DataTree store = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
1861             SCHEMA_CONTEXT);
1862
1863         final DataTreeModification putTransaction = store.takeSnapshot().newModification();
1864         putTransaction.write(TestModel.TEST_PATH,
1865             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1866         commitTransaction(store, putTransaction);
1867
1868
1869         final NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.EMPTY);
1870
1871         final DataTreeModification writeTransaction = store.takeSnapshot().newModification();
1872
1873         writeTransaction.delete(YangInstanceIdentifier.EMPTY);
1874         writeTransaction.write(YangInstanceIdentifier.EMPTY, expected);
1875
1876         commitTransaction(store, writeTransaction);
1877
1878         final NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.EMPTY);
1879
1880         assertEquals(expected, actual);
1881     }
1882
1883     @Test
1884     public void testRecoveryApplicable() {
1885
1886         final DatastoreContext persistentContext = DatastoreContext.newBuilder()
1887                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1888
1889         final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
1890                 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1891
1892         final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
1893                 .shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1894
1895         final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
1896                 .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
1897
1898         new ShardTestKit(getSystem()) {
1899             {
1900                 final TestActorRef<Shard> shard1 = actorFactory.createTestActor(persistentProps, "testPersistence1");
1901
1902                 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
1903
1904                 final TestActorRef<Shard> shard2 = actorFactory.createTestActor(nonPersistentProps, "testPersistence2");
1905
1906                 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
1907             }
1908         };
1909     }
1910
1911     @Test
1912     public void testOnDatastoreContext() {
1913         new ShardTestKit(getSystem()) {
1914             {
1915                 dataStoreContextBuilder.persistent(true);
1916
1917                 final TestActorRef<Shard> shard = actorFactory.createTestActor(newShardProps(),
1918                         "testOnDatastoreContext");
1919
1920                 assertEquals("isRecoveryApplicable", true,
1921                         shard.underlyingActor().persistence().isRecoveryApplicable());
1922
1923                 waitUntilLeader(shard);
1924
1925                 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1926
1927                 assertEquals("isRecoveryApplicable", false,
1928                         shard.underlyingActor().persistence().isRecoveryApplicable());
1929
1930                 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1931
1932                 assertEquals("isRecoveryApplicable", true,
1933                         shard.underlyingActor().persistence().isRecoveryApplicable());
1934             }
1935         };
1936     }
1937
1938     @Test
1939     public void testRegisterRoleChangeListener() {
1940         new ShardTestKit(getSystem()) {
1941             {
1942                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
1943                         newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1944                         "testRegisterRoleChangeListener");
1945
1946                 waitUntilLeader(shard);
1947
1948                 final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
1949
1950                 shard.tell(new RegisterRoleChangeListener(), listener);
1951
1952                 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
1953
1954                 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1955                     ShardLeaderStateChanged.class);
1956                 assertEquals("getLocalShardDataTree present", true,
1957                         leaderStateChanged.getLocalShardDataTree().isPresent());
1958                 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
1959                     leaderStateChanged.getLocalShardDataTree().get());
1960
1961                 MessageCollectorActor.clearMessages(listener);
1962
1963                 // Force a leader change
1964
1965                 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
1966
1967                 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
1968                         ShardLeaderStateChanged.class);
1969                 assertEquals("getLocalShardDataTree present", false,
1970                         leaderStateChanged.getLocalShardDataTree().isPresent());
1971             }
1972         };
1973     }
1974
1975     @Test
1976     public void testFollowerInitialSyncStatus() {
1977         final TestActorRef<Shard> shard = actorFactory.createTestActor(
1978                 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1979                 "testFollowerInitialSyncStatus");
1980
1981         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(false,
1982                 "member-1-shard-inventory-operational"));
1983
1984         assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1985
1986         shard.underlyingActor().handleNonRaftCommand(new FollowerInitialSyncUpStatus(true,
1987                 "member-1-shard-inventory-operational"));
1988
1989         assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1990     }
1991
1992     @Test
1993     public void testClusteredDataTreeChangeListenerWithDelayedRegistration() throws Exception {
1994         new ShardTestKit(getSystem()) {
1995             {
1996                 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistration";
1997                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
1998                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1999
2000                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2001                 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
2002                         TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2003
2004                 setupInMemorySnapshotStore();
2005
2006                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2007                         newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2008                         actorFactory.generateActorId(testName + "-shard"));
2009
2010                 waitUntilNoLeader(shard);
2011
2012                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2013                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
2014                         RegisterDataTreeNotificationListenerReply.class);
2015                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2016
2017                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
2018                         .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2019
2020                 listener.waitForChangeEvents();
2021             }
2022         };
2023     }
2024
2025     @Test
2026     public void testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed() throws Exception {
2027         new ShardTestKit(getSystem()) {
2028             {
2029                 final String testName = "testClusteredDataTreeChangeListenerWithDelayedRegistrationClosed";
2030                 dataStoreContextBuilder.shardElectionTimeoutFactor(1000)
2031                         .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
2032
2033                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(0);
2034                 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener,
2035                         TestModel.TEST_PATH), actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2036
2037                 setupInMemorySnapshotStore();
2038
2039                 final TestActorRef<Shard> shard = actorFactory.createTestActor(
2040                         newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2041                         actorFactory.generateActorId(testName + "-shard"));
2042
2043                 waitUntilNoLeader(shard);
2044
2045                 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2046                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
2047                         RegisterDataTreeNotificationListenerReply.class);
2048                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2049
2050                 final ActorSelection regActor = getSystem().actorSelection(reply.getListenerRegistrationPath());
2051                 regActor.tell(CloseDataTreeNotificationListenerRegistration.getInstance(), getRef());
2052                 expectMsgClass(CloseDataTreeNotificationListenerRegistrationReply.class);
2053
2054                 shard.tell(DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
2055                         .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
2056
2057                 listener.expectNoMoreChanges("Received unexpected change after close");
2058             }
2059         };
2060     }
2061
2062     @Test
2063     public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
2064         new ShardTestKit(getSystem()) {
2065             {
2066                 final String testName = "testClusteredDataTreeChangeListenerRegistration";
2067                 final ShardIdentifier followerShardID = ShardIdentifier.create("inventory",
2068                         MemberName.forName(actorFactory.generateActorId(testName + "-follower")), "config");
2069
2070                 final ShardIdentifier leaderShardID = ShardIdentifier.create("inventory",
2071                         MemberName.forName(actorFactory.generateActorId(testName + "-leader")), "config");
2072
2073                 final TestActorRef<Shard> followerShard = actorFactory
2074                         .createTestActor(Shard.builder().id(followerShardID)
2075                                 .datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
2076                                 .peerAddresses(Collections.singletonMap(leaderShardID.toString(),
2077                                         "akka://test/user/" + leaderShardID.toString()))
2078                                 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
2079                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
2080
2081                 final TestActorRef<Shard> leaderShard = actorFactory
2082                         .createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
2083                                 .peerAddresses(Collections.singletonMap(followerShardID.toString(),
2084                                         "akka://test/user/" + followerShardID.toString()))
2085                                 .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
2086                                 .withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
2087
2088                 leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
2089                 final String leaderPath = waitUntilLeader(followerShard);
2090                 assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
2091
2092                 final YangInstanceIdentifier path = TestModel.TEST_PATH;
2093                 final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
2094                 final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, path),
2095                         actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
2096
2097                 followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
2098                 final RegisterDataTreeNotificationListenerReply reply = expectMsgClass(duration("5 seconds"),
2099                         RegisterDataTreeNotificationListenerReply.class);
2100                 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
2101
2102                 writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2103
2104                 listener.waitForChangeEvents();
2105             }
2106         };
2107     }
2108
2109     @Test
2110     public void testServerRemoved() {
2111         final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
2112                 .withDispatcher(Dispatchers.DefaultDispatcherId()));
2113
2114         final ActorRef shard = parent.underlyingActor().context().actorOf(
2115                 newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
2116                 "testServerRemoved");
2117
2118         shard.tell(new ServerRemoved("test"), ActorRef.noSender());
2119
2120         MessageCollectorActor.expectFirstMatching(parent, ServerRemoved.class);
2121     }
2122 }