1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.inOrder;
10 import static org.mockito.Mockito.mock;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import akka.actor.ActorRef;
13 import akka.actor.PoisonPill;
14 import akka.actor.Props;
15 import akka.dispatch.Dispatchers;
16 import akka.dispatch.OnComplete;
17 import akka.japi.Creator;
18 import akka.japi.Procedure;
19 import akka.pattern.Patterns;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import java.io.IOException;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.List;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.InOrder;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
43 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
52 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
54 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
55 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
56 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
57 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
58 import org.opendaylight.controller.cluster.datastore.modification.Modification;
59 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
60 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
61 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
62 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
63 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
64 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
65 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
66 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
67 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
68 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
69 import org.opendaylight.controller.cluster.raft.Snapshot;
70 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
71 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
73 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
74 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
75 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
76 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
77 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
78 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
79 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
80 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
81 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
82 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
83 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
84 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
85 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
86 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
87 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
88 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
89 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
90 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
91 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
92 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
93 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
94 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
95 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
96 import scala.concurrent.Await;
97 import scala.concurrent.Future;
98 import scala.concurrent.duration.FiniteDuration;
100 public class ShardTest extends AbstractShardTest {
102 public void testRegisterChangeListener() throws Exception {
103 new ShardTestKit(getSystem()) {{
104 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
105 newShardProps(), "testRegisterChangeListener");
107 waitUntilLeader(shard);
109 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
111 MockDataChangeListener listener = new MockDataChangeListener(1);
112 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
113 "testRegisterChangeListener-DataChangeListener");
115 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
116 dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
118 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
119 RegisterChangeListenerReply.class);
120 String replyPath = reply.getListenerRegistrationPath().toString();
121 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
122 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
124 YangInstanceIdentifier path = TestModel.TEST_PATH;
125 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
127 listener.waitForChangeEvents(path);
129 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
130 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
134 @SuppressWarnings("serial")
136 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
137 // This test tests the timing window in which a change listener is registered before the
138 // shard becomes the leader. We verify that the listener is registered and notified of the
139 // existing data when the shard becomes the leader.
140 new ShardTestKit(getSystem()) {{
141 // For this test, we want to send the RegisterChangeListener message after the shard
142 // has recovered from persistence and before it becomes the leader. So we subclass
143 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
144 // we know that the shard has been initialized to a follower and has started the
145 // election process. The following 2 CountDownLatches are used to coordinate the
146 // ElectionTimeout with the sending of the RegisterChangeListener message.
147 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
148 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
149 Creator<Shard> creator = new Creator<Shard>() {
150 boolean firstElectionTimeout = true;
153 public Shard create() throws Exception {
154 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
155 newDatastoreContext(), SCHEMA_CONTEXT) {
157 public void onReceiveCommand(final Object message) throws Exception {
158 if(message instanceof ElectionTimeout && firstElectionTimeout) {
159 // Got the first ElectionTimeout. We don't forward it to the
160 // base Shard yet until we've sent the RegisterChangeListener
161 // message. So we signal the onFirstElectionTimeout latch to tell
162 // the main thread to send the RegisterChangeListener message and
163 // start a thread to wait on the onChangeListenerRegistered latch,
164 // which the main thread signals after it has sent the message.
165 // After the onChangeListenerRegistered is triggered, we send the
166 // original ElectionTimeout message to proceed with the election.
167 firstElectionTimeout = false;
168 final ActorRef self = getSelf();
172 Uninterruptibles.awaitUninterruptibly(
173 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
174 self.tell(message, self);
178 onFirstElectionTimeout.countDown();
180 super.onReceiveCommand(message);
187 MockDataChangeListener listener = new MockDataChangeListener(1);
188 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
189 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
191 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
192 Props.create(new DelegatingShardCreator(creator)),
193 "testRegisterChangeListenerWhenNotLeaderInitially");
195 // Write initial data into the in-memory store.
196 YangInstanceIdentifier path = TestModel.TEST_PATH;
197 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
199 // Wait until the shard receives the first ElectionTimeout message.
200 assertEquals("Got first ElectionTimeout", true,
201 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
203 // Now send the RegisterChangeListener and wait for the reply.
204 shard.tell(new RegisterChangeListener(path, dclActor.path(),
205 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
207 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
208 RegisterChangeListenerReply.class);
209 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
211 // Sanity check - verify the shard is not the leader yet.
212 shard.tell(new FindLeader(), getRef());
213 FindLeaderReply findLeadeReply =
214 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
215 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
217 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
218 // with the election process.
219 onChangeListenerRegistered.countDown();
221 // Wait for the shard to become the leader and notify our listener with the existing
222 // data in the store.
223 listener.waitForChangeEvents(path);
225 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
226 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
231 public void testCreateTransaction(){
232 new ShardTestKit(getSystem()) {{
233 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
235 waitUntilLeader(shard);
237 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
239 shard.tell(new CreateTransaction("txn-1",
240 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
242 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
243 CreateTransactionReply.class);
245 String path = reply.getTransactionActorPath().toString();
246 assertTrue("Unexpected transaction path " + path,
247 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
249 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
254 public void testCreateTransactionOnChain(){
255 new ShardTestKit(getSystem()) {{
256 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
258 waitUntilLeader(shard);
260 shard.tell(new CreateTransaction("txn-1",
261 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
264 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
265 CreateTransactionReply.class);
267 String path = reply.getTransactionActorPath().toString();
268 assertTrue("Unexpected transaction path " + path,
269 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
271 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
275 @SuppressWarnings("serial")
277 public void testPeerAddressResolved() throws Exception {
278 new ShardTestKit(getSystem()) {{
279 final CountDownLatch recoveryComplete = new CountDownLatch(1);
280 class TestShard extends Shard {
282 super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
283 newDatastoreContext(), SCHEMA_CONTEXT);
286 Map<String, String> getPeerAddresses() {
287 return getRaftActorContext().getPeerAddresses();
291 protected void onRecoveryComplete() {
293 super.onRecoveryComplete();
295 recoveryComplete.countDown();
300 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
301 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
303 public TestShard create() throws Exception {
304 return new TestShard();
306 })), "testPeerAddressResolved");
308 //waitUntilLeader(shard);
309 assertEquals("Recovery complete", true,
310 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
312 String address = "akka://foobar";
313 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
315 assertEquals("getPeerAddresses", address,
316 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
318 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
323 public void testApplySnapshot() throws Exception {
324 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
325 "testApplySnapshot");
327 InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
328 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
330 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
332 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
333 NormalizedNode<?,?> expected = readStore(store, root);
335 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
336 SerializationUtils.serializeNormalizedNode(expected),
337 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
339 shard.underlyingActor().onReceiveCommand(applySnapshot);
341 NormalizedNode<?,?> actual = readStore(shard, root);
343 assertEquals("Root node", expected, actual);
345 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
349 public void testApplyState() throws Exception {
351 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
353 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
355 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
356 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
358 shard.underlyingActor().onReceiveCommand(applyState);
360 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
361 assertEquals("Applied state", node, actual);
363 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
367 public void testRecovery() throws Exception {
369 // Set up the InMemorySnapshotStore.
371 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
372 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
374 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
376 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
378 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
379 SerializationUtils.serializeNormalizedNode(root),
380 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
382 // Set up the InMemoryJournal.
384 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
385 new WriteModification(TestModel.OUTER_LIST_PATH,
386 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
388 int nListEntries = 16;
389 Set<Integer> listEntryKeys = new HashSet<>();
391 // Add some ModificationPayload entries
392 for(int i = 1; i <= nListEntries; i++) {
393 listEntryKeys.add(Integer.valueOf(i));
394 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
395 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
396 Modification mod = new MergeModification(path,
397 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
398 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
399 newModificationPayload(mod)));
402 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
403 new ApplyJournalEntries(nListEntries));
405 testRecovery(listEntryKeys);
408 private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
409 MutableCompositeModification compMod = new MutableCompositeModification();
410 for(Modification mod: mods) {
411 compMod.addModification(mod);
414 return new ModificationPayload(compMod);
417 @SuppressWarnings({ "unchecked" })
419 public void testConcurrentThreePhaseCommits() throws Throwable {
420 new ShardTestKit(getSystem()) {{
421 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
422 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
423 "testConcurrentThreePhaseCommits");
425 waitUntilLeader(shard);
427 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
429 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
431 String transactionID1 = "tx1";
432 MutableCompositeModification modification1 = new MutableCompositeModification();
433 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
434 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
436 String transactionID2 = "tx2";
437 MutableCompositeModification modification2 = new MutableCompositeModification();
438 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
439 TestModel.OUTER_LIST_PATH,
440 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
443 String transactionID3 = "tx3";
444 MutableCompositeModification modification3 = new MutableCompositeModification();
445 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
446 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
447 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
448 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
452 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
453 final Timeout timeout = new Timeout(duration);
455 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
456 // by the ShardTransaction.
458 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
459 cohort1, modification1, true), getRef());
460 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
461 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
462 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
464 // Send the CanCommitTransaction message for the first Tx.
466 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
467 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
468 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
469 assertEquals("Can commit", true, canCommitReply.getCanCommit());
471 // Send the ForwardedReadyTransaction for the next 2 Tx's.
473 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
474 cohort2, modification2, true), getRef());
475 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
477 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
478 cohort3, modification3, true), getRef());
479 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
481 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
482 // processed after the first Tx completes.
484 Future<Object> canCommitFuture1 = Patterns.ask(shard,
485 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
487 Future<Object> canCommitFuture2 = Patterns.ask(shard,
488 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
490 // Send the CommitTransaction message for the first Tx. After it completes, it should
491 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
493 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
494 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
496 // Wait for the next 2 Tx's to complete.
498 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
499 final CountDownLatch commitLatch = new CountDownLatch(2);
501 class OnFutureComplete extends OnComplete<Object> {
502 private final Class<?> expRespType;
504 OnFutureComplete(final Class<?> expRespType) {
505 this.expRespType = expRespType;
509 public void onComplete(final Throwable error, final Object resp) {
511 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
514 assertEquals("Commit response type", expRespType, resp.getClass());
516 } catch (Exception e) {
522 void onSuccess(final Object resp) throws Exception {
526 class OnCommitFutureComplete extends OnFutureComplete {
527 OnCommitFutureComplete() {
528 super(CommitTransactionReply.SERIALIZABLE_CLASS);
532 public void onComplete(final Throwable error, final Object resp) {
533 super.onComplete(error, resp);
534 commitLatch.countDown();
538 class OnCanCommitFutureComplete extends OnFutureComplete {
539 private final String transactionID;
541 OnCanCommitFutureComplete(final String transactionID) {
542 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
543 this.transactionID = transactionID;
547 void onSuccess(final Object resp) throws Exception {
548 CanCommitTransactionReply canCommitReply =
549 CanCommitTransactionReply.fromSerializable(resp);
550 assertEquals("Can commit", true, canCommitReply.getCanCommit());
552 Future<Object> commitFuture = Patterns.ask(shard,
553 new CommitTransaction(transactionID).toSerializable(), timeout);
554 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
558 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
559 getSystem().dispatcher());
561 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
562 getSystem().dispatcher());
564 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
566 if(caughtEx.get() != null) {
567 throw caughtEx.get();
570 assertEquals("Commits complete", true, done);
572 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
573 inOrder.verify(cohort1).canCommit();
574 inOrder.verify(cohort1).preCommit();
575 inOrder.verify(cohort1).commit();
576 inOrder.verify(cohort2).canCommit();
577 inOrder.verify(cohort2).preCommit();
578 inOrder.verify(cohort2).commit();
579 inOrder.verify(cohort3).canCommit();
580 inOrder.verify(cohort3).preCommit();
581 inOrder.verify(cohort3).commit();
583 // Verify data in the data store.
585 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
586 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
587 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
588 outerList.getValue() instanceof Iterable);
589 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
590 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
591 entry instanceof MapEntryNode);
592 MapEntryNode mapEntry = (MapEntryNode)entry;
593 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
594 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
595 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
596 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
598 verifyLastLogIndex(shard, 2);
600 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
605 public void testCommitWithPersistenceDisabled() throws Throwable {
606 dataStoreContextBuilder.persistent(false);
607 new ShardTestKit(getSystem()) {{
608 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
609 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
610 "testCommitPhaseFailure");
612 waitUntilLeader(shard);
614 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
616 // Setup a simulated transactions with a mock cohort.
618 String transactionID = "tx";
619 MutableCompositeModification modification = new MutableCompositeModification();
620 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
621 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
622 TestModel.TEST_PATH, containerNode, modification);
624 FiniteDuration duration = duration("5 seconds");
626 // Simulate the ForwardedReadyTransaction messages that would be sent
627 // by the ShardTransaction.
629 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
630 cohort, modification, true), getRef());
631 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
633 // Send the CanCommitTransaction message.
635 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
636 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
637 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
638 assertEquals("Can commit", true, canCommitReply.getCanCommit());
640 // Send the CanCommitTransaction message.
642 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
643 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
645 InOrder inOrder = inOrder(cohort);
646 inOrder.verify(cohort).canCommit();
647 inOrder.verify(cohort).preCommit();
648 inOrder.verify(cohort).commit();
650 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
651 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
653 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
658 public void testCommitWhenTransactionHasNoModifications(){
659 // Note that persistence is enabled which would normally result in the entry getting written to the journal
660 // but here that need not happen
661 new ShardTestKit(getSystem()) {
663 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
664 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
665 "testCommitWhenTransactionHasNoModifications");
667 waitUntilLeader(shard);
669 String transactionID = "tx1";
670 MutableCompositeModification modification = new MutableCompositeModification();
671 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
672 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
673 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
674 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
676 FiniteDuration duration = duration("5 seconds");
678 // Simulate the ForwardedReadyTransaction messages that would be sent
679 // by the ShardTransaction.
681 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
682 cohort, modification, true), getRef());
683 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
685 // Send the CanCommitTransaction message.
687 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
688 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
689 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
690 assertEquals("Can commit", true, canCommitReply.getCanCommit());
692 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
693 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
695 InOrder inOrder = inOrder(cohort);
696 inOrder.verify(cohort).canCommit();
697 inOrder.verify(cohort).preCommit();
698 inOrder.verify(cohort).commit();
700 // Use MBean for verification
701 // Committed transaction count should increase as usual
702 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
704 // Commit index should not advance because this does not go into the journal
705 assertEquals(-1, shard.underlyingActor().getShardMBean().getCommitIndex());
707 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
714 public void testCommitWhenTransactionHasModifications(){
715 new ShardTestKit(getSystem()) {
717 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
718 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
719 "testCommitWhenTransactionHasModifications");
721 waitUntilLeader(shard);
723 String transactionID = "tx1";
724 MutableCompositeModification modification = new MutableCompositeModification();
725 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
726 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
727 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
728 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
729 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
731 FiniteDuration duration = duration("5 seconds");
733 // Simulate the ForwardedReadyTransaction messages that would be sent
734 // by the ShardTransaction.
736 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
737 cohort, modification, true), getRef());
738 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
740 // Send the CanCommitTransaction message.
742 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
743 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
744 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
745 assertEquals("Can commit", true, canCommitReply.getCanCommit());
747 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
748 expectMsgClass(duration, ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
750 InOrder inOrder = inOrder(cohort);
751 inOrder.verify(cohort).canCommit();
752 inOrder.verify(cohort).preCommit();
753 inOrder.verify(cohort).commit();
755 // Use MBean for verification
756 // Committed transaction count should increase as usual
757 assertEquals(1,shard.underlyingActor().getShardMBean().getCommittedTransactionsCount());
759 // Commit index should advance as we do not have an empty modification
760 assertEquals(0, shard.underlyingActor().getShardMBean().getCommitIndex());
762 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
769 public void testCommitPhaseFailure() throws Throwable {
770 new ShardTestKit(getSystem()) {{
771 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
772 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
773 "testCommitPhaseFailure");
775 waitUntilLeader(shard);
777 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
780 String transactionID1 = "tx1";
781 MutableCompositeModification modification1 = new MutableCompositeModification();
782 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
783 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
784 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
785 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
787 String transactionID2 = "tx2";
788 MutableCompositeModification modification2 = new MutableCompositeModification();
789 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
790 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
792 FiniteDuration duration = duration("5 seconds");
793 final Timeout timeout = new Timeout(duration);
795 // Simulate the ForwardedReadyTransaction messages that would be sent
796 // by the ShardTransaction.
798 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
799 cohort1, modification1, true), getRef());
800 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
802 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
803 cohort2, modification2, true), getRef());
804 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
806 // Send the CanCommitTransaction message for the first Tx.
808 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
809 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
810 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
811 assertEquals("Can commit", true, canCommitReply.getCanCommit());
813 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
814 // processed after the first Tx completes.
816 Future<Object> canCommitFuture = Patterns.ask(shard,
817 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
819 // Send the CommitTransaction message for the first Tx. This should send back an error
820 // and trigger the 2nd Tx to proceed.
822 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
823 expectMsgClass(duration, akka.actor.Status.Failure.class);
825 // Wait for the 2nd Tx to complete the canCommit phase.
827 final CountDownLatch latch = new CountDownLatch(1);
828 canCommitFuture.onComplete(new OnComplete<Object>() {
830 public void onComplete(final Throwable t, final Object resp) {
833 }, getSystem().dispatcher());
835 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
837 InOrder inOrder = inOrder(cohort1, cohort2);
838 inOrder.verify(cohort1).canCommit();
839 inOrder.verify(cohort1).preCommit();
840 inOrder.verify(cohort1).commit();
841 inOrder.verify(cohort2).canCommit();
843 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
848 public void testPreCommitPhaseFailure() throws Throwable {
849 new ShardTestKit(getSystem()) {{
850 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
851 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
852 "testPreCommitPhaseFailure");
854 waitUntilLeader(shard);
856 String transactionID = "tx1";
857 MutableCompositeModification modification = new MutableCompositeModification();
858 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
859 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
860 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
862 FiniteDuration duration = duration("5 seconds");
864 // Simulate the ForwardedReadyTransaction messages that would be sent
865 // by the ShardTransaction.
867 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
868 cohort, modification, true), getRef());
869 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
871 // Send the CanCommitTransaction message.
873 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
874 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
875 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
876 assertEquals("Can commit", true, canCommitReply.getCanCommit());
878 // Send the CommitTransaction message. This should send back an error
879 // for preCommit failure.
881 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
882 expectMsgClass(duration, akka.actor.Status.Failure.class);
884 InOrder inOrder = inOrder(cohort);
885 inOrder.verify(cohort).canCommit();
886 inOrder.verify(cohort).preCommit();
888 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
893 public void testCanCommitPhaseFailure() throws Throwable {
894 new ShardTestKit(getSystem()) {{
895 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
896 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
897 "testCanCommitPhaseFailure");
899 waitUntilLeader(shard);
901 final FiniteDuration duration = duration("5 seconds");
903 String transactionID = "tx1";
904 MutableCompositeModification modification = new MutableCompositeModification();
905 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
906 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
908 // Simulate the ForwardedReadyTransaction messages that would be sent
909 // by the ShardTransaction.
911 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
912 cohort, modification, true), getRef());
913 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
915 // Send the CanCommitTransaction message.
917 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
918 expectMsgClass(duration, akka.actor.Status.Failure.class);
920 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
925 public void testAbortBeforeFinishCommit() throws Throwable {
926 new ShardTestKit(getSystem()) {{
927 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
928 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
929 "testAbortBeforeFinishCommit");
931 waitUntilLeader(shard);
933 final FiniteDuration duration = duration("5 seconds");
934 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
936 final String transactionID = "tx1";
937 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
938 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
940 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
941 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
943 // Simulate an AbortTransaction message occurring during replication, after
944 // persisting and before finishing the commit to the in-memory store.
945 // We have no followers so due to optimizations in the RaftActor, it does not
946 // attempt replication and thus we can't send an AbortTransaction message b/c
947 // it would be processed too late after CommitTransaction completes. So we'll
948 // simulate an AbortTransaction message occurring during replication by calling
949 // the shard directly.
951 shard.underlyingActor().doAbortTransaction(transactionID, null);
953 return preCommitFuture;
957 MutableCompositeModification modification = new MutableCompositeModification();
958 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
959 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
960 modification, preCommit);
962 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
963 cohort, modification, true), getRef());
964 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
966 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
967 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
968 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
969 assertEquals("Can commit", true, canCommitReply.getCanCommit());
971 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
972 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
974 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
976 // Since we're simulating an abort occurring during replication and before finish commit,
977 // the data should still get written to the in-memory store since we've gotten past
978 // canCommit and preCommit and persisted the data.
979 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
981 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
986 public void testTransactionCommitTimeout() throws Throwable {
987 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
989 new ShardTestKit(getSystem()) {{
990 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
991 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
992 "testTransactionCommitTimeout");
994 waitUntilLeader(shard);
996 final FiniteDuration duration = duration("5 seconds");
998 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1000 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1001 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1002 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1004 // Create 1st Tx - will timeout
1006 String transactionID1 = "tx1";
1007 MutableCompositeModification modification1 = new MutableCompositeModification();
1008 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1009 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1010 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1011 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1016 String transactionID2 = "tx3";
1017 MutableCompositeModification modification2 = new MutableCompositeModification();
1018 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1019 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1020 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1022 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1027 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1028 cohort1, modification1, true), getRef());
1029 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1031 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1032 cohort2, modification2, true), getRef());
1033 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1035 // canCommit 1st Tx. We don't send the commit so it should timeout.
1037 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1038 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1040 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1042 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1043 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1045 // Commit the 2nd Tx.
1047 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1048 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1050 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1051 assertNotNull(listNodePath + " not found", node);
1053 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1058 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1059 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1061 new ShardTestKit(getSystem()) {{
1062 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1063 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1064 "testTransactionCommitQueueCapacityExceeded");
1066 waitUntilLeader(shard);
1068 final FiniteDuration duration = duration("5 seconds");
1070 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1072 String transactionID1 = "tx1";
1073 MutableCompositeModification modification1 = new MutableCompositeModification();
1074 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1075 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1077 String transactionID2 = "tx2";
1078 MutableCompositeModification modification2 = new MutableCompositeModification();
1079 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1080 TestModel.OUTER_LIST_PATH,
1081 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1084 String transactionID3 = "tx3";
1085 MutableCompositeModification modification3 = new MutableCompositeModification();
1086 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1087 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1091 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1092 cohort1, modification1, true), getRef());
1093 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1095 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1096 cohort2, modification2, true), getRef());
1097 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1099 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1100 cohort3, modification3, true), getRef());
1101 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1103 // canCommit 1st Tx.
1105 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1106 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1108 // canCommit the 2nd Tx - it should get queued.
1110 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1112 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1114 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1115 expectMsgClass(duration, akka.actor.Status.Failure.class);
1117 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1122 public void testCanCommitBeforeReadyFailure() throws Throwable {
1123 new ShardTestKit(getSystem()) {{
1124 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1125 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1126 "testCanCommitBeforeReadyFailure");
1128 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1129 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1131 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1136 public void testAbortTransaction() throws Throwable {
1137 new ShardTestKit(getSystem()) {{
1138 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1139 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1140 "testAbortTransaction");
1142 waitUntilLeader(shard);
1144 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1146 String transactionID1 = "tx1";
1147 MutableCompositeModification modification1 = new MutableCompositeModification();
1148 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1149 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1150 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1152 String transactionID2 = "tx2";
1153 MutableCompositeModification modification2 = new MutableCompositeModification();
1154 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1155 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1157 FiniteDuration duration = duration("5 seconds");
1158 final Timeout timeout = new Timeout(duration);
1160 // Simulate the ForwardedReadyTransaction messages that would be sent
1161 // by the ShardTransaction.
1163 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1164 cohort1, modification1, true), getRef());
1165 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1167 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1168 cohort2, modification2, true), getRef());
1169 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1171 // Send the CanCommitTransaction message for the first Tx.
1173 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1174 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1175 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1176 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1178 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1179 // processed after the first Tx completes.
1181 Future<Object> canCommitFuture = Patterns.ask(shard,
1182 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1184 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1187 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1188 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1190 // Wait for the 2nd Tx to complete the canCommit phase.
1192 Await.ready(canCommitFuture, duration);
1194 InOrder inOrder = inOrder(cohort1, cohort2);
1195 inOrder.verify(cohort1).canCommit();
1196 inOrder.verify(cohort2).canCommit();
1198 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1203 public void testCreateSnapshot() throws Exception {
1204 testCreateSnapshot(true, "testCreateSnapshot");
1208 public void testCreateSnapshotWithNonPersistentData() throws Exception {
1209 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1212 @SuppressWarnings("serial")
1213 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
1215 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
1216 class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1217 DataPersistenceProvider delegate;
1219 DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1220 this.delegate = delegate;
1224 public boolean isRecoveryApplicable() {
1225 return delegate.isRecoveryApplicable();
1229 public <T> void persist(T o, Procedure<T> procedure) {
1230 delegate.persist(o, procedure);
1234 public void saveSnapshot(Object o) {
1235 savedSnapshot.set(o);
1236 delegate.saveSnapshot(o);
1240 public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1241 delegate.deleteSnapshots(criteria);
1245 public void deleteMessages(long sequenceNumber) {
1246 delegate.deleteMessages(sequenceNumber);
1250 dataStoreContextBuilder.persistent(persistent);
1252 new ShardTestKit(getSystem()) {{
1253 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1254 Creator<Shard> creator = new Creator<Shard>() {
1256 public Shard create() throws Exception {
1257 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1258 newDatastoreContext(), SCHEMA_CONTEXT) {
1260 DelegatingPersistentDataProvider delegating;
1263 protected DataPersistenceProvider persistence() {
1264 if(delegating == null) {
1265 delegating = new DelegatingPersistentDataProvider(super.persistence());
1272 protected void commitSnapshot(final long sequenceNumber) {
1273 super.commitSnapshot(sequenceNumber);
1274 latch.get().countDown();
1280 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1281 Props.create(new DelegatingShardCreator(creator)), shardActorName);
1283 waitUntilLeader(shard);
1285 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1287 NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1289 CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
1290 shard.tell(capture, getRef());
1292 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1294 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1295 savedSnapshot.get() instanceof Snapshot);
1297 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1299 latch.set(new CountDownLatch(1));
1300 savedSnapshot.set(null);
1302 shard.tell(capture, getRef());
1304 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1306 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1307 savedSnapshot.get() instanceof Snapshot);
1309 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1311 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1314 private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1316 NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1317 assertEquals("Root node", expectedRoot, actual);
1323 * This test simply verifies that the applySnapShot logic will work
1324 * @throws ReadFailedException
1327 public void testInMemoryDataStoreRestore() throws ReadFailedException {
1328 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1330 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1332 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1333 putTransaction.write(TestModel.TEST_PATH,
1334 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1335 commitTransaction(putTransaction);
1338 NormalizedNode<?, ?> expected = readStore(store);
1340 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1342 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1343 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1345 commitTransaction(writeTransaction);
1347 NormalizedNode<?, ?> actual = readStore(store);
1349 assertEquals(expected, actual);
1353 public void testRecoveryApplicable(){
1355 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1356 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1358 final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1359 persistentContext, SCHEMA_CONTEXT);
1361 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1362 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1364 final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1365 nonPersistentContext, SCHEMA_CONTEXT);
1367 new ShardTestKit(getSystem()) {{
1368 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1369 persistentProps, "testPersistence1");
1371 assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1373 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1375 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1376 nonPersistentProps, "testPersistence2");
1378 assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1380 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1386 public void testOnDatastoreContext() {
1387 new ShardTestKit(getSystem()) {{
1388 dataStoreContextBuilder.persistent(true);
1390 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1392 assertEquals("isRecoveryApplicable", true,
1393 shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1395 waitUntilLeader(shard);
1397 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1399 assertEquals("isRecoveryApplicable", false,
1400 shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1402 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1404 assertEquals("isRecoveryApplicable", true,
1405 shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1407 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1412 public void testRegisterRoleChangeListener() throws Exception {
1413 new ShardTestKit(getSystem()) {
1415 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1416 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1417 "testRegisterRoleChangeListener");
1419 waitUntilLeader(shard);
1421 TestActorRef<MessageCollectorActor> listener =
1422 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1424 shard.tell(new RegisterRoleChangeListener(), listener);
1426 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1427 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1429 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1431 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1433 assertEquals(1, allMatching.size());
1439 public void testFollowerInitialSyncStatus() throws Exception {
1440 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1441 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1442 "testFollowerInitialSyncStatus");
1444 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1446 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1448 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1450 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1452 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1455 private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1456 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1457 ListenableFuture<Void> future =
1458 commitCohort.preCommit();
1461 future = commitCohort.commit();
1463 } catch (InterruptedException | ExecutionException e) {