1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.dispatch.Dispatchers;
19 import akka.dispatch.OnComplete;
20 import akka.japi.Creator;
21 import akka.pattern.Patterns;
22 import akka.persistence.SaveSnapshotSuccess;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.Futures;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.InOrder;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
43 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
44 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
45 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
65 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
66 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
67 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
68 import org.opendaylight.controller.cluster.datastore.modification.Modification;
69 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
70 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
71 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
72 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
73 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
74 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
75 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
76 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
77 import org.opendaylight.controller.cluster.raft.RaftActorContext;
78 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
79 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
80 import org.opendaylight.controller.cluster.raft.Snapshot;
81 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
82 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
84 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
85 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
86 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
88 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
89 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
90 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
91 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
92 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
93 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
94 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
95 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
96 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
97 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
98 import org.opendaylight.yangtools.yang.common.QName;
99 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
101 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
102 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
103 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
116 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
117 import scala.concurrent.Await;
118 import scala.concurrent.Future;
119 import scala.concurrent.duration.FiniteDuration;
121 public class ShardTest extends AbstractShardTest {
// QName created from the cars test-model namespace, revision "2014-03-13" and local name "cars".
122 private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
// Placeholder value; the recovery tests in this class add it to the InMemoryJournal at
// sequence number 0 so that the real journal entries start at sequence number 1.
124 private static final String DUMMY_DATA = "Dummy data as snapshot sequence number is set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1";
// Counted down from onRecoveryComplete() of the Shard subclass that
// newShardPropsWithRecoveryComplete() creates.
126 final CountDownLatch recoveryComplete = new CountDownLatch(1);
128 protected Props newShardPropsWithRecoveryComplete() {
130 Creator<Shard> creator = new Creator<Shard>() {
132 public Shard create() throws Exception {
133 return new Shard(shardID, Collections.<String,String>emptyMap(),
134 newDatastoreContext(), SCHEMA_CONTEXT) {
136 protected void onRecoveryComplete() {
138 super.onRecoveryComplete();
140 recoveryComplete.countDown();
146 return Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
150 public void testRegisterChangeListener() throws Exception {
151 new ShardTestKit(getSystem()) {{
152 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
153 newShardProps(), "testRegisterChangeListener");
155 waitUntilLeader(shard);
157 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
159 MockDataChangeListener listener = new MockDataChangeListener(1);
160 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
161 "testRegisterChangeListener-DataChangeListener");
163 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
164 dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
166 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
167 RegisterChangeListenerReply.class);
168 String replyPath = reply.getListenerRegistrationPath().toString();
169 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
170 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
172 YangInstanceIdentifier path = TestModel.TEST_PATH;
173 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
175 listener.waitForChangeEvents(path);
177 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
178 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Registers a data change listener after the shard has received its first ElectionTimeout
 * but before it reports a leader, then lets the election proceed and waits for the listener
 * to be notified for the data written beforehand.
 */
182 @SuppressWarnings("serial")
184 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
185 // This test tests the timing window in which a change listener is registered before the
186 // shard becomes the leader. We verify that the listener is registered and notified of the
187 // existing data when the shard becomes the leader.
188 new ShardTestKit(getSystem()) {{
189 // For this test, we want to send the RegisterChangeListener message after the shard
190 // has recovered from persistence and before it becomes the leader. So we subclass
191 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
192 // we know that the shard has been initialized to a follower and has started the
193 // election process. The following 2 CountDownLatches are used to coordinate the
194 // ElectionTimeout with the sending of the RegisterChangeListener message.
195 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
196 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
197 Creator<Shard> creator = new Creator<Shard>() {
// Flipped to false once the first ElectionTimeout has been intercepted.
198 boolean firstElectionTimeout = true;
201 public Shard create() throws Exception {
202 // Use a non persistent provider because this test actually invokes persist on the journal
203 // this will cause all other messages to not be queued properly after that.
204 // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
205 // it does do a persist)
206 return new Shard(shardID, Collections.<String,String>emptyMap(),
207 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
209 public void onReceiveCommand(final Object message) throws Exception {
210 if(message instanceof ElectionTimeout && firstElectionTimeout) {
211 // Got the first ElectionTimeout. We don't forward it to the
212 // base Shard yet until we've sent the RegisterChangeListener
213 // message. So we signal the onFirstElectionTimeout latch to tell
214 // the main thread to send the RegisterChangeListener message and
215 // start a thread to wait on the onChangeListenerRegistered latch,
216 // which the main thread signals after it has sent the message.
217 // After the onChangeListenerRegistered is triggered, we send the
218 // original ElectionTimeout message to proceed with the election.
219 firstElectionTimeout = false;
220 final ActorRef self = getSelf();
// NOTE(review): per the comment above, the wait and re-send below run on a separate
// thread; the lines that start that thread are not visible in this listing.
224 Uninterruptibles.awaitUninterruptibly(
225 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
226 self.tell(message, self);
230 onFirstElectionTimeout.countDown();
232 super.onReceiveCommand(message);
239 MockDataChangeListener listener = new MockDataChangeListener(1);
240 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
241 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
243 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
244 Props.create(new DelegatingShardCreator(creator)),
245 "testRegisterChangeListenerWhenNotLeaderInitially");
247 // Write initial data into the in-memory store.
248 YangInstanceIdentifier path = TestModel.TEST_PATH;
249 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
251 // Wait until the shard receives the first ElectionTimeout message.
252 assertEquals("Got first ElectionTimeout", true,
253 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
255 // Now send the RegisterChangeListener and wait for the reply.
256 shard.tell(new RegisterChangeListener(path, dclActor,
257 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
259 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
260 RegisterChangeListenerReply.class);
261 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
263 // Sanity check - verify the shard is not the leader yet.
264 shard.tell(new FindLeader(), getRef());
265 FindLeaderReply findLeadeReply =
266 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
267 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
269 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
270 // with the election process.
271 onChangeListenerRegistered.countDown();
273 // Wait for the shard to become the leader and notify our listener with the existing
274 // data in the store.
275 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
277 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
278 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
283 public void testRegisterDataTreeChangeListener() throws Exception {
284 new ShardTestKit(getSystem()) {{
285 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
286 newShardProps(), "testRegisterDataTreeChangeListener");
288 waitUntilLeader(shard);
290 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
292 MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
293 ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
294 "testRegisterDataTreeChangeListener-DataTreeChangeListener");
296 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
298 RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
299 RegisterDataTreeChangeListenerReply.class);
300 String replyPath = reply.getListenerRegistrationPath().toString();
301 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
302 "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
304 YangInstanceIdentifier path = TestModel.TEST_PATH;
305 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
307 listener.waitForChangeEvents();
309 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
310 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Data-tree-change-listener variant of the "registered before leadership" scenario: the
 * listener is registered after the shard's first ElectionTimeout has been intercepted and
 * while FindLeader still reports no leader; the election is then released and the test
 * waits for the listener to receive change events.
 */
314 @SuppressWarnings("serial")
316 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
317 new ShardTestKit(getSystem()) {{
// Signalled by the shard when it intercepts its first ElectionTimeout.
318 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
// Signalled by this test once the listener registration has been sent and checked.
319 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
320 Creator<Shard> creator = new Creator<Shard>() {
321 boolean firstElectionTimeout = true;
324 public Shard create() throws Exception {
// Non-persistent datastore context (persistent(false)).
325 return new Shard(shardID, Collections.<String,String>emptyMap(),
326 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
328 public void onReceiveCommand(final Object message) throws Exception {
329 if(message instanceof ElectionTimeout && firstElectionTimeout) {
330 firstElectionTimeout = false;
331 final ActorRef self = getSelf();
// NOTE(review): some lines around this wait/re-send are not visible in this listing;
// presumably they run it on a separate thread, as in the sibling data-change-listener test - confirm.
335 Uninterruptibles.awaitUninterruptibly(
336 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
337 self.tell(message, self);
341 onFirstElectionTimeout.countDown();
343 super.onReceiveCommand(message);
350 MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
351 ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
352 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
354 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
355 Props.create(new DelegatingShardCreator(creator)),
356 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
// Write initial data before the listener is registered.
358 YangInstanceIdentifier path = TestModel.TEST_PATH;
359 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Fails the test if the first ElectionTimeout is not intercepted within 5 seconds.
361 assertEquals("Got first ElectionTimeout", true,
362 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
364 shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
365 RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
366 RegisterDataTreeChangeListenerReply.class);
// NOTE(review): the assertion message below contains a typo ("Registratiozn"); it is a
// runtime string, so it is left untouched here.
367 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
// Sanity check: the shard must not report a leader yet.
369 shard.tell(new FindLeader(), getRef());
370 FindLeaderReply findLeadeReply =
371 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
372 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
// Second write of the same container, issued after the registration.
374 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Release the intercepted ElectionTimeout so that the election can proceed.
376 onChangeListenerRegistered.countDown();
378 // TODO: investigate why we do not receive data change events
379 listener.waitForChangeEvents();
// Stop both actors created by this test.
381 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
382 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
387 public void testCreateTransaction(){
388 new ShardTestKit(getSystem()) {{
389 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
391 waitUntilLeader(shard);
393 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
395 shard.tell(new CreateTransaction("txn-1",
396 TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
398 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
399 CreateTransactionReply.class);
401 String path = reply.getTransactionActorPath().toString();
402 assertTrue("Unexpected transaction path " + path,
403 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
405 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Sends a serialized read-only CreateTransaction for "txn-1" that carries the additional
 * constructor argument "foobar", and verifies that the reported transaction actor path lies
 * under the shard actor.
 */
410 public void testCreateTransactionOnChain(){
411 new ShardTestKit(getSystem()) {{
412 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
414 waitUntilLeader(shard);
// NOTE(review): the final argument line of this tell(...) call is not visible in this
// listing; presumably it passes getRef() as the sender, as the other tests do - confirm.
416 shard.tell(new CreateTransaction("txn-1",
417 TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
420 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
421 CreateTransactionReply.class);
// The transaction actor path is expected to contain the shard's own path.
423 String path = reply.getTransactionActorPath().toString();
424 assertTrue("Unexpected transaction path " + path,
425 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
427 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies that handling a PeerAddressResolved message stores the resolved address in the
 * peer-address map obtained from the shard's RaftActorContext.
 */
431 @SuppressWarnings("serial")
433 public void testPeerAddressResolved() throws Exception {
434 new ShardTestKit(getSystem()) {{
// Local latch; it shadows the recoveryComplete field of the enclosing test class.
435 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Shard subclass that exposes its peer addresses and signals when recovery completes.
// NOTE(review): the constructor header line is not visible in this listing.
436 class TestShard extends Shard {
// The shard starts with a single peer entry, keyed by shardID.toString(), whose address is null.
438 super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
439 newDatastoreContext(), SCHEMA_CONTEXT);
442 Map<String, String> getPeerAddresses() {
443 return getRaftActorContext().getPeerAddresses();
447 protected void onRecoveryComplete() {
449 super.onRecoveryComplete();
451 recoveryComplete.countDown();
456 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
457 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
459 public TestShard create() throws Exception {
460 return new TestShard();
462 })), "testPeerAddressResolved");
464 //waitUntilLeader(shard);
// Proceed only once the shard has signalled that recovery is complete (5 second limit).
465 assertEquals("Recovery complete", true,
466 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// Deliver the message by invoking onReceiveCommand directly on the underlying actor.
468 String address = "akka://foobar";
469 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
471 assertEquals("getPeerAddresses", address,
472 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
474 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
479 public void testApplySnapshot() throws Exception {
480 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
481 "testApplySnapshot");
483 DataTree store = InMemoryDataTreeFactory.getInstance().create();
484 store.setSchemaContext(SCHEMA_CONTEXT);
486 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
488 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
489 NormalizedNode<?,?> expected = readStore(store, root);
491 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
492 SerializationUtils.serializeNormalizedNode(expected),
493 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
495 shard.underlyingActor().onReceiveCommand(applySnapshot);
497 NormalizedNode<?,?> actual = readStore(shard, root);
499 assertEquals("Root node", expected, actual);
501 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
505 public void testApplyState() throws Exception {
507 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
509 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
511 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
512 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
514 shard.underlyingActor().onReceiveCommand(applyState);
516 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
517 assertEquals("Applied state", node, actual);
519 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
523 public void testApplyStateWithCandidatePayload() throws Exception {
525 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardPropsWithRecoveryComplete(), "testApplyState");
527 recoveryComplete.await(5, TimeUnit.SECONDS);
529 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
530 DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
532 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
533 DataTreeCandidatePayload.create(candidate)));
535 shard.underlyingActor().onReceiveCommand(applyState);
537 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
538 assertEquals("Applied state", node, actual);
540 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Creates a data tree containing the test container and registers a snapshot of that
 * tree's root node in the InMemorySnapshotStore under shardID.toString().
 *
 * NOTE(review): the method is declared to return a DataTree, but no return statement is
 * visible in this listing; presumably it returns testStore - confirm.
 *
 * @throws DataValidationFailedException declared by the method; presumably raised by the write to the tree - confirm
 */
543 DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
544 DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
545 testStore.setSchemaContext(SCHEMA_CONTEXT);
547 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Read from the empty (root) path so that the snapshot covers the whole tree.
549 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
// The snapshot carries the serialized root node and an empty list of log entries.
551 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
552 SerializationUtils.serializeNormalizedNode(root),
553 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
557 private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
558 source.validate(mod);
559 final DataTreeCandidate candidate = source.prepare(mod);
560 source.commit(candidate);
561 return DataTreeCandidatePayload.create(candidate);
565 public void testDataTreeCandidateRecovery() throws Exception {
566 // Set up the InMemorySnapshotStore.
567 final DataTree source = setupInMemorySnapshotStore();
569 final DataTreeModification writeMod = source.takeSnapshot().newModification();
570 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
572 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
574 // Set up the InMemoryJournal.
575 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
577 int nListEntries = 16;
578 Set<Integer> listEntryKeys = new HashSet<>();
580 // Add some ModificationPayload entries
581 for (int i = 1; i <= nListEntries; i++) {
582 listEntryKeys.add(Integer.valueOf(i));
584 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
585 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
587 final DataTreeModification mod = source.takeSnapshot().newModification();
588 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
590 InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
591 payloadForModification(source, mod)));
594 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
595 new ApplyJournalEntries(nListEntries));
597 testRecovery(listEntryKeys);
601 public void testModicationRecovery() throws Exception {
603 // Set up the InMemorySnapshotStore.
604 setupInMemorySnapshotStore();
606 // Set up the InMemoryJournal.
608 InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
610 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
611 new WriteModification(TestModel.OUTER_LIST_PATH,
612 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
614 int nListEntries = 16;
615 Set<Integer> listEntryKeys = new HashSet<>();
617 // Add some ModificationPayload entries
618 for(int i = 1; i <= nListEntries; i++) {
619 listEntryKeys.add(Integer.valueOf(i));
620 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
621 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
622 Modification mod = new MergeModification(path,
623 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
624 InMemoryJournal.addEntry(shardID.toString(), i + 1, new ReplicatedLogImplEntry(i, 1,
625 newModificationPayload(mod)));
628 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
629 new ApplyJournalEntries(nListEntries));
631 testRecovery(listEntryKeys);
634 private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
635 MutableCompositeModification compMod = new MutableCompositeModification();
636 for(Modification mod: mods) {
637 compMod.addModification(mod);
640 return new ModificationPayload(compMod);
644 public void testConcurrentThreePhaseCommits() throws Throwable {
645 new ShardTestKit(getSystem()) {{
646 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
647 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
648 "testConcurrentThreePhaseCommits");
650 waitUntilLeader(shard);
652 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
654 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
656 String transactionID1 = "tx1";
657 MutableCompositeModification modification1 = new MutableCompositeModification();
658 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
659 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
661 String transactionID2 = "tx2";
662 MutableCompositeModification modification2 = new MutableCompositeModification();
663 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
664 TestModel.OUTER_LIST_PATH,
665 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
668 String transactionID3 = "tx3";
669 MutableCompositeModification modification3 = new MutableCompositeModification();
670 ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
671 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
672 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
673 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
677 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
678 final Timeout timeout = new Timeout(duration);
680 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
681 // by the ShardTransaction.
683 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
684 cohort1, modification1, true, false), getRef());
685 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
686 expectMsgClass(duration, ReadyTransactionReply.class));
687 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
689 // Send the CanCommitTransaction message for the first Tx.
691 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
692 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
693 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
694 assertEquals("Can commit", true, canCommitReply.getCanCommit());
696 // Send the ForwardedReadyTransaction for the next 2 Tx's.
698 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
699 cohort2, modification2, true, false), getRef());
700 expectMsgClass(duration, ReadyTransactionReply.class);
702 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
703 cohort3, modification3, true, false), getRef());
704 expectMsgClass(duration, ReadyTransactionReply.class);
706 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
707 // processed after the first Tx completes.
709 Future<Object> canCommitFuture1 = Patterns.ask(shard,
710 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
712 Future<Object> canCommitFuture2 = Patterns.ask(shard,
713 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
715 // Send the CommitTransaction message for the first Tx. After it completes, it should
716 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
718 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
719 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
721 // Wait for the next 2 Tx's to complete.
723 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
724 final CountDownLatch commitLatch = new CountDownLatch(2);
726 class OnFutureComplete extends OnComplete<Object> {
727 private final Class<?> expRespType;
729 OnFutureComplete(final Class<?> expRespType) {
730 this.expRespType = expRespType;
734 public void onComplete(final Throwable error, final Object resp) {
736 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
739 assertEquals("Commit response type", expRespType, resp.getClass());
741 } catch (Exception e) {
747 void onSuccess(final Object resp) throws Exception {
751 class OnCommitFutureComplete extends OnFutureComplete {
752 OnCommitFutureComplete() {
753 super(CommitTransactionReply.SERIALIZABLE_CLASS);
757 public void onComplete(final Throwable error, final Object resp) {
758 super.onComplete(error, resp);
759 commitLatch.countDown();
763 class OnCanCommitFutureComplete extends OnFutureComplete {
764 private final String transactionID;
766 OnCanCommitFutureComplete(final String transactionID) {
767 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
768 this.transactionID = transactionID;
772 void onSuccess(final Object resp) throws Exception {
773 CanCommitTransactionReply canCommitReply =
774 CanCommitTransactionReply.fromSerializable(resp);
775 assertEquals("Can commit", true, canCommitReply.getCanCommit());
777 Future<Object> commitFuture = Patterns.ask(shard,
778 new CommitTransaction(transactionID).toSerializable(), timeout);
779 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
783 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
784 getSystem().dispatcher());
786 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
787 getSystem().dispatcher());
789 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
791 if(caughtEx.get() != null) {
792 throw caughtEx.get();
795 assertEquals("Commits complete", true, done);
797 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
798 inOrder.verify(cohort1).canCommit();
799 inOrder.verify(cohort1).preCommit();
800 inOrder.verify(cohort1).commit();
801 inOrder.verify(cohort2).canCommit();
802 inOrder.verify(cohort2).preCommit();
803 inOrder.verify(cohort2).commit();
804 inOrder.verify(cohort3).canCommit();
805 inOrder.verify(cohort3).preCommit();
806 inOrder.verify(cohort3).commit();
808 // Verify data in the data store.
810 verifyOuterListEntry(shard, 1);
812 verifyLastApplied(shard, 2);
814 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
818 private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
819 NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
820 return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
823 private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
824 YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
825 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
826 batched.addModification(new WriteModification(path, data));
827 batched.setReady(ready);
828 batched.setDoCommitOnReady(doCommitOnReady);
/**
 * Builds one transaction from three BatchedModifications messages, the last of which is
 * flagged ready but not commit-on-ready, then sends CanCommitTransaction and
 * CommitTransaction explicitly. Checks the reply type of every step, the order of the
 * cohort calls, and the list entry that ends up in the data store.
 */
833 public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
834 new ShardTestKit(getSystem()) {{
835 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
836 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
837 "testBatchedModificationsWithNoCommitOnReady");
839 waitUntilLeader(shard);
841 final String transactionID = "tx";
842 FiniteDuration duration = duration("5 seconds");
// Capture the cohort used by the commit coordinator: the first decorate() call wraps the
// actual cohort via createDelegatingMockCohort, and every call returns that same instance
// so its invocations can be verified at the end of the test.
844 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
845 ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
847 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
848 if(mockCohort.get() == null) {
849 mockCohort.set(createDelegatingMockCohort("cohort", actual));
852 return mockCohort.get();
856 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
858 // Send a BatchedModifications to start a transaction.
860 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
861 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
862 expectMsgClass(duration, BatchedModificationsReply.class);
864 // Send a couple more BatchedModifications.
866 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
867 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
868 expectMsgClass(duration, BatchedModificationsReply.class);
// Final batch: ready=true, doCommitOnReady=false, so the expected reply is a
// ReadyTransactionReply rather than a BatchedModificationsReply.
870 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
871 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
872 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
873 expectMsgClass(duration, ReadyTransactionReply.class);
875 // Send the CanCommitTransaction message.
877 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
878 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
879 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
880 assertEquals("Can commit", true, canCommitReply.getCanCommit());
882 // Send the CommitTransaction message.
884 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
885 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The captured cohort must have been driven through canCommit, preCommit and commit in that order.
887 InOrder inOrder = inOrder(mockCohort.get());
888 inOrder.verify(mockCohort.get()).canCommit();
889 inOrder.verify(mockCohort.get()).preCommit();
890 inOrder.verify(mockCohort.get()).commit();
892 // Verify data in the data store.
894 verifyOuterListEntry(shard, 1);
// Stop the shard actor now that the test is done with it.
896 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Builds one transaction from three BatchedModifications messages, the last of which is
 * flagged both ready and commit-on-ready. No CanCommitTransaction or CommitTransaction
 * message is sent by the test; it expects a commit reply directly and then checks the
 * order of the cohort calls and the list entry in the data store.
 */
901 public void testBatchedModificationsWithCommitOnReady() throws Throwable {
902 new ShardTestKit(getSystem()) {{
903 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
904 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
905 "testBatchedModificationsWithCommitOnReady");
907 waitUntilLeader(shard);
909 final String transactionID = "tx";
910 FiniteDuration duration = duration("5 seconds");
// Capture the cohort used by the commit coordinator: the first decorate() call wraps the
// actual cohort via createDelegatingMockCohort, and every call returns that same instance.
912 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
913 ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
915 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
916 if(mockCohort.get() == null) {
917 mockCohort.set(createDelegatingMockCohort("cohort", actual));
920 return mockCohort.get();
924 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
926 // Send a BatchedModifications to start a transaction.
928 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
929 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
930 expectMsgClass(duration, BatchedModificationsReply.class);
932 // Send a couple more BatchedModifications.
934 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
935 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
936 expectMsgClass(duration, BatchedModificationsReply.class);
// Final batch: ready=true and doCommitOnReady=true, so the reply awaited below is the
// commit reply itself.
938 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
939 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
940 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
942 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// All three phases must still have run on the captured cohort, in order.
944 InOrder inOrder = inOrder(mockCohort.get());
945 inOrder.verify(mockCohort.get()).canCommit();
946 inOrder.verify(mockCohort.get()).preCommit();
947 inOrder.verify(mockCohort.get()).commit();
949 // Verify data in the data store.
951 verifyOuterListEntry(shard, 1);
// Stop the shard actor now that the test is done with it.
953 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
957 @SuppressWarnings("unchecked")
958 private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
959 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
960 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
961 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
962 outerList.getValue() instanceof Iterable);
963 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
964 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
965 entry instanceof MapEntryNode);
966 MapEntryNode mapEntry = (MapEntryNode)entry;
967 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
968 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
969 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
970 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
/**
 * Readies a write transaction on a transaction chain via BatchedModifications, then creates a
 * read-only transaction on the same chain and checks that it reads the node written by the
 * first transaction before that transaction has been committed. Finally commits the write
 * transaction and checks the node in the data store.
 */
974 public void testBatchedModificationsOnTransactionChain() throws Throwable {
975 new ShardTestKit(getSystem()) {{
976 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
977 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
978 "testBatchedModificationsOnTransactionChain");
980 waitUntilLeader(shard);
982 String transactionChainID = "txChain";
983 String transactionID1 = "tx1";
984 String transactionID2 = "tx2";
986 FiniteDuration duration = duration("5 seconds");
988 // Send a BatchedModifications to start a chained write transaction and ready it.
990 ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
991 YangInstanceIdentifier path = TestModel.TEST_PATH;
992 shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
993 containerNode, true, false), getRef());
994 expectMsgClass(duration, ReadyTransactionReply.class);
996 // Create a read Tx on the same chain.
998 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
999 transactionChainID).toSerializable(), getRef());
1001 CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
// Read through the transaction actor named in the reply; the read is issued before tx1's
// CanCommit/Commit messages below and is expected to return tx1's written node.
1003 getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
1004 ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
1005 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
1007 // Commit the write transaction.
1009 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1010 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1011 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1012 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1014 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1015 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1017 // Verify data in the data store.
1019 NormalizedNode<?, ?> actualNode = readStore(shard, path);
1020 assertEquals("Stored node", containerNode, actualNode);
// Stop the shard actor now that the test is done with it.
1022 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Checks what a shard does with a BatchedModifications message when it reports that it is
 * not the leader: the test kit, which the overridden getLeader() points at, is expected to
 * receive the very same message.
 */
1027 public void testOnBatchedModificationsWhenNotLeader() {
// Switch that turns on the overridden leader answers once the shard has finished starting up.
1028 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
1029 new ShardTestKit(getSystem()) {{
1030 Creator<Shard> creator = new Creator<Shard>() {
1031 private static final long serialVersionUID = 1L;
1034 public Shard create() throws Exception {
1035 return new Shard(shardID, Collections.<String,String>emptyMap(),
1036 newDatastoreContext(), SCHEMA_CONTEXT) {
// While the switch is on, report "not leader"; otherwise defer to the real implementation.
1038 protected boolean isLeader() {
1039 return overrideLeaderCalls.get() ? false : super.isLeader();
// While the switch is on, name the test kit's own actor as the leader.
// NOTE(review): the other branch of this ternary is not visible in this excerpt;
// presumably it defers to super.getLeader() - confirm.
1043 protected ActorSelection getLeader() {
1044 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1051 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1052 Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1054 waitUntilLeader(shard);
// From here on the shard answers isLeader()/getLeader() with the overridden values.
1056 overrideLeaderCalls.set(true);
1058 BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1060 shard.tell(batched, ActorRef.noSender());
// The message was sent with no sender, so the test kit can only receive it from the shard.
1062 expectMsgEquals(batched);
// Stop the shard actor now that the test is done with it.
1064 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Sends a single ForwardedReadyTransaction whose last two flags are both true and expects a
 * commit reply without sending CanCommitTransaction or CommitTransaction. Then checks the
 * order of the cohort calls and the node in the data store.
 */
1069 public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1070 new ShardTestKit(getSystem()) {{
1071 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1072 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1073 "testForwardedReadyTransactionWithImmediateCommit");
1075 waitUntilLeader(shard);
1077 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Mock cohort for a write of the test container, produced by setupMockWriteTransaction.
1079 String transactionID = "tx1";
1080 MutableCompositeModification modification = new MutableCompositeModification();
1081 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1082 ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1083 TestModel.TEST_PATH, containerNode, modification);
1085 FiniteDuration duration = duration("5 seconds");
1087 // Simulate the ForwardedReadyTransaction messages that would be sent
1088 // by the ShardTransaction.
// NOTE(review): the final 'true' is presumably the commit-on-ready flag (the other tests
// in this file pass 'false' and get a ReadyTransactionReply) - confirm against the constructor.
1090 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1091 cohort, modification, true, true), getRef());
1093 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
// All three phases must have run on the cohort, in order.
1095 InOrder inOrder = inOrder(cohort);
1096 inOrder.verify(cohort).canCommit();
1097 inOrder.verify(cohort).preCommit();
1098 inOrder.verify(cohort).commit();
// The written container must now be readable from the shard's store.
1100 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1101 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
// Stop the shard actor now that the test is done with it.
1103 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Sends a ReadyLocalTransaction whose last constructor argument is true and expects a commit
 * reply without sending CanCommitTransaction or CommitTransaction, then checks the merged
 * outer list in the data store.
 */
1108 public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1109 new ShardTestKit(getSystem()) {{
1110 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1111 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1112 "testReadyLocalTransactionWithImmediateCommit");
1114 waitUntilLeader(shard);
1116 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Build the modification directly against a snapshot of the shard's data tree.
1118 DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
// Apply a write of the test container and a merge of an outer list built with no entries.
1120 ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1121 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1122 MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1123 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1125 String txId = "tx1";
// NOTE(review): the final 'true' is presumably the commit-on-ready flag - confirm against the constructor.
1126 ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1128 shard.tell(readyMessage, getRef());
1130 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
// The merged outer list must now be readable from the shard's store.
1132 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1133 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
// Stop the shard actor now that the test is done with it.
1135 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Sends a ReadyLocalTransaction whose last constructor argument is false, expects a
 * ReadyTransactionReply, then sends CanCommitTransaction and CommitTransaction explicitly
 * and checks the merged outer list in the data store.
 */
1140 public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1141 new ShardTestKit(getSystem()) {{
1142 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1143 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1144 "testReadyLocalTransactionWithThreePhaseCommit");
1146 waitUntilLeader(shard);
1148 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Build the modification directly against a snapshot of the shard's data tree.
1150 DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
// Apply a write of the test container and a merge of an outer list built with no entries.
1152 ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1153 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1154 MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1155 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1157 String txId = "tx1";
// NOTE(review): the final 'false' is presumably the commit-on-ready flag - confirm against the constructor.
1158 ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1160 shard.tell(readyMessage, getRef());
1162 expectMsgClass(ReadyTransactionReply.class);
1164 // Send the CanCommitTransaction message.
1166 shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1167 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1168 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1169 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1171 // Send the CommitTransaction message.
1173 shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1174 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
// The merged outer list must now be readable from the shard's store.
1176 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1177 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
// Stop the shard actor now that the test is done with it.
1179 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Runs a full ready / canCommit / commit exchange against a shard whose datastore context
 * builder has been set to non-persistent, then checks the order of the cohort calls and the
 * node in the data store.
 */
1184 public void testCommitWithPersistenceDisabled() throws Throwable {
// Must happen before the shard is created below so the setting is in place for newShardProps().
1185 dataStoreContextBuilder.persistent(false);
1186 new ShardTestKit(getSystem()) {{
1187 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1188 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1189 "testCommitWithPersistenceDisabled");
1191 waitUntilLeader(shard);
1193 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1195 // Setup a simulated transactions with a mock cohort.
1197 String transactionID = "tx";
1198 MutableCompositeModification modification = new MutableCompositeModification();
1199 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1200 ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1201 TestModel.TEST_PATH, containerNode, modification);
1203 FiniteDuration duration = duration("5 seconds");
1205 // Simulate the ForwardedReadyTransaction messages that would be sent
1206 // by the ShardTransaction.
1208 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1209 cohort, modification, true, false), getRef());
1210 expectMsgClass(duration, ReadyTransactionReply.class);
1212 // Send the CanCommitTransaction message.
1214 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1215 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1216 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1217 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1219 // Send the CommitTransaction message.
1221 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1222 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// All three phases must have run on the cohort, in order.
1224 InOrder inOrder = inOrder(cohort);
1225 inOrder.verify(cohort).canCommit();
1226 inOrder.verify(cohort).preCommit();
1227 inOrder.verify(cohort).commit();
// The written container must now be readable from the shard's store.
1229 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1230 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
// Stop the shard actor now that the test is done with it.
1232 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1236 private static DataTreeCandidateTip mockCandidate(final String name) {
1237 DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1238 DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1239 doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1240 doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1241 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1242 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1243 return mockCandidate;
1246 private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1247 DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1248 DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1249 doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1250 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1251 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1252 return mockCandidate;
/**
 * Commits a transaction whose cohort returns a candidate with an UNMODIFIED root node and
 * checks, through the shard's stats MBean, that the committed-transaction count is 1 while
 * the commit index is still -1.
 */
1256 public void testCommitWhenTransactionHasNoModifications(){
1257 // Note that persistence is enabled which would normally result in the entry getting written to the journal
1258 // but here that need not happen
1259 new ShardTestKit(getSystem()) {
1261 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1262 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1263 "testCommitWhenTransactionHasNoModifications");
1265 waitUntilLeader(shard);
// Cohort whose three phases all succeed immediately and whose candidate is unmodified.
1267 String transactionID = "tx1";
1268 MutableCompositeModification modification = new MutableCompositeModification();
1269 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1270 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1271 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1272 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1273 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1275 FiniteDuration duration = duration("5 seconds");
1277 // Simulate the ForwardedReadyTransaction messages that would be sent
1278 // by the ShardTransaction.
1280 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1281 cohort, modification, true, false), getRef());
1282 expectMsgClass(duration, ReadyTransactionReply.class);
1284 // Send the CanCommitTransaction message.
1286 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1287 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1288 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1289 assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CommitTransaction message.
1291 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1292 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
// All three phases must have run on the cohort, in order.
1294 InOrder inOrder = inOrder(cohort);
1295 inOrder.verify(cohort).canCommit();
1296 inOrder.verify(cohort).preCommit();
1297 inOrder.verify(cohort).commit();
// Ask the shard for its stats MBean; the reply arrives as a ShardStats message.
1299 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1300 ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1302 // Use MBean for verification
1303 // Committed transaction count should increase as usual
1304 assertEquals(1,shardStats.getCommittedTransactionsCount());
1306 // Commit index should not advance because this does not go into the journal
1307 assertEquals(-1, shardStats.getCommitIndex());
// Stop the shard actor now that the test is done with it.
1309 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Commits a transaction whose cohort returns a candidate with a WRITE root node and checks,
 * through the shard's stats MBean, that the committed-transaction count is 1 and the commit
 * index has advanced to 0.
 */
1316 public void testCommitWhenTransactionHasModifications(){
1317 new ShardTestKit(getSystem()) {
1319 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1320 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1321 "testCommitWhenTransactionHasModifications");
1323 waitUntilLeader(shard);
// Cohort whose three phases all succeed immediately and whose candidate carries a write;
// the composite modification holds one delete of an identifier built with no path arguments.
1325 String transactionID = "tx1";
1326 MutableCompositeModification modification = new MutableCompositeModification();
1327 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1328 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1329 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1330 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1331 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1332 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1334 FiniteDuration duration = duration("5 seconds");
1336 // Simulate the ForwardedReadyTransaction messages that would be sent
1337 // by the ShardTransaction.
1339 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1340 cohort, modification, true, false), getRef());
1341 expectMsgClass(duration, ReadyTransactionReply.class);
1343 // Send the CanCommitTransaction message.
1345 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1346 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1347 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1348 assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Send the CommitTransaction message.
1350 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1351 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
// All three phases must have run on the cohort, in order.
1353 InOrder inOrder = inOrder(cohort);
1354 inOrder.verify(cohort).canCommit();
1355 inOrder.verify(cohort).preCommit();
1356 inOrder.verify(cohort).commit();
// Ask the shard for its stats MBean; the reply arrives as a ShardStats message.
1358 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1359 ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1361 // Use MBean for verification
1362 // Committed transaction count should increase as usual
1363 assertEquals(1, shardStats.getCommittedTransactionsCount());
1365 // Commit index should advance as we do not have an empty modification
1366 assertEquals(0, shardStats.getCommitIndex());
// Stop the shard actor now that the test is done with it.
1368 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Readies two transactions and makes the first cohort's commit() return a failed future.
 * Expects the CommitTransaction for the first transaction to be answered with a
 * Status.Failure, the queued CanCommitTransaction of the second transaction to complete
 * afterwards, and the cohort calls to have happened in the verified order.
 */
1375 public void testCommitPhaseFailure() throws Throwable {
1376 new ShardTestKit(getSystem()) {{
1377 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1378 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1379 "testCommitPhaseFailure");
1381 waitUntilLeader(shard);
1383 // Setup 2 simulated transactions with mock cohorts. The first one fails in the commit phase.
1386 String transactionID1 = "tx1";
1387 MutableCompositeModification modification1 = new MutableCompositeModification();
1388 ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1389 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1390 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1391 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1392 doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
// Only canCommit is stubbed for the second cohort; the test does not drive it any further.
1394 String transactionID2 = "tx2";
1395 MutableCompositeModification modification2 = new MutableCompositeModification();
1396 ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1397 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1399 FiniteDuration duration = duration("5 seconds");
1400 final Timeout timeout = new Timeout(duration);
1402 // Simulate the ForwardedReadyTransaction messages that would be sent
1403 // by the ShardTransaction.
1405 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1406 cohort1, modification1, true, false), getRef());
1407 expectMsgClass(duration, ReadyTransactionReply.class);
1409 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1410 cohort2, modification2, true, false), getRef());
1411 expectMsgClass(duration, ReadyTransactionReply.class);
1413 // Send the CanCommitTransaction message for the first Tx.
1415 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1416 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1417 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1418 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1420 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1421 // processed after the first Tx completes.
1423 Future<Object> canCommitFuture = Patterns.ask(shard,
1424 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1426 // Send the CommitTransaction message for the first Tx. This should send back an error
1427 // and trigger the 2nd Tx to proceed.
1429 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1430 expectMsgClass(duration, akka.actor.Status.Failure.class);
1432 // Wait for the 2nd Tx to complete the canCommit phase.
// NOTE(review): the callback body is not visible in this excerpt; presumably it counts the
// latch down so the await below can return - confirm.
1434 final CountDownLatch latch = new CountDownLatch(1);
1435 canCommitFuture.onComplete(new OnComplete<Object>() {
1437 public void onComplete(final Throwable t, final Object resp) {
1440 }, getSystem().dispatcher());
1442 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// cohort1 must have gone through all three phases before cohort2's canCommit ran.
1444 InOrder inOrder = inOrder(cohort1, cohort2);
1445 inOrder.verify(cohort1).canCommit();
1446 inOrder.verify(cohort1).preCommit();
1447 inOrder.verify(cohort1).commit();
1448 inOrder.verify(cohort2).canCommit();
// Stop the shard actor now that the test is done with it.
1450 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Readies two transactions and makes the first cohort's preCommit() return a failed future.
 * Expects the CommitTransaction for the first transaction to be answered with a
 * Status.Failure, the queued CanCommitTransaction of the second transaction to complete
 * afterwards, and the cohort calls to have happened in the verified order.
 */
1455 public void testPreCommitPhaseFailure() throws Throwable {
1456 new ShardTestKit(getSystem()) {{
1457 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1458 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1459 "testPreCommitPhaseFailure");
1461 waitUntilLeader(shard);
// First cohort: canCommit succeeds, preCommit fails. commit() is left unstubbed.
1463 String transactionID1 = "tx1";
1464 MutableCompositeModification modification1 = new MutableCompositeModification();
1465 ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1466 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1467 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
// Second cohort: only canCommit is stubbed; the test does not drive it any further.
1469 String transactionID2 = "tx2";
1470 MutableCompositeModification modification2 = new MutableCompositeModification();
1471 ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1472 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1474 FiniteDuration duration = duration("5 seconds");
1475 final Timeout timeout = new Timeout(duration);
1477 // Simulate the ForwardedReadyTransaction messages that would be sent
1478 // by the ShardTransaction.
1480 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1481 cohort1, modification1, true, false), getRef());
1482 expectMsgClass(duration, ReadyTransactionReply.class);
1484 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1485 cohort2, modification2, true, false), getRef());
1486 expectMsgClass(duration, ReadyTransactionReply.class);
1488 // Send the CanCommitTransaction message for the first Tx.
1490 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1491 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1492 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1493 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1495 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1496 // processed after the first Tx completes.
1498 Future<Object> canCommitFuture = Patterns.ask(shard,
1499 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1501 // Send the CommitTransaction message for the first Tx. This should send back an error
1502 // and trigger the 2nd Tx to proceed.
1504 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1505 expectMsgClass(duration, akka.actor.Status.Failure.class);
1507 // Wait for the 2nd Tx to complete the canCommit phase.
// NOTE(review): the callback body is not visible in this excerpt; presumably it counts the
// latch down so the await below can return - confirm.
1509 final CountDownLatch latch = new CountDownLatch(1);
1510 canCommitFuture.onComplete(new OnComplete<Object>() {
1512 public void onComplete(final Throwable t, final Object resp) {
1515 }, getSystem().dispatcher());
1517 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// Verified order: cohort1 canCommit, cohort1 preCommit, then cohort2 canCommit.
1519 InOrder inOrder = inOrder(cohort1, cohort2);
1520 inOrder.verify(cohort1).canCommit();
1521 inOrder.verify(cohort1).preCommit();
1522 inOrder.verify(cohort2).canCommit();
// Stop the shard actor now that the test is done with it.
1524 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Makes the cohort's canCommit() return a failed future and expects the
 * CanCommitTransaction to be answered with a Status.Failure. Then re-stubs canCommit() to
 * succeed, readies a second transaction with the same cohort, and expects its
 * CanCommitTransaction to be answered positively.
 */
1529 public void testCanCommitPhaseFailure() throws Throwable {
1530 new ShardTestKit(getSystem()) {{
1531 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1532 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1533 "testCanCommitPhaseFailure");
1535 waitUntilLeader(shard);
1537 final FiniteDuration duration = duration("5 seconds");
// Cohort whose canCommit fails with an IllegalStateException.
1539 String transactionID1 = "tx1";
1540 MutableCompositeModification modification = new MutableCompositeModification();
1541 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1542 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1544 // Simulate the ForwardedReadyTransaction messages that would be sent
1545 // by the ShardTransaction.
1547 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1548 cohort, modification, true, false), getRef());
1549 expectMsgClass(duration, ReadyTransactionReply.class);
1551 // Send the CanCommitTransaction message.
1553 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1554 expectMsgClass(duration, akka.actor.Status.Failure.class);
1556 // Send another can commit to ensure the failed one got cleaned up.
// The same mock cohort is reused for the second transaction, now with canCommit succeeding.
1560 String transactionID2 = "tx2";
1561 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1563 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1564 cohort, modification, true, false), getRef());
1565 expectMsgClass(duration, ReadyTransactionReply.class);
1567 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1568 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1569 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1570 assertEquals("getCanCommit", true, reply.getCanCommit());
// Stop the shard actor now that the test is done with it.
1572 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1577 public void testCanCommitPhaseFalseResponse() throws Throwable {
// Verifies that when a cohort's canCommit() completes with FALSE, the shard replies with a
// CanCommitTransactionReply whose getCanCommit() is false, and that a second transaction
// readied afterwards can still get a true canCommit reply.
1578 new ShardTestKit(getSystem()) {{
1579 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1580 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1581 "testCanCommitPhaseFalseResponse");
1583 waitUntilLeader(shard);
1585 final FiniteDuration duration = duration("5 seconds");
1587 String transactionID1 = "tx1";
1588 MutableCompositeModification modification = new MutableCompositeModification();
1589 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
// Stub canCommit() to complete normally but with a negative (FALSE) vote.
1590 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1592 // Simulate the ForwardedReadyTransaction messages that would be sent
1593 // by the ShardTransaction.
1595 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1596 cohort, modification, true, false), getRef());
1597 expectMsgClass(duration, ReadyTransactionReply.class);
1599 // Send the CanCommitTransaction message.
1601 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1602 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1603 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1604 assertEquals("getCanCommit", false, reply.getCanCommit());
1606 // Send another can commit to ensure the failed one got cleaned up.
1610 String transactionID2 = "tx2";
// The same mock cohort and modification are reused for tx2; canCommit() is re-stubbed to vote TRUE.
1611 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1613 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1614 cohort, modification, true, false), getRef());
1615 expectMsgClass(duration, ReadyTransactionReply.class);
1617 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1618 reply = CanCommitTransactionReply.fromSerializable(
1619 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1620 assertEquals("getCanCommit", true, reply.getCanCommit());
1622 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1627 public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
// Verifies that a ForwardedReadyTransaction sent with its last constructor argument set to true
// is answered directly with a Status.Failure when the cohort's canCommit() future fails, and
// that a second such transaction afterwards is answered with a CommitTransactionReply.
// NOTE(review): the last boolean is false in the non-immediate tests of this file - presumably
// it is the "do immediate commit" flag; confirm against ForwardedReadyTransaction.
1628 new ShardTestKit(getSystem()) {{
1629 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1630 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1631 "testImmediateCommitWithCanCommitPhaseFailure");
1633 waitUntilLeader(shard);
1635 final FiniteDuration duration = duration("5 seconds");
1637 String transactionID1 = "tx1";
1638 MutableCompositeModification modification = new MutableCompositeModification();
1639 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
// Stub canCommit() to return an already-failed future.
1640 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1642 // Simulate the ForwardedReadyTransaction messages that would be sent
1643 // by the ShardTransaction.
1645 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1646 cohort, modification, true, true), getRef());
// No ReadyTransactionReply is expected here: the first reply asserted is the failure itself.
1648 expectMsgClass(duration, akka.actor.Status.Failure.class);
1650 // Send another can commit to ensure the failed one got cleaned up.
1654 String transactionID2 = "tx2";
// Re-stub the same mock cohort so that canCommit, preCommit and commit all succeed for tx2.
1655 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1656 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1657 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
// Provide a mock candidate whose root node reports UNMODIFIED from getModificationType().
1658 DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1659 DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1660 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1661 doReturn(candidateRoot).when(candidate).getRootNode();
1662 doReturn(candidate).when(cohort).getCandidate();
1664 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1665 cohort, modification, true, true), getRef());
1667 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1669 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1674 public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
// Verifies that a ForwardedReadyTransaction sent with its last constructor argument set to true
// is answered with a Status.Failure when the cohort's canCommit() completes with FALSE, and
// that a second such transaction afterwards is answered with a CommitTransactionReply.
// NOTE(review): the last boolean is presumably the "do immediate commit" flag; confirm against
// ForwardedReadyTransaction.
1675 new ShardTestKit(getSystem()) {{
1676 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1677 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1678 "testImmediateCommitWithCanCommitPhaseFalseResponse");
1680 waitUntilLeader(shard);
1682 final FiniteDuration duration = duration("5 seconds");
1684 String transactionID = "tx1";
1685 MutableCompositeModification modification = new MutableCompositeModification();
1686 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
// Stub canCommit() to complete normally but with a negative (FALSE) vote.
1687 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1689 // Simulate the ForwardedReadyTransaction messages that would be sent
1690 // by the ShardTransaction.
1692 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1693 cohort, modification, true, true), getRef());
// In this mode the FALSE vote is asserted as a Status.Failure rather than a canCommit reply.
1695 expectMsgClass(duration, akka.actor.Status.Failure.class);
1697 // Send another can commit to ensure the failed one got cleaned up.
1701 String transactionID2 = "tx2";
// Re-stub the same mock cohort so that canCommit, preCommit and commit all succeed for tx2.
1702 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1703 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1704 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
// Provide a mock candidate whose root node reports UNMODIFIED from getModificationType().
1705 DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1706 DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1707 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1708 doReturn(candidateRoot).when(candidate).getRootNode();
1709 doReturn(candidate).when(cohort).getCandidate();
1711 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1712 cohort, modification, true, true), getRef());
1714 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1716 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1721 public void testAbortBeforeFinishCommit() throws Throwable {
// Verifies that an abort issued between preCommit and the end of the commit does not prevent
// the write from landing: after CommitTransaction is acknowledged, TEST_PATH must be readable
// from the shard's store.
1722 new ShardTestKit(getSystem()) {{
1723 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1724 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1725 "testAbortBeforeFinishCommit");
1727 waitUntilLeader(shard);
1729 final FiniteDuration duration = duration("5 seconds");
1730 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1732 final String transactionID = "tx1";
// Hook handed to setupMockWriteTransaction: it calls preCommit() on the cohort it is given
// and then aborts the transaction on the shard before returning the preCommit future.
// NOTE(review): presumably the mock cohort routes its preCommit() through this function;
// confirm in setupMockWriteTransaction.
1733 Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1734 new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1736 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1737 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1739 // Simulate an AbortTransaction message occurring during replication, after
1740 // persisting and before finishing the commit to the in-memory store.
1741 // We have no followers so due to optimizations in the RaftActor, it does not
1742 // attempt replication and thus we can't send an AbortTransaction message b/c
1743 // it would be processed too late after CommitTransaction completes. So we'll
1744 // simulate an AbortTransaction message occurring during replication by calling
1745 // the shard directly.
1747 shard.underlyingActor().doAbortTransaction(transactionID, null);
1749 return preCommitFuture;
1753 MutableCompositeModification modification = new MutableCompositeModification();
1754 ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1755 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1756 modification, preCommit);
// Drive the three-phase sequence: ready -> canCommit -> commit.
1758 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1759 cohort, modification, true, false), getRef());
1760 expectMsgClass(duration, ReadyTransactionReply.class);
1762 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1763 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1764 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1765 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1767 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1768 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1770 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1772 // Since we're simulating an abort occurring during replication and before finish commit,
1773 // the data should still get written to the in-memory store since we've gotten past
1774 // canCommit and preCommit and persisted the data.
1775 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1777 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1782 public void testTransactionCommitTimeout() throws Throwable {
// Verifies the commit-timeout path: with the shard transaction commit timeout set to 1 second,
// a first transaction that is canCommit'ed but never committed must give way to a second one.
// A late CommitTransaction for the first then fails, while the second commits and its list
// entry is readable from the store.
1783 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1785 new ShardTestKit(getSystem()) {{
1786 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1787 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1788 "testTransactionCommitTimeout");
1790 waitUntilLeader(shard);
1792 final FiniteDuration duration = duration("5 seconds");
1794 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Pre-populate the store with the test container and an empty outer list.
1796 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1797 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1798 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1800 // Create 1st Tx - will timeout
1802 String transactionID1 = "tx1";
1803 MutableCompositeModification modification1 = new MutableCompositeModification();
1804 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1805 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1806 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1807 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// NOTE(review): the second transaction is labelled "tx3" / "cohort3" although its variables
// are suffixed with 2; the labels are only identifiers here.
1812 String transactionID2 = "tx3";
1813 MutableCompositeModification modification2 = new MutableCompositeModification();
1814 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1815 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1816 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1818 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
// Ready both transactions before starting any canCommit.
1823 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1824 cohort1, modification1, true, false), getRef());
1825 expectMsgClass(duration, ReadyTransactionReply.class);
1827 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1828 cohort2, modification2, true, false), getRef());
1829 expectMsgClass(duration, ReadyTransactionReply.class);
1831 // canCommit 1st Tx. We don't send the commit so it should timeout.
1833 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1834 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1836 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1838 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1839 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1841 // Try to commit the 1st Tx - should fail as it's not the current Tx.
1843 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1844 expectMsgClass(duration, akka.actor.Status.Failure.class);
1846 // Commit the 2nd Tx.
1848 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1849 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The entry written by the 2nd Tx (key 2) must now be present in the store.
1851 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1852 assertNotNull(listNodePath + " not found", node);
1854 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1859 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
// Verifies queue-capacity enforcement: with the shard transaction commit queue capacity set
// to 2, readying a third transaction is answered with a Status.Failure, and a later
// CanCommitTransaction for that third transaction also fails.
1860 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1862 new ShardTestKit(getSystem()) {{
1863 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1864 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1865 "testTransactionCommitQueueCapacityExceeded");
1867 waitUntilLeader(shard);
1869 final FiniteDuration duration = duration("5 seconds");
1871 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Prepare three mock write transactions; only the first two fit within the capacity of 2.
1873 String transactionID1 = "tx1";
1874 MutableCompositeModification modification1 = new MutableCompositeModification();
1875 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1876 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1878 String transactionID2 = "tx2";
1879 MutableCompositeModification modification2 = new MutableCompositeModification();
1880 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1881 TestModel.OUTER_LIST_PATH,
1882 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1885 String transactionID3 = "tx3";
1886 MutableCompositeModification modification3 = new MutableCompositeModification();
1887 ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1888 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1892 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1893 cohort1, modification1, true, false), getRef());
1894 expectMsgClass(duration, ReadyTransactionReply.class);
1896 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1897 cohort2, modification2, true, false), getRef());
1898 expectMsgClass(duration, ReadyTransactionReply.class);
1900 // The 3rd Tx should exceed queue capacity and fail.
1902 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1903 cohort3, modification3, true, false), getRef());
1904 expectMsgClass(duration, akka.actor.Status.Failure.class);
1906 // canCommit 1st Tx.
1908 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1909 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1911 // canCommit the 2nd Tx - it should get queued.
// No reply is awaited for this request; the next expected message is the failure for tx3.
1913 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1915 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1917 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1918 expectMsgClass(duration, akka.actor.Status.Failure.class);
1920 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1925 public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
// Verifies that a CanCommitTransaction for the last of three readied transactions is answered
// even though canCommit is never sent for the two transactions readied before it.
// Configuration: commit queue expiry timeout 1300 ms, transaction commit timeout 1 second.
1926 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1928 new ShardTestKit(getSystem()) {{
1929 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1930 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1931 "testTransactionCommitWithPriorExpiredCohortEntries");
1933 waitUntilLeader(shard);
1935 final FiniteDuration duration = duration("5 seconds");
1937 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
// Ready tx1 and tx2 (both writing TEST_PATH) and tx3 (writing TEST2_PATH), in that order.
1939 String transactionID1 = "tx1";
1940 MutableCompositeModification modification1 = new MutableCompositeModification();
1941 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1942 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1944 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1945 cohort1, modification1, true, false), getRef());
1946 expectMsgClass(duration, ReadyTransactionReply.class);
1948 String transactionID2 = "tx2";
1949 MutableCompositeModification modification2 = new MutableCompositeModification();
1950 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1951 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1953 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1954 cohort2, modification2, true, false), getRef());
1955 expectMsgClass(duration, ReadyTransactionReply.class);
1957 String transactionID3 = "tx3";
1958 MutableCompositeModification modification3 = new MutableCompositeModification();
1959 ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1960 TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1962 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1963 cohort3, modification3, true, false), getRef());
1964 expectMsgClass(duration, ReadyTransactionReply.class);
1966 // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1967 // should expire from the queue and the last one should be processed.
1969 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
// The reply is awaited for up to 5 seconds, which exceeds the 1300 ms expiry configured above.
1970 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1972 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1977 public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
// Verifies that, after the in-progress tx1 commits, a commit reply is also received for tx3
// (readied via ReadyLocalTransaction) although canCommit is never sent for tx2, which was
// readied between them. Finally checks that tx3's write to TEST2_PATH is in the store.
// Configuration: commit queue expiry timeout 1300 ms, transaction commit timeout 1 second.
1978 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1980 new ShardTestKit(getSystem()) {{
1981 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1982 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1983 "testTransactionCommitWithSubsequentExpiredCohortEntry");
1985 waitUntilLeader(shard);
1987 final FiniteDuration duration = duration("5 seconds");
1989 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1991 String transactionID1 = "tx1";
1992 MutableCompositeModification modification1 = new MutableCompositeModification();
1993 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1994 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1996 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1997 cohort1, modification1, true, false), getRef());
1998 expectMsgClass(duration, ReadyTransactionReply.class);
2000 // CanCommit the first one so it's the current in-progress CohortEntry.
2002 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2003 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
2005 // Ready the second Tx.
2007 String transactionID2 = "tx2";
2008 MutableCompositeModification modification2 = new MutableCompositeModification();
2009 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
2010 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
2012 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2013 cohort2, modification2, true, false), getRef());
2014 expectMsgClass(duration, ReadyTransactionReply.class);
2016 // Ready the third Tx.
2018 String transactionID3 = "tx3";
// Unlike tx1/tx2, tx3 uses a real DataTreeModification taken from the shard's data tree and
// is submitted as a ReadyLocalTransaction.
// NOTE(review): the third constructor argument (true) is presumably the immediate-commit
// flag; confirm against ReadyLocalTransaction.
2019 DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
2020 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
2021 .apply(modification3);
2022 ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
// No reply is awaited right after this tell; tx3's reply is asserted after tx1's commit below.
2024 shard.tell(readyMessage, getRef());
2026 // Commit the first Tx. After completing, the second should expire from the queue and the third
2029 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
2030 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2032 // Expect commit reply from the third Tx.
2034 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2036 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2037 assertNotNull(TestModel.TEST2_PATH + " not found", node);
2039 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2044 public void testCanCommitBeforeReadyFailure() throws Throwable {
// Verifies that a CanCommitTransaction for a transaction id ("tx") that was never readied on
// this shard is answered with a Status.Failure within 5 seconds.
2045 new ShardTestKit(getSystem()) {{
2046 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2047 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2048 "testCanCommitBeforeReadyFailure");
// Note: no ForwardedReadyTransaction is sent before the canCommit request.
2050 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2051 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2053 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2058 public void testAbortTransaction() throws Throwable {
// Verifies that aborting the in-progress transaction (tx1) is acknowledged with an
// AbortTransactionReply and lets the queued canCommit of tx2 complete; the mocks are then
// checked to confirm cohort1.canCommit() was invoked before cohort2.canCommit().
2059 new ShardTestKit(getSystem()) {{
2060 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2061 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2062 "testAbortTransaction");
2064 waitUntilLeader(shard);
2066 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2068 String transactionID1 = "tx1";
2069 MutableCompositeModification modification1 = new MutableCompositeModification();
2070 ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2071 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2072 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2074 String transactionID2 = "tx2";
2075 MutableCompositeModification modification2 = new MutableCompositeModification();
2076 ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2077 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2079 FiniteDuration duration = duration("5 seconds");
2080 final Timeout timeout = new Timeout(duration);
2082 // Simulate the ForwardedReadyTransaction messages that would be sent
2083 // by the ShardTransaction.
2085 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2086 cohort1, modification1, true, false), getRef());
2087 expectMsgClass(duration, ReadyTransactionReply.class);
2089 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2090 cohort2, modification2, true, false), getRef());
2091 expectMsgClass(duration, ReadyTransactionReply.class);
2093 // Send the CanCommitTransaction message for the first Tx.
2095 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2096 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2097 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2098 assertEquals("Can commit", true, canCommitReply.getCanCommit());
2100 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2101 // processed after the first Tx completes.
// Patterns.ask is used instead of tell so the reply arrives on a Future rather than in the
// test kit's mailbox, keeping it separate from the abort reply asserted below.
2103 Future<Object> canCommitFuture = Patterns.ask(shard,
2104 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2106 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2109 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2110 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2112 // Wait for the 2nd Tx to complete the canCommit phase.
2114 Await.ready(canCommitFuture, duration);
// Check call ordering across both mocks: tx1's canCommit first, then tx2's.
2116 InOrder inOrder = inOrder(cohort1, cohort2);
2117 inOrder.verify(cohort1).canCommit();
2118 inOrder.verify(cohort2).canCommit();
2120 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2125 public void testCreateSnapshot() throws Exception {
// Runs the shared snapshot scenario with persistent=true under the actor name "testCreateSnapshot".
2126 testCreateSnapshot(true, "testCreateSnapshot");
2130 public void testCreateSnapshotWithNonPersistentData() throws Exception {
// Runs the shared snapshot scenario with persistent=false under its own actor name.
2131 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
/**
 * Shared scenario for the snapshot tests: writes one container to a shard, triggers two
 * snapshot captures via the shard's SnapshotManager, and checks that each saved snapshot
 * deserializes to the store's root node as read before the first capture.
 *
 * @param persistent value passed to {@code dataStoreContextBuilder.persistent(...)}
 * @param shardActorName name under which the test shard actor is created
 * @throws Exception declared by this method for the calls it makes
 */
2134 @SuppressWarnings("serial")
2135 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
// The latch is held in an AtomicReference so it can be replaced with a fresh one before the
// second capture (see latch.set(...) below).
2137 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2139 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
// Persistence provider wrapper that records the object passed to saveSnapshot before delegating.
2140 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2141 TestPersistentDataProvider(DataPersistenceProvider delegate) {
2146 public void saveSnapshot(Object o) {
2147 savedSnapshot.set(o);
2148 super.saveSnapshot(o);
2152 dataStoreContextBuilder.persistent(persistent);
2154 new ShardTestKit(getSystem()) {{
// Shard subclass that installs the recording persistence provider, counts the latch down on
// snapshot completion messages, and exposes getRaftActorContext() publicly for this test.
2155 class TestShard extends Shard {
2157 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
2158 DatastoreContext datastoreContext, SchemaContext schemaContext) {
2159 super(name, peerAddresses, datastoreContext, schemaContext);
2160 setPersistence(new TestPersistentDataProvider(super.persistence()));
2164 public void handleCommand(Object message) {
2165 super.handleCommand(message);
// Signal the waiting test once the shard has handled either completion message.
2167 if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2168 latch.get().countDown();
2173 public RaftActorContext getRaftActorContext() {
2174 return super.getRaftActorContext();
2178 Creator<Shard> creator = new Creator<Shard>() {
2180 public Shard create() throws Exception {
2181 return new TestShard(shardID, Collections.<String,String>emptyMap(),
2182 newDatastoreContext(), SCHEMA_CONTEXT);
2186 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2187 Props.create(new DelegatingShardCreator(creator)), shardActorName);
2189 waitUntilLeader(shard);
2191 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Read the whole tree (empty path = root) to use as the expected snapshot content.
2193 NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2195 // Trigger creation of a snapshot by calling capture() on the shard's SnapshotManager directly.
2196 RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2197 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2199 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2201 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2202 savedSnapshot.get() instanceof Snapshot);
2204 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
// Reset the latch and the recorded snapshot, then capture a second time and re-verify.
2206 latch.set(new CountDownLatch(1));
2207 savedSnapshot.set(null);
2209 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2211 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2213 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2214 savedSnapshot.get() instanceof Snapshot);
2216 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2218 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Asserts that the state carried by a snapshot deserializes to the expected root node.
 *
 * @param snapshot snapshot whose {@code getState()} payload is deserialized
 * @param expectedRoot node the deserialized state must equal
 */
2221 private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
2223 NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2224 assertEquals("Root node", expectedRoot, actual);
2230 * This test simply verifies that the applySnapShot logic will work
2231 * @throws ReadFailedException
2232 * @throws DataValidationFailedException
2235 public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
// Works on a standalone in-memory DataTree (no shard actor): writes a container, reads the
// root, then deletes the root and writes the previously read root back in one modification,
// and asserts that the root read afterwards equals the one read before.
2236 DataTree store = InMemoryDataTreeFactory.getInstance().create();
2237 store.setSchemaContext(SCHEMA_CONTEXT);
2239 DataTreeModification putTransaction = store.takeSnapshot().newModification();
2240 putTransaction.write(TestModel.TEST_PATH,
2241 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2242 commitTransaction(store, putTransaction);
// An empty YangInstanceIdentifier addresses the root of the tree.
2245 NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2247 DataTreeModification writeTransaction = store.takeSnapshot().newModification();
// Delete the root and write the captured root back within the same modification.
2249 writeTransaction.delete(YangInstanceIdentifier.builder().build());
2250 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2252 commitTransaction(store, writeTransaction);
2254 NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2256 assertEquals(expected, actual);
2260 public void testRecoveryApplicable(){
// Verifies that a shard created from a persistent DatastoreContext reports
// isRecoveryApplicable() == true and one created from a non-persistent context reports false.
// The two contexts built below differ only in the persistent(...) flag.
2262 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2263 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2265 final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2266 persistentContext, SCHEMA_CONTEXT);
2268 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2269 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2271 final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2272 nonPersistentContext, SCHEMA_CONTEXT);
2274 new ShardTestKit(getSystem()) {{
2275 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2276 persistentProps, "testPersistence1");
2278 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2280 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2282 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2283 nonPersistentProps, "testPersistence2");
2285 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2287 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2294 public void testOnDatastoreContext() {
// Verifies that sending a new DatastoreContext to a running shard changes what its
// persistence provider reports from isRecoveryApplicable(): true -> false -> true as the
// persistent flag is toggled.
2295 new ShardTestKit(getSystem()) {{
2296 dataStoreContextBuilder.persistent(true);
2298 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2300 assertEquals("isRecoveryApplicable", true,
2301 shard.underlyingActor().persistence().isRecoveryApplicable());
2303 waitUntilLeader(shard);
// The assertions follow each tell immediately, with no wait in between.
// NOTE(review): presumably TestActorRef processes the message synchronously; confirm.
2305 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2307 assertEquals("isRecoveryApplicable", false,
2308 shard.underlyingActor().persistence().isRecoveryApplicable());
2310 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2312 assertEquals("isRecoveryApplicable", true,
2313 shard.underlyingActor().persistence().isRecoveryApplicable());
2315 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2320 public void testRegisterRoleChangeListener() throws Exception {
// Verifies that a listener registered via RegisterRoleChangeListener receives a registration
// reply and a ShardLeaderStateChanged carrying the shard's own data tree, and that after a
// RequestVote is sent it receives a ShardLeaderStateChanged without a local data tree.
2321 new ShardTestKit(getSystem()) {
2323 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2324 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2325 "testRegisterRoleChangeListener");
2327 waitUntilLeader(shard);
// The listener is a message-collecting actor; it is passed as the sender of the registration.
2329 TestActorRef<MessageCollectorActor> listener =
2330 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2332 shard.tell(new RegisterRoleChangeListener(), listener);
2334 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2336 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2337 ShardLeaderStateChanged.class);
2338 assertEquals("getLocalShardDataTree present", true,
2339 leaderStateChanged.getLocalShardDataTree().isPresent());
// assertSame: the notification must carry the very same DataTree instance the shard uses.
2340 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2341 leaderStateChanged.getLocalShardDataTree().get());
// Clear collected messages so the next expectFirstMatching only sees new notifications.
2343 MessageCollectorActor.clearMessages(listener);
2345 // Force a leader change
// NOTE(review): presumably the first argument (10000) is a term high enough to make the
// shard give up leadership; confirm against RequestVote.
2347 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2349 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2350 ShardLeaderStateChanged.class);
2351 assertEquals("getLocalShardDataTree present", false,
2352 leaderStateChanged.getLocalShardDataTree().isPresent());
2354 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2360 public void testFollowerInitialSyncStatus() throws Exception {
2361 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2362 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2363 "testFollowerInitialSyncStatus");
2365 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2367 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2369 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2371 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2373 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2376 private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2377 modification.ready();
2378 store.validate(modification);
2379 store.commit(store.prepare(modification));