1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.inOrder;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.reset;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.dispatch.Dispatchers;
19 import akka.dispatch.OnComplete;
20 import akka.japi.Creator;
21 import akka.pattern.Patterns;
22 import akka.persistence.SaveSnapshotSuccess;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.Futures;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.InOrder;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
43 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
44 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
45 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
58 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
63 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
64 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
65 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
66 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
67 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
68 import org.opendaylight.controller.cluster.datastore.modification.Modification;
69 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
70 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
71 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
72 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
73 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
74 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
75 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
76 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
77 import org.opendaylight.controller.cluster.raft.RaftActorContext;
78 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
79 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
80 import org.opendaylight.controller.cluster.raft.Snapshot;
81 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
82 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
83 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
84 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
85 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
86 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
87 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
88 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
89 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
90 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
91 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
92 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
93 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
94 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
95 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
96 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
97 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
98 import org.opendaylight.yangtools.yang.common.QName;
99 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
101 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
102 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
103 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
105 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
106 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
107 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
108 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
109 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
110 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
111 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
112 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
113 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
116 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
117 import scala.concurrent.Await;
118 import scala.concurrent.Future;
119 import scala.concurrent.duration.FiniteDuration;
121 public class ShardTest extends AbstractShardTest {
122 private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
125 public void testRegisterChangeListener() throws Exception {
126 new ShardTestKit(getSystem()) {{
127 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
128 newShardProps(), "testRegisterChangeListener");
130 waitUntilLeader(shard);
132 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
134 MockDataChangeListener listener = new MockDataChangeListener(1);
135 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
136 "testRegisterChangeListener-DataChangeListener");
138 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
139 dclActor, AsyncDataBroker.DataChangeScope.BASE), getRef());
141 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
142 RegisterChangeListenerReply.class);
143 String replyPath = reply.getListenerRegistrationPath().toString();
144 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
145 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
147 YangInstanceIdentifier path = TestModel.TEST_PATH;
148 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
150 listener.waitForChangeEvents(path);
152 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
153 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
157 @SuppressWarnings("serial")
159 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
160 // This test tests the timing window in which a change listener is registered before the
161 // shard becomes the leader. We verify that the listener is registered and notified of the
162 // existing data when the shard becomes the leader.
163 new ShardTestKit(getSystem()) {{
164 // For this test, we want to send the RegisterChangeListener message after the shard
165 // has recovered from persistence and before it becomes the leader. So we subclass
166 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
167 // we know that the shard has been initialized to a follower and has started the
168 // election process. The following 2 CountDownLatches are used to coordinate the
169 // ElectionTimeout with the sending of the RegisterChangeListener message.
170 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
171 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
172 Creator<Shard> creator = new Creator<Shard>() {
173 boolean firstElectionTimeout = true;
176 public Shard create() throws Exception {
177 // Use a non persistent provider because this test actually invokes persist on the journal
178 // this will cause all other messages to not be queued properly after that.
179 // The basic issue is that you cannot use TestActorRef with a persistent actor (at least when
180 // it does do a persist)
181 return new Shard(shardID, Collections.<String,String>emptyMap(),
182 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
184 public void onReceiveCommand(final Object message) throws Exception {
185 if(message instanceof ElectionTimeout && firstElectionTimeout) {
186 // Got the first ElectionTimeout. We don't forward it to the
187 // base Shard yet until we've sent the RegisterChangeListener
188 // message. So we signal the onFirstElectionTimeout latch to tell
189 // the main thread to send the RegisterChangeListener message and
190 // start a thread to wait on the onChangeListenerRegistered latch,
191 // which the main thread signals after it has sent the message.
192 // After the onChangeListenerRegistered is triggered, we send the
193 // original ElectionTimeout message to proceed with the election.
194 firstElectionTimeout = false;
195 final ActorRef self = getSelf();
199 Uninterruptibles.awaitUninterruptibly(
200 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
201 self.tell(message, self);
205 onFirstElectionTimeout.countDown();
207 super.onReceiveCommand(message);
214 MockDataChangeListener listener = new MockDataChangeListener(1);
215 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
216 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
218 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
219 Props.create(new DelegatingShardCreator(creator)),
220 "testRegisterChangeListenerWhenNotLeaderInitially");
222 // Write initial data into the in-memory store.
223 YangInstanceIdentifier path = TestModel.TEST_PATH;
224 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
226 // Wait until the shard receives the first ElectionTimeout message.
227 assertEquals("Got first ElectionTimeout", true,
228 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
230 // Now send the RegisterChangeListener and wait for the reply.
231 shard.tell(new RegisterChangeListener(path, dclActor,
232 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
234 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
235 RegisterChangeListenerReply.class);
236 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
238 // Sanity check - verify the shard is not the leader yet.
239 shard.tell(new FindLeader(), getRef());
240 FindLeaderReply findLeadeReply =
241 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
242 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
244 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
245 // with the election process.
246 onChangeListenerRegistered.countDown();
248 // Wait for the shard to become the leader and notify our listener with the existing
249 // data in the store.
250 listener.waitForChangeEvents(path);
252 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
253 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
258 public void testRegisterDataTreeChangeListener() throws Exception {
259 new ShardTestKit(getSystem()) {{
260 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
261 newShardProps(), "testRegisterDataTreeChangeListener");
263 waitUntilLeader(shard);
265 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
267 MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
268 ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
269 "testRegisterDataTreeChangeListener-DataTreeChangeListener");
271 shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
273 RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
274 RegisterDataTreeChangeListenerReply.class);
275 String replyPath = reply.getListenerRegistrationPath().toString();
276 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
277 "akka:\\/\\/test\\/user\\/testRegisterDataTreeChangeListener\\/\\$.*"));
279 YangInstanceIdentifier path = TestModel.TEST_PATH;
280 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
282 listener.waitForChangeEvents();
284 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
285 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
289 @SuppressWarnings("serial")
291 public void testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
292 new ShardTestKit(getSystem()) {{
293 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
294 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
295 Creator<Shard> creator = new Creator<Shard>() {
296 boolean firstElectionTimeout = true;
299 public Shard create() throws Exception {
300 return new Shard(shardID, Collections.<String,String>emptyMap(),
301 dataStoreContextBuilder.persistent(false).build(), SCHEMA_CONTEXT) {
303 public void onReceiveCommand(final Object message) throws Exception {
304 if(message instanceof ElectionTimeout && firstElectionTimeout) {
305 firstElectionTimeout = false;
306 final ActorRef self = getSelf();
310 Uninterruptibles.awaitUninterruptibly(
311 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
312 self.tell(message, self);
316 onFirstElectionTimeout.countDown();
318 super.onReceiveCommand(message);
325 MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
326 ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
327 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration-DataChangeListener");
329 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
330 Props.create(new DelegatingShardCreator(creator)),
331 "testDataTreeChangeListenerNotifiedWhenNotTheLeaderOnRegistration");
333 YangInstanceIdentifier path = TestModel.TEST_PATH;
334 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
336 assertEquals("Got first ElectionTimeout", true,
337 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
339 shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
340 RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
341 RegisterDataTreeChangeListenerReply.class);
342 assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
344 shard.tell(new FindLeader(), getRef());
345 FindLeaderReply findLeadeReply =
346 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
347 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
349 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
351 onChangeListenerRegistered.countDown();
353 // TODO: investigate why we do not receive data chage events
354 listener.waitForChangeEvents();
356 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
357 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
362 public void testCreateTransaction(){
363 new ShardTestKit(getSystem()) {{
364 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
366 waitUntilLeader(shard);
368 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
370 shard.tell(new CreateTransaction("txn-1",
371 TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
373 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
374 CreateTransactionReply.class);
376 String path = reply.getTransactionActorPath().toString();
377 assertTrue("Unexpected transaction path " + path,
378 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
380 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
385 public void testCreateTransactionOnChain(){
386 new ShardTestKit(getSystem()) {{
387 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
389 waitUntilLeader(shard);
391 shard.tell(new CreateTransaction("txn-1",
392 TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
395 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
396 CreateTransactionReply.class);
398 String path = reply.getTransactionActorPath().toString();
399 assertTrue("Unexpected transaction path " + path,
400 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
402 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
406 @SuppressWarnings("serial")
408 public void testPeerAddressResolved() throws Exception {
409 new ShardTestKit(getSystem()) {{
410 final CountDownLatch recoveryComplete = new CountDownLatch(1);
411 class TestShard extends Shard {
413 super(shardID, Collections.<String, String>singletonMap(shardID.toString(), null),
414 newDatastoreContext(), SCHEMA_CONTEXT);
417 Map<String, String> getPeerAddresses() {
418 return getRaftActorContext().getPeerAddresses();
422 protected void onRecoveryComplete() {
424 super.onRecoveryComplete();
426 recoveryComplete.countDown();
431 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
432 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
434 public TestShard create() throws Exception {
435 return new TestShard();
437 })), "testPeerAddressResolved");
439 //waitUntilLeader(shard);
440 assertEquals("Recovery complete", true,
441 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
443 String address = "akka://foobar";
444 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID.toString(), address));
446 assertEquals("getPeerAddresses", address,
447 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
449 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
454 public void testApplySnapshot() throws Exception {
455 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
456 "testApplySnapshot");
458 DataTree store = InMemoryDataTreeFactory.getInstance().create();
459 store.setSchemaContext(SCHEMA_CONTEXT);
461 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
463 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
464 NormalizedNode<?,?> expected = readStore(store, root);
466 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
467 SerializationUtils.serializeNormalizedNode(expected),
468 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
470 shard.underlyingActor().onReceiveCommand(applySnapshot);
472 NormalizedNode<?,?> actual = readStore(shard, root);
474 assertEquals("Root node", expected, actual);
476 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
480 public void testApplyState() throws Exception {
482 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
484 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
486 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
487 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
489 shard.underlyingActor().onReceiveCommand(applyState);
491 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
492 assertEquals("Applied state", node, actual);
494 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
498 public void testApplyStateWithCandidatePayload() throws Exception {
500 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
502 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
503 DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
505 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
506 DataTreeCandidatePayload.create(candidate)));
508 shard.underlyingActor().onReceiveCommand(applyState);
510 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
511 assertEquals("Applied state", node, actual);
513 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
516 DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
517 DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
518 testStore.setSchemaContext(SCHEMA_CONTEXT);
520 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
522 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
524 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
525 SerializationUtils.serializeNormalizedNode(root),
526 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
530 private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
531 source.validate(mod);
532 final DataTreeCandidate candidate = source.prepare(mod);
533 source.commit(candidate);
534 return DataTreeCandidatePayload.create(candidate);
538 public void testDataTreeCandidateRecovery() throws Exception {
539 // Set up the InMemorySnapshotStore.
540 final DataTree source = setupInMemorySnapshotStore();
542 final DataTreeModification writeMod = source.takeSnapshot().newModification();
543 writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
545 // Set up the InMemoryJournal.
546 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
548 int nListEntries = 16;
549 Set<Integer> listEntryKeys = new HashSet<>();
551 // Add some ModificationPayload entries
552 for (int i = 1; i <= nListEntries; i++) {
553 listEntryKeys.add(Integer.valueOf(i));
555 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
556 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
558 final DataTreeModification mod = source.takeSnapshot().newModification();
559 mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
561 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
562 payloadForModification(source, mod)));
565 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
566 new ApplyJournalEntries(nListEntries));
568 testRecovery(listEntryKeys);
572 public void testModicationRecovery() throws Exception {
574 // Set up the InMemorySnapshotStore.
575 setupInMemorySnapshotStore();
577 // Set up the InMemoryJournal.
579 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
580 new WriteModification(TestModel.OUTER_LIST_PATH,
581 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
583 int nListEntries = 16;
584 Set<Integer> listEntryKeys = new HashSet<>();
586 // Add some ModificationPayload entries
587 for(int i = 1; i <= nListEntries; i++) {
588 listEntryKeys.add(Integer.valueOf(i));
589 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
590 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
591 Modification mod = new MergeModification(path,
592 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
593 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
594 newModificationPayload(mod)));
597 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
598 new ApplyJournalEntries(nListEntries));
600 testRecovery(listEntryKeys);
603 private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
604 MutableCompositeModification compMod = new MutableCompositeModification();
605 for(Modification mod: mods) {
606 compMod.addModification(mod);
609 return new ModificationPayload(compMod);
613 public void testConcurrentThreePhaseCommits() throws Throwable {
614 new ShardTestKit(getSystem()) {{
615 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
616 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
617 "testConcurrentThreePhaseCommits");
619 waitUntilLeader(shard);
621 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
623 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
625 String transactionID1 = "tx1";
626 MutableCompositeModification modification1 = new MutableCompositeModification();
627 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
628 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
630 String transactionID2 = "tx2";
631 MutableCompositeModification modification2 = new MutableCompositeModification();
632 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
633 TestModel.OUTER_LIST_PATH,
634 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
637 String transactionID3 = "tx3";
638 MutableCompositeModification modification3 = new MutableCompositeModification();
639 ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
640 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
641 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
642 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
646 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
647 final Timeout timeout = new Timeout(duration);
649 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
650 // by the ShardTransaction.
652 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
653 cohort1, modification1, true, false), getRef());
654 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
655 expectMsgClass(duration, ReadyTransactionReply.class));
656 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
658 // Send the CanCommitTransaction message for the first Tx.
660 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
661 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
662 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
663 assertEquals("Can commit", true, canCommitReply.getCanCommit());
665 // Send the ForwardedReadyTransaction for the next 2 Tx's.
667 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
668 cohort2, modification2, true, false), getRef());
669 expectMsgClass(duration, ReadyTransactionReply.class);
671 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
672 cohort3, modification3, true, false), getRef());
673 expectMsgClass(duration, ReadyTransactionReply.class);
675 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
676 // processed after the first Tx completes.
678 Future<Object> canCommitFuture1 = Patterns.ask(shard,
679 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
681 Future<Object> canCommitFuture2 = Patterns.ask(shard,
682 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
684 // Send the CommitTransaction message for the first Tx. After it completes, it should
685 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
687 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
688 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
690 // Wait for the next 2 Tx's to complete.
692 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
693 final CountDownLatch commitLatch = new CountDownLatch(2);
695 class OnFutureComplete extends OnComplete<Object> {
696 private final Class<?> expRespType;
698 OnFutureComplete(final Class<?> expRespType) {
699 this.expRespType = expRespType;
703 public void onComplete(final Throwable error, final Object resp) {
705 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
708 assertEquals("Commit response type", expRespType, resp.getClass());
710 } catch (Exception e) {
716 void onSuccess(final Object resp) throws Exception {
720 class OnCommitFutureComplete extends OnFutureComplete {
721 OnCommitFutureComplete() {
722 super(CommitTransactionReply.SERIALIZABLE_CLASS);
726 public void onComplete(final Throwable error, final Object resp) {
727 super.onComplete(error, resp);
728 commitLatch.countDown();
732 class OnCanCommitFutureComplete extends OnFutureComplete {
733 private final String transactionID;
735 OnCanCommitFutureComplete(final String transactionID) {
736 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
737 this.transactionID = transactionID;
741 void onSuccess(final Object resp) throws Exception {
742 CanCommitTransactionReply canCommitReply =
743 CanCommitTransactionReply.fromSerializable(resp);
744 assertEquals("Can commit", true, canCommitReply.getCanCommit());
746 Future<Object> commitFuture = Patterns.ask(shard,
747 new CommitTransaction(transactionID).toSerializable(), timeout);
748 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
752 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
753 getSystem().dispatcher());
755 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
756 getSystem().dispatcher());
758 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
760 if(caughtEx.get() != null) {
761 throw caughtEx.get();
764 assertEquals("Commits complete", true, done);
766 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
767 inOrder.verify(cohort1).canCommit();
768 inOrder.verify(cohort1).preCommit();
769 inOrder.verify(cohort1).commit();
770 inOrder.verify(cohort2).canCommit();
771 inOrder.verify(cohort2).preCommit();
772 inOrder.verify(cohort2).commit();
773 inOrder.verify(cohort3).canCommit();
774 inOrder.verify(cohort3).preCommit();
775 inOrder.verify(cohort3).commit();
777 // Verify data in the data store.
779 verifyOuterListEntry(shard, 1);
781 verifyLastApplied(shard, 2);
783 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
787 private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
788 NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
789 return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
792 private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
793 YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
794 BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
795 batched.addModification(new WriteModification(path, data));
796 batched.setReady(ready);
797 batched.setDoCommitOnReady(doCommitOnReady);
802 public void testBatchedModificationsWithNoCommitOnReady() throws Throwable {
803 new ShardTestKit(getSystem()) {{
804 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
805 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
806 "testBatchedModificationsWithNoCommitOnReady");
808 waitUntilLeader(shard);
810 final String transactionID = "tx";
811 FiniteDuration duration = duration("5 seconds");
813 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
814 ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
816 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
817 if(mockCohort.get() == null) {
818 mockCohort.set(createDelegatingMockCohort("cohort", actual));
821 return mockCohort.get();
825 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
827 // Send a BatchedModifications to start a transaction.
829 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
830 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
831 expectMsgClass(duration, BatchedModificationsReply.class);
833 // Send a couple more BatchedModifications.
835 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
836 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
837 expectMsgClass(duration, BatchedModificationsReply.class);
839 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
840 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
841 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, false), getRef());
842 expectMsgClass(duration, ReadyTransactionReply.class);
844 // Send the CanCommitTransaction message.
846 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
847 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
848 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
849 assertEquals("Can commit", true, canCommitReply.getCanCommit());
851 // Send the CanCommitTransaction message.
853 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
854 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
856 InOrder inOrder = inOrder(mockCohort.get());
857 inOrder.verify(mockCohort.get()).canCommit();
858 inOrder.verify(mockCohort.get()).preCommit();
859 inOrder.verify(mockCohort.get()).commit();
861 // Verify data in the data store.
863 verifyOuterListEntry(shard, 1);
865 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
870 public void testBatchedModificationsWithCommitOnReady() throws Throwable {
871 new ShardTestKit(getSystem()) {{
872 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
873 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
874 "testBatchedModificationsWithCommitOnReady");
876 waitUntilLeader(shard);
878 final String transactionID = "tx";
879 FiniteDuration duration = duration("5 seconds");
881 final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
882 ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
884 public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
885 if(mockCohort.get() == null) {
886 mockCohort.set(createDelegatingMockCohort("cohort", actual));
889 return mockCohort.get();
893 shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
895 // Send a BatchedModifications to start a transaction.
897 shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
898 ImmutableNodes.containerNode(TestModel.TEST_QNAME), false, false), getRef());
899 expectMsgClass(duration, BatchedModificationsReply.class);
901 // Send a couple more BatchedModifications.
903 shard.tell(newBatchedModifications(transactionID, TestModel.OUTER_LIST_PATH,
904 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false, false), getRef());
905 expectMsgClass(duration, BatchedModificationsReply.class);
907 shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
908 TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
909 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true, true), getRef());
911 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
913 InOrder inOrder = inOrder(mockCohort.get());
914 inOrder.verify(mockCohort.get()).canCommit();
915 inOrder.verify(mockCohort.get()).preCommit();
916 inOrder.verify(mockCohort.get()).commit();
918 // Verify data in the data store.
920 verifyOuterListEntry(shard, 1);
922 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
926 @SuppressWarnings("unchecked")
927 private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
928 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
929 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
930 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
931 outerList.getValue() instanceof Iterable);
932 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
933 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
934 entry instanceof MapEntryNode);
935 MapEntryNode mapEntry = (MapEntryNode)entry;
936 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
937 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
938 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
939 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", expIDValue, idLeaf.get().getValue());
943 public void testBatchedModificationsOnTransactionChain() throws Throwable {
944 new ShardTestKit(getSystem()) {{
945 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
946 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
947 "testBatchedModificationsOnTransactionChain");
949 waitUntilLeader(shard);
951 String transactionChainID = "txChain";
952 String transactionID1 = "tx1";
953 String transactionID2 = "tx2";
955 FiniteDuration duration = duration("5 seconds");
957 // Send a BatchedModifications to start a chained write transaction and ready it.
959 ContainerNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
960 YangInstanceIdentifier path = TestModel.TEST_PATH;
961 shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
962 containerNode, true, false), getRef());
963 expectMsgClass(duration, ReadyTransactionReply.class);
965 // Create a read Tx on the same chain.
967 shard.tell(new CreateTransaction(transactionID2, TransactionType.READ_ONLY.ordinal() ,
968 transactionChainID).toSerializable(), getRef());
970 CreateTransactionReply createReply = expectMsgClass(duration("3 seconds"), CreateTransactionReply.class);
972 getSystem().actorSelection(createReply.getTransactionActorPath()).tell(new ReadData(path), getRef());
973 ReadDataReply readReply = expectMsgClass(duration("3 seconds"), ReadDataReply.class);
974 assertEquals("Read node", containerNode, readReply.getNormalizedNode());
976 // Commit the write transaction.
978 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
979 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
980 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
981 assertEquals("Can commit", true, canCommitReply.getCanCommit());
983 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
984 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
986 // Verify data in the data store.
988 NormalizedNode<?, ?> actualNode = readStore(shard, path);
989 assertEquals("Stored node", containerNode, actualNode);
991 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
996 public void testOnBatchedModificationsWhenNotLeader() {
997 final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
998 new ShardTestKit(getSystem()) {{
999 Creator<Shard> creator = new Creator<Shard>() {
1000 private static final long serialVersionUID = 1L;
1003 public Shard create() throws Exception {
1004 return new Shard(shardID, Collections.<String,String>emptyMap(),
1005 newDatastoreContext(), SCHEMA_CONTEXT) {
1007 protected boolean isLeader() {
1008 return overrideLeaderCalls.get() ? false : super.isLeader();
1012 protected ActorSelection getLeader() {
1013 return overrideLeaderCalls.get() ? getSystem().actorSelection(getRef().path()) :
1020 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1021 Props.create(new DelegatingShardCreator(creator)), "testOnBatchedModificationsWhenNotLeader");
1023 waitUntilLeader(shard);
1025 overrideLeaderCalls.set(true);
1027 BatchedModifications batched = new BatchedModifications("tx", DataStoreVersions.CURRENT_VERSION, "");
1029 shard.tell(batched, ActorRef.noSender());
1031 expectMsgEquals(batched);
1033 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1038 public void testForwardedReadyTransactionWithImmediateCommit() throws Exception{
1039 new ShardTestKit(getSystem()) {{
1040 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1041 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1042 "testForwardedReadyTransactionWithImmediateCommit");
1044 waitUntilLeader(shard);
1046 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1048 String transactionID = "tx1";
1049 MutableCompositeModification modification = new MutableCompositeModification();
1050 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1051 ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1052 TestModel.TEST_PATH, containerNode, modification);
1054 FiniteDuration duration = duration("5 seconds");
1056 // Simulate the ForwardedReadyTransaction messages that would be sent
1057 // by the ShardTransaction.
1059 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1060 cohort, modification, true, true), getRef());
1062 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1064 InOrder inOrder = inOrder(cohort);
1065 inOrder.verify(cohort).canCommit();
1066 inOrder.verify(cohort).preCommit();
1067 inOrder.verify(cohort).commit();
1069 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1070 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1072 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1077 public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
1078 new ShardTestKit(getSystem()) {{
1079 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1080 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1081 "testReadyLocalTransactionWithImmediateCommit");
1083 waitUntilLeader(shard);
1085 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1087 DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1089 ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1090 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1091 MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1092 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1094 String txId = "tx1";
1095 ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
1097 shard.tell(readyMessage, getRef());
1099 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1101 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1102 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1104 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1109 public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
1110 new ShardTestKit(getSystem()) {{
1111 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1112 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1113 "testReadyLocalTransactionWithThreePhaseCommit");
1115 waitUntilLeader(shard);
1117 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1119 DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
1121 ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1122 new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
1123 MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
1124 new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
1126 String txId = "tx1";
1127 ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
1129 shard.tell(readyMessage, getRef());
1131 expectMsgClass(ReadyTransactionReply.class);
1133 // Send the CanCommitTransaction message.
1135 shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
1136 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1137 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1138 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1140 // Send the CanCommitTransaction message.
1142 shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
1143 expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
1145 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
1146 assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
1148 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1153 public void testCommitWithPersistenceDisabled() throws Throwable {
1154 dataStoreContextBuilder.persistent(false);
1155 new ShardTestKit(getSystem()) {{
1156 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1157 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1158 "testCommitWithPersistenceDisabled");
1160 waitUntilLeader(shard);
1162 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1164 // Setup a simulated transactions with a mock cohort.
1166 String transactionID = "tx";
1167 MutableCompositeModification modification = new MutableCompositeModification();
1168 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
1169 ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
1170 TestModel.TEST_PATH, containerNode, modification);
1172 FiniteDuration duration = duration("5 seconds");
1174 // Simulate the ForwardedReadyTransaction messages that would be sent
1175 // by the ShardTransaction.
1177 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1178 cohort, modification, true, false), getRef());
1179 expectMsgClass(duration, ReadyTransactionReply.class);
1181 // Send the CanCommitTransaction message.
1183 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1184 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1185 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1186 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1188 // Send the CanCommitTransaction message.
1190 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1191 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1193 InOrder inOrder = inOrder(cohort);
1194 inOrder.verify(cohort).canCommit();
1195 inOrder.verify(cohort).preCommit();
1196 inOrder.verify(cohort).commit();
1198 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
1199 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
1201 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1205 private static DataTreeCandidateTip mockCandidate(final String name) {
1206 DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1207 DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1208 doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
1209 doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
1210 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1211 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1212 return mockCandidate;
1215 private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
1216 DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
1217 DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
1218 doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
1219 doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
1220 doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
1221 return mockCandidate;
1225 public void testCommitWhenTransactionHasNoModifications(){
1226 // Note that persistence is enabled which would normally result in the entry getting written to the journal
1227 // but here that need not happen
1228 new ShardTestKit(getSystem()) {
1230 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1231 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1232 "testCommitWhenTransactionHasNoModifications");
1234 waitUntilLeader(shard);
1236 String transactionID = "tx1";
1237 MutableCompositeModification modification = new MutableCompositeModification();
1238 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1239 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1240 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1241 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1242 doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
1244 FiniteDuration duration = duration("5 seconds");
1246 // Simulate the ForwardedReadyTransaction messages that would be sent
1247 // by the ShardTransaction.
1249 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1250 cohort, modification, true, false), getRef());
1251 expectMsgClass(duration, ReadyTransactionReply.class);
1253 // Send the CanCommitTransaction message.
1255 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1256 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1257 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1258 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1260 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1261 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1263 InOrder inOrder = inOrder(cohort);
1264 inOrder.verify(cohort).canCommit();
1265 inOrder.verify(cohort).preCommit();
1266 inOrder.verify(cohort).commit();
1268 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1269 ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1271 // Use MBean for verification
1272 // Committed transaction count should increase as usual
1273 assertEquals(1,shardStats.getCommittedTransactionsCount());
1275 // Commit index should not advance because this does not go into the journal
1276 assertEquals(-1, shardStats.getCommitIndex());
1278 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1285 public void testCommitWhenTransactionHasModifications(){
1286 new ShardTestKit(getSystem()) {
1288 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1289 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1290 "testCommitWhenTransactionHasModifications");
1292 waitUntilLeader(shard);
1294 String transactionID = "tx1";
1295 MutableCompositeModification modification = new MutableCompositeModification();
1296 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
1297 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1298 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1299 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
1300 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
1301 doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
1303 FiniteDuration duration = duration("5 seconds");
1305 // Simulate the ForwardedReadyTransaction messages that would be sent
1306 // by the ShardTransaction.
1308 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1309 cohort, modification, true, false), getRef());
1310 expectMsgClass(duration, ReadyTransactionReply.class);
1312 // Send the CanCommitTransaction message.
1314 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1315 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1316 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1317 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1319 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1320 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
1322 InOrder inOrder = inOrder(cohort);
1323 inOrder.verify(cohort).canCommit();
1324 inOrder.verify(cohort).preCommit();
1325 inOrder.verify(cohort).commit();
1327 shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
1328 ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
1330 // Use MBean for verification
1331 // Committed transaction count should increase as usual
1332 assertEquals(1, shardStats.getCommittedTransactionsCount());
1334 // Commit index should advance as we do not have an empty modification
1335 assertEquals(0, shardStats.getCommitIndex());
1337 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1344 public void testCommitPhaseFailure() throws Throwable {
1345 new ShardTestKit(getSystem()) {{
1346 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1347 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1348 "testCommitPhaseFailure");
1350 waitUntilLeader(shard);
1352 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
1355 String transactionID1 = "tx1";
1356 MutableCompositeModification modification1 = new MutableCompositeModification();
1357 ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1358 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1359 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
1360 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
1361 doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
1363 String transactionID2 = "tx2";
1364 MutableCompositeModification modification2 = new MutableCompositeModification();
1365 ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1366 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1368 FiniteDuration duration = duration("5 seconds");
1369 final Timeout timeout = new Timeout(duration);
1371 // Simulate the ForwardedReadyTransaction messages that would be sent
1372 // by the ShardTransaction.
1374 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1375 cohort1, modification1, true, false), getRef());
1376 expectMsgClass(duration, ReadyTransactionReply.class);
1378 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1379 cohort2, modification2, true, false), getRef());
1380 expectMsgClass(duration, ReadyTransactionReply.class);
1382 // Send the CanCommitTransaction message for the first Tx.
1384 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1385 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1386 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1387 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1389 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1390 // processed after the first Tx completes.
1392 Future<Object> canCommitFuture = Patterns.ask(shard,
1393 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1395 // Send the CommitTransaction message for the first Tx. This should send back an error
1396 // and trigger the 2nd Tx to proceed.
1398 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1399 expectMsgClass(duration, akka.actor.Status.Failure.class);
1401 // Wait for the 2nd Tx to complete the canCommit phase.
1403 final CountDownLatch latch = new CountDownLatch(1);
1404 canCommitFuture.onComplete(new OnComplete<Object>() {
1406 public void onComplete(final Throwable t, final Object resp) {
1409 }, getSystem().dispatcher());
1411 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1413 InOrder inOrder = inOrder(cohort1, cohort2);
1414 inOrder.verify(cohort1).canCommit();
1415 inOrder.verify(cohort1).preCommit();
1416 inOrder.verify(cohort1).commit();
1417 inOrder.verify(cohort2).canCommit();
1419 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1424 public void testPreCommitPhaseFailure() throws Throwable {
1425 new ShardTestKit(getSystem()) {{
1426 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1427 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1428 "testPreCommitPhaseFailure");
1430 waitUntilLeader(shard);
1432 String transactionID1 = "tx1";
1433 MutableCompositeModification modification1 = new MutableCompositeModification();
1434 ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
1435 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1436 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
1438 String transactionID2 = "tx2";
1439 MutableCompositeModification modification2 = new MutableCompositeModification();
1440 ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
1441 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1443 FiniteDuration duration = duration("5 seconds");
1444 final Timeout timeout = new Timeout(duration);
1446 // Simulate the ForwardedReadyTransaction messages that would be sent
1447 // by the ShardTransaction.
1449 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1450 cohort1, modification1, true, false), getRef());
1451 expectMsgClass(duration, ReadyTransactionReply.class);
1453 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1454 cohort2, modification2, true, false), getRef());
1455 expectMsgClass(duration, ReadyTransactionReply.class);
1457 // Send the CanCommitTransaction message for the first Tx.
1459 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1460 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1461 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1462 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1464 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1465 // processed after the first Tx completes.
1467 Future<Object> canCommitFuture = Patterns.ask(shard,
1468 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1470 // Send the CommitTransaction message for the first Tx. This should send back an error
1471 // and trigger the 2nd Tx to proceed.
1473 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1474 expectMsgClass(duration, akka.actor.Status.Failure.class);
1476 // Wait for the 2nd Tx to complete the canCommit phase.
1478 final CountDownLatch latch = new CountDownLatch(1);
1479 canCommitFuture.onComplete(new OnComplete<Object>() {
1481 public void onComplete(final Throwable t, final Object resp) {
1484 }, getSystem().dispatcher());
1486 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1488 InOrder inOrder = inOrder(cohort1, cohort2);
1489 inOrder.verify(cohort1).canCommit();
1490 inOrder.verify(cohort1).preCommit();
1491 inOrder.verify(cohort2).canCommit();
1493 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1498 public void testCanCommitPhaseFailure() throws Throwable {
1499 new ShardTestKit(getSystem()) {{
1500 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1501 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1502 "testCanCommitPhaseFailure");
1504 waitUntilLeader(shard);
1506 final FiniteDuration duration = duration("5 seconds");
1508 String transactionID1 = "tx1";
1509 MutableCompositeModification modification = new MutableCompositeModification();
1510 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1511 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1513 // Simulate the ForwardedReadyTransaction messages that would be sent
1514 // by the ShardTransaction.
1516 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1517 cohort, modification, true, false), getRef());
1518 expectMsgClass(duration, ReadyTransactionReply.class);
1520 // Send the CanCommitTransaction message.
1522 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1523 expectMsgClass(duration, akka.actor.Status.Failure.class);
1525 // Send another can commit to ensure the failed one got cleaned up.
1529 String transactionID2 = "tx2";
1530 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1532 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1533 cohort, modification, true, false), getRef());
1534 expectMsgClass(duration, ReadyTransactionReply.class);
1536 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1537 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1538 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1539 assertEquals("getCanCommit", true, reply.getCanCommit());
1541 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1546 public void testCanCommitPhaseFalseResponse() throws Throwable {
1547 new ShardTestKit(getSystem()) {{
1548 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1549 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1550 "testCanCommitPhaseFalseResponse");
1552 waitUntilLeader(shard);
1554 final FiniteDuration duration = duration("5 seconds");
1556 String transactionID1 = "tx1";
1557 MutableCompositeModification modification = new MutableCompositeModification();
1558 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1559 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1561 // Simulate the ForwardedReadyTransaction messages that would be sent
1562 // by the ShardTransaction.
1564 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1565 cohort, modification, true, false), getRef());
1566 expectMsgClass(duration, ReadyTransactionReply.class);
1568 // Send the CanCommitTransaction message.
1570 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1571 CanCommitTransactionReply reply = CanCommitTransactionReply.fromSerializable(
1572 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1573 assertEquals("getCanCommit", false, reply.getCanCommit());
1575 // Send another can commit to ensure the failed one got cleaned up.
1579 String transactionID2 = "tx2";
1580 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1582 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1583 cohort, modification, true, false), getRef());
1584 expectMsgClass(duration, ReadyTransactionReply.class);
1586 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1587 reply = CanCommitTransactionReply.fromSerializable(
1588 expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
1589 assertEquals("getCanCommit", true, reply.getCanCommit());
1591 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1596 public void testImmediateCommitWithCanCommitPhaseFailure() throws Throwable {
1597 new ShardTestKit(getSystem()) {{
1598 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1599 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1600 "testImmediateCommitWithCanCommitPhaseFailure");
1602 waitUntilLeader(shard);
1604 final FiniteDuration duration = duration("5 seconds");
1606 String transactionID1 = "tx1";
1607 MutableCompositeModification modification = new MutableCompositeModification();
1608 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1609 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1611 // Simulate the ForwardedReadyTransaction messages that would be sent
1612 // by the ShardTransaction.
1614 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1615 cohort, modification, true, true), getRef());
1617 expectMsgClass(duration, akka.actor.Status.Failure.class);
1619 // Send another can commit to ensure the failed one got cleaned up.
1623 String transactionID2 = "tx2";
1624 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1625 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1626 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1627 DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1628 DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1629 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1630 doReturn(candidateRoot).when(candidate).getRootNode();
1631 doReturn(candidate).when(cohort).getCandidate();
1633 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1634 cohort, modification, true, true), getRef());
1636 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1638 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1643 public void testImmediateCommitWithCanCommitPhaseFalseResponse() throws Throwable {
1644 new ShardTestKit(getSystem()) {{
1645 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1646 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1647 "testImmediateCommitWithCanCommitPhaseFalseResponse");
1649 waitUntilLeader(shard);
1651 final FiniteDuration duration = duration("5 seconds");
1653 String transactionID = "tx1";
1654 MutableCompositeModification modification = new MutableCompositeModification();
1655 ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
1656 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
1658 // Simulate the ForwardedReadyTransaction messages that would be sent
1659 // by the ShardTransaction.
1661 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1662 cohort, modification, true, true), getRef());
1664 expectMsgClass(duration, akka.actor.Status.Failure.class);
1666 // Send another can commit to ensure the failed one got cleaned up.
1670 String transactionID2 = "tx2";
1671 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1672 doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
1673 doReturn(Futures.immediateFuture(null)).when(cohort).commit();
1674 DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
1675 DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
1676 doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
1677 doReturn(candidateRoot).when(candidate).getRootNode();
1678 doReturn(candidate).when(cohort).getCandidate();
1680 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1681 cohort, modification, true, true), getRef());
1683 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1685 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1690 public void testAbortBeforeFinishCommit() throws Throwable {
1691 new ShardTestKit(getSystem()) {{
1692 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1693 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1694 "testAbortBeforeFinishCommit");
1696 waitUntilLeader(shard);
1698 final FiniteDuration duration = duration("5 seconds");
1699 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1701 final String transactionID = "tx1";
1702 Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
1703 new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
1705 public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
1706 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1708 // Simulate an AbortTransaction message occurring during replication, after
1709 // persisting and before finishing the commit to the in-memory store.
1710 // We have no followers so due to optimizations in the RaftActor, it does not
1711 // attempt replication and thus we can't send an AbortTransaction message b/c
1712 // it would be processed too late after CommitTransaction completes. So we'll
1713 // simulate an AbortTransaction message occurring during replication by calling
1714 // the shard directly.
1716 shard.underlyingActor().doAbortTransaction(transactionID, null);
1718 return preCommitFuture;
1722 MutableCompositeModification modification = new MutableCompositeModification();
1723 ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1724 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1725 modification, preCommit);
1727 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1728 cohort, modification, true, false), getRef());
1729 expectMsgClass(duration, ReadyTransactionReply.class);
1731 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1732 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1733 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1734 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1736 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1737 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1739 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1741 // Since we're simulating an abort occurring during replication and before finish commit,
1742 // the data should still get written to the in-memory store since we've gotten past
1743 // canCommit and preCommit and persisted the data.
1744 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1746 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1751 public void testTransactionCommitTimeout() throws Throwable {
1752 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1754 new ShardTestKit(getSystem()) {{
1755 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1756 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1757 "testTransactionCommitTimeout");
1759 waitUntilLeader(shard);
1761 final FiniteDuration duration = duration("5 seconds");
1763 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1765 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1766 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1767 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1769 // Create 1st Tx - will timeout
1771 String transactionID1 = "tx1";
1772 MutableCompositeModification modification1 = new MutableCompositeModification();
1773 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1774 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1775 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1776 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1781 String transactionID2 = "tx3";
1782 MutableCompositeModification modification2 = new MutableCompositeModification();
1783 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1784 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1785 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1787 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1792 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1793 cohort1, modification1, true, false), getRef());
1794 expectMsgClass(duration, ReadyTransactionReply.class);
1796 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1797 cohort2, modification2, true, false), getRef());
1798 expectMsgClass(duration, ReadyTransactionReply.class);
1800 // canCommit 1st Tx. We don't send the commit so it should timeout.
1802 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1803 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1805 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1807 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1808 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1810 // Try to commit the 1st Tx - should fail as it's not the current Tx.
1812 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1813 expectMsgClass(duration, akka.actor.Status.Failure.class);
1815 // Commit the 2nd Tx.
1817 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1818 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1820 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1821 assertNotNull(listNodePath + " not found", node);
1823 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1828 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1829 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
1831 new ShardTestKit(getSystem()) {{
1832 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1833 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1834 "testTransactionCommitQueueCapacityExceeded");
1836 waitUntilLeader(shard);
1838 final FiniteDuration duration = duration("5 seconds");
1840 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1842 String transactionID1 = "tx1";
1843 MutableCompositeModification modification1 = new MutableCompositeModification();
1844 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1845 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1847 String transactionID2 = "tx2";
1848 MutableCompositeModification modification2 = new MutableCompositeModification();
1849 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1850 TestModel.OUTER_LIST_PATH,
1851 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1854 String transactionID3 = "tx3";
1855 MutableCompositeModification modification3 = new MutableCompositeModification();
1856 ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1857 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1861 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1862 cohort1, modification1, true, false), getRef());
1863 expectMsgClass(duration, ReadyTransactionReply.class);
1865 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1866 cohort2, modification2, true, false), getRef());
1867 expectMsgClass(duration, ReadyTransactionReply.class);
1869 // The 3rd Tx should exceed queue capacity and fail.
1871 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1872 cohort3, modification3, true, false), getRef());
1873 expectMsgClass(duration, akka.actor.Status.Failure.class);
1875 // canCommit 1st Tx.
1877 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1878 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1880 // canCommit the 2nd Tx - it should get queued.
1882 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1884 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1886 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1887 expectMsgClass(duration, akka.actor.Status.Failure.class);
1889 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1894 public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
1895 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1897 new ShardTestKit(getSystem()) {{
1898 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1899 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1900 "testTransactionCommitWithPriorExpiredCohortEntries");
1902 waitUntilLeader(shard);
1904 final FiniteDuration duration = duration("5 seconds");
1906 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1908 String transactionID1 = "tx1";
1909 MutableCompositeModification modification1 = new MutableCompositeModification();
1910 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1911 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1913 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1914 cohort1, modification1, true, false), getRef());
1915 expectMsgClass(duration, ReadyTransactionReply.class);
1917 String transactionID2 = "tx2";
1918 MutableCompositeModification modification2 = new MutableCompositeModification();
1919 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1920 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1922 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1923 cohort2, modification2, true, false), getRef());
1924 expectMsgClass(duration, ReadyTransactionReply.class);
1926 String transactionID3 = "tx3";
1927 MutableCompositeModification modification3 = new MutableCompositeModification();
1928 ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1929 TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
1931 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1932 cohort3, modification3, true, false), getRef());
1933 expectMsgClass(duration, ReadyTransactionReply.class);
1935 // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
1936 // should expire from the queue and the last one should be processed.
1938 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1939 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1941 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1946 public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
1947 dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
1949 new ShardTestKit(getSystem()) {{
1950 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1951 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1952 "testTransactionCommitWithSubsequentExpiredCohortEntry");
1954 waitUntilLeader(shard);
1956 final FiniteDuration duration = duration("5 seconds");
1958 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
1960 String transactionID1 = "tx1";
1961 MutableCompositeModification modification1 = new MutableCompositeModification();
1962 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1963 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1965 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1966 cohort1, modification1, true, false), getRef());
1967 expectMsgClass(duration, ReadyTransactionReply.class);
1969 // CanCommit the first one so it's the current in-progress CohortEntry.
1971 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1972 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1974 // Ready the second Tx.
1976 String transactionID2 = "tx2";
1977 MutableCompositeModification modification2 = new MutableCompositeModification();
1978 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1979 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
1981 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1982 cohort2, modification2, true, false), getRef());
1983 expectMsgClass(duration, ReadyTransactionReply.class);
1985 // Ready the third Tx.
1987 String transactionID3 = "tx3";
1988 DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
1989 new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
1990 .apply(modification3);
1991 ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
1993 shard.tell(readyMessage, getRef());
1995 // Commit the first Tx. After completing, the second should expire from the queue and the third
1998 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1999 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2001 // Expect commit reply from the third Tx.
2003 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
2005 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
2006 assertNotNull(TestModel.TEST2_PATH + " not found", node);
2008 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2013 public void testCanCommitBeforeReadyFailure() throws Throwable {
2014 new ShardTestKit(getSystem()) {{
2015 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2016 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2017 "testCanCommitBeforeReadyFailure");
2019 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
2020 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
2022 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2027 public void testAbortTransaction() throws Throwable {
2028 new ShardTestKit(getSystem()) {{
2029 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2030 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2031 "testAbortTransaction");
2033 waitUntilLeader(shard);
2035 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
2037 String transactionID1 = "tx1";
2038 MutableCompositeModification modification1 = new MutableCompositeModification();
2039 ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
2040 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
2041 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
2043 String transactionID2 = "tx2";
2044 MutableCompositeModification modification2 = new MutableCompositeModification();
2045 ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
2046 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
2048 FiniteDuration duration = duration("5 seconds");
2049 final Timeout timeout = new Timeout(duration);
2051 // Simulate the ForwardedReadyTransaction messages that would be sent
2052 // by the ShardTransaction.
2054 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
2055 cohort1, modification1, true, false), getRef());
2056 expectMsgClass(duration, ReadyTransactionReply.class);
2058 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
2059 cohort2, modification2, true, false), getRef());
2060 expectMsgClass(duration, ReadyTransactionReply.class);
2062 // Send the CanCommitTransaction message for the first Tx.
2064 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
2065 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
2066 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
2067 assertEquals("Can commit", true, canCommitReply.getCanCommit());
2069 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
2070 // processed after the first Tx completes.
2072 Future<Object> canCommitFuture = Patterns.ask(shard,
2073 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
2075 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
2078 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
2079 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
2081 // Wait for the 2nd Tx to complete the canCommit phase.
2083 Await.ready(canCommitFuture, duration);
2085 InOrder inOrder = inOrder(cohort1, cohort2);
2086 inOrder.verify(cohort1).canCommit();
2087 inOrder.verify(cohort2).canCommit();
2089 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2094 public void testCreateSnapshot() throws Exception {
2095 testCreateSnapshot(true, "testCreateSnapshot");
2099 public void testCreateSnapshotWithNonPersistentData() throws Exception {
2100 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
2103 @SuppressWarnings("serial")
2104 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
2106 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
2108 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
2109 class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
2110 TestPersistentDataProvider(DataPersistenceProvider delegate) {
2115 public void saveSnapshot(Object o) {
2116 savedSnapshot.set(o);
2117 super.saveSnapshot(o);
2121 dataStoreContextBuilder.persistent(persistent);
2123 new ShardTestKit(getSystem()) {{
2124 class TestShard extends Shard {
2126 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
2127 DatastoreContext datastoreContext, SchemaContext schemaContext) {
2128 super(name, peerAddresses, datastoreContext, schemaContext);
2129 setPersistence(new TestPersistentDataProvider(super.persistence()));
2133 public void handleCommand(Object message) {
2134 super.handleCommand(message);
2136 if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
2137 latch.get().countDown();
2142 public RaftActorContext getRaftActorContext() {
2143 return super.getRaftActorContext();
2147 Creator<Shard> creator = new Creator<Shard>() {
2149 public Shard create() throws Exception {
2150 return new TestShard(shardID, Collections.<String,String>emptyMap(),
2151 newDatastoreContext(), SCHEMA_CONTEXT);
2155 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2156 Props.create(new DelegatingShardCreator(creator)), shardActorName);
2158 waitUntilLeader(shard);
2160 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2162 NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
2164 // Trigger creation of a snapshot by ensuring
2165 RaftActorContext raftActorContext = ((TestShard) shard.underlyingActor()).getRaftActorContext();
2166 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2168 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2170 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2171 savedSnapshot.get() instanceof Snapshot);
2173 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2175 latch.set(new CountDownLatch(1));
2176 savedSnapshot.set(null);
2178 raftActorContext.getSnapshotManager().capture(mock(ReplicatedLogEntry.class), -1);
2180 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
2182 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
2183 savedSnapshot.get() instanceof Snapshot);
2185 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
2187 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2190 private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
2192 NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
2193 assertEquals("Root node", expectedRoot, actual);
2199 * This test simply verifies that the applySnapShot logic will work
2200 * @throws ReadFailedException
2201 * @throws DataValidationFailedException
2204 public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
2205 DataTree store = InMemoryDataTreeFactory.getInstance().create();
2206 store.setSchemaContext(SCHEMA_CONTEXT);
2208 DataTreeModification putTransaction = store.takeSnapshot().newModification();
2209 putTransaction.write(TestModel.TEST_PATH,
2210 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
2211 commitTransaction(store, putTransaction);
2214 NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
2216 DataTreeModification writeTransaction = store.takeSnapshot().newModification();
2218 writeTransaction.delete(YangInstanceIdentifier.builder().build());
2219 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
2221 commitTransaction(store, writeTransaction);
2223 NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
2225 assertEquals(expected, actual);
2229 public void testRecoveryApplicable(){
2231 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
2232 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
2234 final Props persistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2235 persistentContext, SCHEMA_CONTEXT);
2237 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
2238 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
2240 final Props nonPersistentProps = Shard.props(shardID, Collections.<String, String>emptyMap(),
2241 nonPersistentContext, SCHEMA_CONTEXT);
2243 new ShardTestKit(getSystem()) {{
2244 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
2245 persistentProps, "testPersistence1");
2247 assertTrue("Recovery Applicable", shard1.underlyingActor().persistence().isRecoveryApplicable());
2249 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
2251 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
2252 nonPersistentProps, "testPersistence2");
2254 assertFalse("Recovery Not Applicable", shard2.underlyingActor().persistence().isRecoveryApplicable());
2256 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
2263 public void testOnDatastoreContext() {
2264 new ShardTestKit(getSystem()) {{
2265 dataStoreContextBuilder.persistent(true);
2267 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
2269 assertEquals("isRecoveryApplicable", true,
2270 shard.underlyingActor().persistence().isRecoveryApplicable());
2272 waitUntilLeader(shard);
2274 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
2276 assertEquals("isRecoveryApplicable", false,
2277 shard.underlyingActor().persistence().isRecoveryApplicable());
2279 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
2281 assertEquals("isRecoveryApplicable", true,
2282 shard.underlyingActor().persistence().isRecoveryApplicable());
2284 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2289 public void testRegisterRoleChangeListener() throws Exception {
2290 new ShardTestKit(getSystem()) {
2292 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2293 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2294 "testRegisterRoleChangeListener");
2296 waitUntilLeader(shard);
2298 TestActorRef<MessageCollectorActor> listener =
2299 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
2301 shard.tell(new RegisterRoleChangeListener(), listener);
2303 MessageCollectorActor.expectFirstMatching(listener, RegisterRoleChangeListenerReply.class);
2305 ShardLeaderStateChanged leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2306 ShardLeaderStateChanged.class);
2307 assertEquals("getLocalShardDataTree present", true,
2308 leaderStateChanged.getLocalShardDataTree().isPresent());
2309 assertSame("getLocalShardDataTree", shard.underlyingActor().getDataStore().getDataTree(),
2310 leaderStateChanged.getLocalShardDataTree().get());
2312 MessageCollectorActor.clearMessages(listener);
2314 // Force a leader change
2316 shard.tell(new RequestVote(10000, "member2", 50, 50), getRef());
2318 leaderStateChanged = MessageCollectorActor.expectFirstMatching(listener,
2319 ShardLeaderStateChanged.class);
2320 assertEquals("getLocalShardDataTree present", false,
2321 leaderStateChanged.getLocalShardDataTree().isPresent());
2323 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2329 public void testFollowerInitialSyncStatus() throws Exception {
2330 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
2331 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
2332 "testFollowerInitialSyncStatus");
2334 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
2336 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2338 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
2340 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
2342 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
2345 private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
2346 modification.ready();
2347 store.validate(modification);
2348 store.commit(store.prepare(modification));