1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.inOrder;
10 import static org.mockito.Mockito.mock;
11 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
12 import akka.actor.ActorRef;
13 import akka.actor.PoisonPill;
14 import akka.actor.Props;
15 import akka.dispatch.Dispatchers;
16 import akka.dispatch.OnComplete;
17 import akka.japi.Creator;
18 import akka.japi.Procedure;
19 import akka.pattern.Patterns;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import java.io.IOException;
30 import java.util.Collections;
31 import java.util.HashSet;
32 import java.util.List;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.InOrder;
41 import org.opendaylight.controller.cluster.DataPersistenceProvider;
42 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
43 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
50 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
52 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
54 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
55 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
56 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
57 import org.opendaylight.controller.cluster.datastore.modification.Modification;
58 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
59 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
60 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
61 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
62 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
63 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
64 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
65 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
66 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
67 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
68 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
69 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
70 import org.opendaylight.controller.cluster.raft.Snapshot;
71 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
72 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
73 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
74 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
75 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
76 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
77 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
78 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
79 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
80 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
81 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
82 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
83 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
84 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
85 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
86 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
87 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
88 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
89 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
90 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
91 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
92 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
93 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
94 import scala.concurrent.Await;
95 import scala.concurrent.Future;
96 import scala.concurrent.duration.FiniteDuration;
98 public class ShardTest extends AbstractShardTest {
/**
 * Registers a data change listener on a shard and verifies that the shard replies with a
 * registration path under its own actor path and that the listener is notified when data is
 * subsequently written at the registered path.
 *
 * @throws Exception declared by the test; propagated from the helpers called below
 */
100 public void testRegisterChangeListener() throws Exception {
101 new ShardTestKit(getSystem()) {{
102 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
103 newShardProps(), "testRegisterChangeListener");
// Helper defined outside this excerpt; by its name it blocks until the shard is the leader.
105 waitUntilLeader(shard);
// Fire-and-forget schema update: no sender, so no reply lands in this test kit.
107 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// The constructor argument is presumably the number of change events to expect - TODO confirm.
109 MockDataChangeListener listener = new MockDataChangeListener(1);
110 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
111 "testRegisterChangeListener-DataChangeListener");
// Register the listener actor for TEST_PATH with BASE scope; the reply comes back to getRef().
113 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
114 dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
116 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
117 RegisterChangeListenerReply.class);
// The registration path must be a child of the shard actor whose name starts with '$'.
118 String replyPath = reply.getListenerRegistrationPath().toString();
119 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
120 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// Write a container node at the registered path and wait for the listener to see the change.
122 YangInstanceIdentifier path = TestModel.TEST_PATH;
123 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
125 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
127 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
128 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Registers a change listener while the shard has not yet become leader and verifies that the
 * listener is still notified of data already in the store once the election proceeds.
 *
 * <p>Two latches coordinate the test thread with a Shard subclass that intercepts the first
 * {@code ElectionTimeout} message.
 *
 * @throws Exception declared by the test; propagated from the helpers called below
 */
132 @SuppressWarnings("serial")
134 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
135 // This test tests the timing window in which a change listener is registered before the
136 // shard becomes the leader. We verify that the listener is registered and notified of the
137 // existing data when the shard becomes the leader.
138 new ShardTestKit(getSystem()) {{
139 // For this test, we want to send the RegisterChangeListener message after the shard
140 // has recovered from persistence and before it becomes the leader. So we subclass
141 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
142 // we know that the shard has been initialized to a follower and has started the
143 // election process. The following 2 CountDownLatches are used to coordinate the
144 // ElectionTimeout with the sending of the RegisterChangeListener message.
145 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
146 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
147 Creator<Shard> creator = new Creator<Shard>() {
// Flag consulted by the overridden onReceiveCommand so only the first ElectionTimeout is held back.
148 boolean firstElectionTimeout = true;
151 public Shard create() throws Exception {
// The shard is created with an empty peer-address map.
152 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
153 newDatastoreContext(), SCHEMA_CONTEXT) {
155 public void onReceiveCommand(final Object message) throws Exception {
156 if(message instanceof ElectionTimeout && firstElectionTimeout) {
157 // Got the first ElectionTimeout. We don't forward it to the
158 // base Shard yet until we've sent the RegisterChangeListener
159 // message. So we signal the onFirstElectionTimeout latch to tell
160 // the main thread to send the RegisterChangeListener message and
161 // start a thread to wait on the onChangeListenerRegistered latch,
162 // which the main thread signals after it has sent the message.
163 // After the onChangeListenerRegistered is triggered, we send the
164 // original ElectionTimeout message to proceed with the election.
165 firstElectionTimeout = false;
166 final ActorRef self = getSelf();
// Wait at most 5 seconds for the test thread's signal, then re-send the held-back
// ElectionTimeout to this actor so the election can continue.
170 Uninterruptibles.awaitUninterruptibly(
171 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
172 self.tell(message, self);
// Let the test thread know the first ElectionTimeout has arrived.
176 onFirstElectionTimeout.countDown();
// Delegate to the real Shard (presumably for every message other than the intercepted one;
// the enclosing branch structure is not fully visible in this excerpt).
178 super.onReceiveCommand(message);
185 MockDataChangeListener listener = new MockDataChangeListener(1);
186 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
187 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
// Create the shard through the intercepting creator defined above.
189 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
190 Props.create(new DelegatingShardCreator(creator)),
191 "testRegisterChangeListenerWhenNotLeaderInitially");
193 // Write initial data into the in-memory store.
194 YangInstanceIdentifier path = TestModel.TEST_PATH;
195 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
197 // Wait until the shard receives the first ElectionTimeout message.
198 assertEquals("Got first ElectionTimeout", true,
199 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
201 // Now send the RegisterChangeListener and wait for the reply.
202 shard.tell(new RegisterChangeListener(path, dclActor.path(),
203 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
205 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
206 RegisterChangeListenerReply.class);
207 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
209 // Sanity check - verify the shard is not the leader yet.
210 shard.tell(new FindLeader(), getRef());
211 FindLeaderReply findLeadeReply =
212 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
213 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
215 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
216 // with the election process.
217 onChangeListenerRegistered.countDown();
219 // Wait for the shard to become the leader and notify our listener with the existing
220 // data in the store.
221 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
223 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
224 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Sends a serialized {@code CreateTransaction} for a read-only transaction "txn-1" to a shard
 * and verifies that the reply carries a transaction actor path located under the shard actor.
 */
229 public void testCreateTransaction(){
230 new ShardTestKit(getSystem()) {{
231 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
// Helper defined outside this excerpt; by its name it blocks until the shard is the leader.
233 waitUntilLeader(shard);
235 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// The transaction type is passed as the enum ordinal of READ_ONLY.
237 shard.tell(new CreateTransaction("txn-1",
238 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
240 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
241 CreateTransactionReply.class);
// The transaction actor path must contain the shard's path followed by "shard-txn-1".
243 String path = reply.getTransactionActorPath().toString();
244 assertTrue("Unexpected transaction path " + path,
245 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
// Stop the shard actor created by this test.
247 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Same check as the plain create-transaction test, but the {@code CreateTransaction} message
 * carries a third argument, "foobar" (presumably the transaction chain id - TODO confirm
 * against the CreateTransaction constructor).
 */
252 public void testCreateTransactionOnChain(){
253 new ShardTestKit(getSystem()) {{
254 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
// Helper defined outside this excerpt; by its name it blocks until the shard is the leader.
256 waitUntilLeader(shard);
// Read-only transaction "txn-1"; the transaction type is passed as the enum ordinal.
258 shard.tell(new CreateTransaction("txn-1",
259 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
262 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
263 CreateTransactionReply.class);
// The transaction actor path must contain the shard's path followed by "shard-txn-1".
265 String path = reply.getTransactionActorPath().toString();
266 assertTrue("Unexpected transaction path " + path,
267 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
// Stop the shard actor created by this test.
269 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies that a {@code PeerAddressResolved} message updates the peer address held in the
 * shard's raft actor context.
 *
 * @throws Exception declared by the test; propagated from onReceiveCommand
 */
273 @SuppressWarnings("serial")
275 public void testPeerAddressResolved() throws Exception {
276 new ShardTestKit(getSystem()) {{
// Counted down by the TestShard below once recovery has completed.
277 final CountDownLatch recoveryComplete = new CountDownLatch(1);
278 class TestShard extends Shard {
// The peer map starts with a single entry whose address is null (not yet resolved).
280 super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
281 newDatastoreContext(), SCHEMA_CONTEXT);
// Exposes the raft context's peer address map to the assertion at the end of the test.
284 Map<String, String> getPeerAddresses() {
285 return getRaftActorContext().getPeerAddresses();
289 protected void onRecoveryComplete() {
291 super.onRecoveryComplete();
293 recoveryComplete.countDown();
298 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
299 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
301 public TestShard create() throws Exception {
302 return new TestShard();
304 })), "testPeerAddressResolved");
306 //waitUntilLeader(shard);
// Wait up to 5 seconds for recovery to finish before delivering the message.
307 assertEquals("Recovery complete", true,
308 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// Hand the message to the actor directly instead of going through its mailbox.
310 String address = "akka://foobar";
311 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
// The peer entry keyed by the shard id string must now hold the resolved address.
313 assertEquals("getPeerAddresses", address,
314 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
// Stop the shard actor created by this test.
316 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Builds a snapshot from a stand-alone in-memory data store, applies it to a shard with an
 * {@code ApplySnapshot} message and verifies that the shard's root node equals the root node
 * of the source store.
 *
 * @throws Exception declared by the test; propagated from the helpers called below
 */
321 public void testApplySnapshot() throws Exception {
322 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
323 "testApplySnapshot");
// Separate store used only to produce the expected tree; it is not the shard's own store.
325 InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
326 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
328 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// An empty YangInstanceIdentifier addresses the root of the tree.
330 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
331 NormalizedNode<?,?> expected = readStore(store, root);
// Snapshot state is the serialized root node with no unapplied entries; the four trailing
// numbers are presumably index/term metadata - TODO confirm against Snapshot.create.
333 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
334 SerializationUtils.serializeNormalizedNode(expected),
335 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
// Hand the message to the actor directly instead of going through its mailbox.
337 shard.underlyingActor().onReceiveCommand(applySnapshot);
339 NormalizedNode<?,?> actual = readStore(shard, root);
341 assertEquals("Root node", expected, actual);
// Stop the shard actor created by this test.
343 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Delivers an {@code ApplyState} message wrapping a single write modification and verifies
 * that the written node can then be read back from the shard at TEST_PATH.
 *
 * @throws Exception declared by the test; propagated from the helpers called below
 */
347 public void testApplyState() throws Exception {
349 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
351 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// ApplyState is built with a null first argument and the identifier "test"; the log entry
// carries the write of `node` to TEST_PATH as its payload.
353 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
354 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
// Hand the message to the actor directly instead of going through its mailbox.
356 shard.underlyingActor().onReceiveCommand(applyState);
358 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
359 assertEquals("Applied state", node, actual);
// Stop the shard actor created by this test.
361 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Seeds the in-memory snapshot store and journal for this shard id with a snapshot and a
 * series of modification entries, then delegates to the {@code testRecovery(Set)} overload
 * (defined outside this excerpt) with the list entry keys that were journaled.
 *
 * @throws Exception declared by the test; propagated from the helpers called below
 */
365 public void testRecovery() throws Exception {
367 // Set up the InMemorySnapshotStore.
369 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
370 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
372 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// The snapshot state is the serialized root of a store that holds only the test container.
374 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
376 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
377 SerializationUtils.serializeNormalizedNode(root),
378 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
380 // Set up the InMemoryJournal.
// Journal entry 0 writes an empty outer list, which the merges below add entries to.
382 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newModificationPayload(
383 new WriteModification(TestModel.OUTER_LIST_PATH,
384 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
386 int nListEntries = 16;
387 Set<Integer> listEntryKeys = new HashSet<>();
389 // Add some ModificationPayload entries
// Journal entries 1..nListEntries each merge one outer-list entry keyed by the loop index.
390 for(int i = 1; i <= nListEntries; i++) {
391 listEntryKeys.add(Integer.valueOf(i));
392 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
393 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
394 Modification mod = new MergeModification(path,
395 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
396 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
397 newModificationPayload(mod)));
// The final journal record is an ApplyJournalEntries built with nListEntries.
400 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
401 new ApplyJournalEntries(nListEntries));
403 testRecovery(listEntryKeys);
/**
 * Wraps the given modifications, in argument order, in a MutableCompositeModification and
 * returns a ModificationPayload built from it.
 *
 * @param mods the modifications to bundle; may be empty
 * @return a new payload containing the composite modification
 * @throws IOException declared by this method; presumably raised by the ModificationPayload
 *         constructor - TODO confirm
 */
406 private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
407 MutableCompositeModification compMod = new MutableCompositeModification();
408 for(Modification mod: mods) {
409 compMod.addModification(mod);
412 return new ModificationPayload(compMod);
/**
 * Drives three transactions through canCommit and commit against one shard and verifies that
 * the cohorts are invoked strictly in order (tx1, then tx2, then tx3), that the committed
 * data is readable from the shard, and that the last log index matches the expected value.
 *
 * @throws Throwable any failure recorded by the asynchronous callbacks is rethrown on the
 *         test thread
 */
415 @SuppressWarnings({ "unchecked" })
417 public void testConcurrentThreePhaseCommits() throws Throwable {
418 new ShardTestKit(getSystem()) {{
419 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
420 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
421 "testConcurrentThreePhaseCommits");
423 waitUntilLeader(shard);
425 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
427 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// tx1 targets the test container.
429 String transactionID1 = "tx1";
430 MutableCompositeModification modification1 = new MutableCompositeModification();
431 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
432 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
// tx2 targets the outer list with an empty map node.
434 String transactionID2 = "tx2";
435 MutableCompositeModification modification2 = new MutableCompositeModification();
436 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
437 TestModel.OUTER_LIST_PATH,
438 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
// tx3 targets the outer-list entry with id 1.
441 String transactionID3 = "tx3";
442 MutableCompositeModification modification3 = new MutableCompositeModification();
443 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
444 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
445 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
446 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// timeoutSec is declared on a line not visible in this excerpt.
450 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
451 final Timeout timeout = new Timeout(duration);
453 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
454 // by the ShardTransaction.
456 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
457 cohort1, modification1, true), getRef());
458 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
459 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
// The ready reply must name the shard itself as the cohort.
460 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
462 // Send the CanCommitTransaction message for the first Tx.
464 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
465 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
466 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
467 assertEquals("Can commit", true, canCommitReply.getCanCommit());
469 // Send the ForwardedReadyTransaction for the next 2 Tx's.
471 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
472 cohort2, modification2, true), getRef());
473 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
475 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
476 cohort3, modification3, true), getRef());
477 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
479 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
480 // processed after the first Tx completes.
// Patterns.ask is used here so each reply arrives on its own Future instead of in the
// test kit's mailbox.
482 Future<Object> canCommitFuture1 = Patterns.ask(shard,
483 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
485 Future<Object> canCommitFuture2 = Patterns.ask(shard,
486 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
488 // Send the CommitTransaction message for the first Tx. After it completes, it should
489 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
491 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
492 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
494 // Wait for the next 2 Tx's to complete.
// caughtEx carries a failure from the callbacks back to the test thread;
// commitLatch counts the two outstanding commits (tx2 and tx3).
496 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
497 final CountDownLatch commitLatch = new CountDownLatch(2);
// Base callback: remembers the expected reply class and records failures in caughtEx.
499 class OnFutureComplete extends OnComplete<Object> {
500 private final Class<?> expRespType;
502 OnFutureComplete(final Class<?> expRespType) {
503 this.expRespType = expRespType;
507 public void onComplete(final Throwable error, final Object resp) {
// Record a failed ask so the test thread can rethrow it after the latch wait.
509 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
512 assertEquals("Commit response type", expRespType, resp.getClass());
// NOTE(review): assertEquals reports failure with AssertionError, which is not an Exception;
// if the assertion above sits inside this try it would escape this handler - confirm.
514 } catch (Exception e) {
// Hook for subclasses; the body is not visible in this excerpt.
520 void onSuccess(final Object resp) throws Exception {
// Callback for commit replies: runs the base handling, then counts down the commit latch.
524 class OnCommitFutureComplete extends OnFutureComplete {
525 OnCommitFutureComplete() {
526 super(CommitTransactionReply.SERIALIZABLE_CLASS);
530 public void onComplete(final Throwable error, final Object resp) {
531 super.onComplete(error, resp);
532 commitLatch.countDown();
// Callback for canCommit replies: on success it issues the CommitTransaction for the same
// transaction and attaches an OnCommitFutureComplete to that ask.
536 class OnCanCommitFutureComplete extends OnFutureComplete {
537 private final String transactionID;
539 OnCanCommitFutureComplete(final String transactionID) {
540 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
541 this.transactionID = transactionID;
545 void onSuccess(final Object resp) throws Exception {
546 CanCommitTransactionReply canCommitReply =
547 CanCommitTransactionReply.fromSerializable(resp);
548 assertEquals("Can commit", true, canCommitReply.getCanCommit());
550 Future<Object> commitFuture = Patterns.ask(shard,
551 new CommitTransaction(transactionID).toSerializable(), timeout);
552 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
556 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
557 getSystem().dispatcher());
559 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
560 getSystem().dispatcher());
562 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
// Surface a failure recorded by a callback before checking the latch outcome.
564 if(caughtEx.get() != null) {
565 throw caughtEx.get();
568 assertEquals("Commits complete", true, done);
// Each cohort must go through canCommit, preCommit and commit before the next one starts.
570 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
571 inOrder.verify(cohort1).canCommit();
572 inOrder.verify(cohort1).preCommit();
573 inOrder.verify(cohort1).commit();
574 inOrder.verify(cohort2).canCommit();
575 inOrder.verify(cohort2).preCommit();
576 inOrder.verify(cohort2).commit();
577 inOrder.verify(cohort3).canCommit();
578 inOrder.verify(cohort3).preCommit();
579 inOrder.verify(cohort3).commit();
581 // Verify data in the data store.
583 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
584 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
585 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
586 outerList.getValue() instanceof Iterable);
// Only the first list entry is inspected; it must be the entry with id 1 written by tx3.
587 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
588 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
589 entry instanceof MapEntryNode);
590 MapEntryNode mapEntry = (MapEntryNode)entry;
591 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
592 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
593 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
594 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
// Presumably one log entry per committed transaction, giving indices 0..2 - TODO confirm.
596 verifyLastLogIndex(shard, 2);
// Stop the shard actor created by this test.
598 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
603 public void testCommitWithPersistenceDisabled() throws Throwable {
604 dataStoreContextBuilder.persistent(false);
605 new ShardTestKit(getSystem()) {{
606 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
607 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
608 "testCommitPhaseFailure");
610 waitUntilLeader(shard);
612 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
614 // Setup a simulated transactions with a mock cohort.
616 String transactionID = "tx";
617 MutableCompositeModification modification = new MutableCompositeModification();
618 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
619 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
620 TestModel.TEST_PATH, containerNode, modification);
622 FiniteDuration duration = duration("5 seconds");
624 // Simulate the ForwardedReadyTransaction messages that would be sent
625 // by the ShardTransaction.
627 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
628 cohort, modification, true), getRef());
629 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
631 // Send the CanCommitTransaction message.
633 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
634 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
635 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
636 assertEquals("Can commit", true, canCommitReply.getCanCommit());
638 // Send the CanCommitTransaction message.
640 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
641 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
643 InOrder inOrder = inOrder(cohort);
644 inOrder.verify(cohort).canCommit();
645 inOrder.verify(cohort).preCommit();
646 inOrder.verify(cohort).commit();
648 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
649 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
651 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies that when the first transaction's cohort fails in commit(), the shard replies with
 * a failure for that commit and the second transaction's queued canCommit still proceeds.
 *
 * @throws Throwable declared by the test; propagated from the helpers called below
 */
656 public void testCommitPhaseFailure() throws Throwable {
657 new ShardTestKit(getSystem()) {{
658 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
659 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
660 "testCommitPhaseFailure");
662 waitUntilLeader(shard);
664 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
// cohort1: canCommit and preCommit succeed, commit fails with IllegalStateException("mock").
667 String transactionID1 = "tx1";
668 MutableCompositeModification modification1 = new MutableCompositeModification();
669 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
670 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
671 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
672 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
// cohort2: only canCommit is stubbed, because the test stops after that phase.
674 String transactionID2 = "tx2";
675 MutableCompositeModification modification2 = new MutableCompositeModification();
676 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
677 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
679 FiniteDuration duration = duration("5 seconds");
680 final Timeout timeout = new Timeout(duration);
682 // Simulate the ForwardedReadyTransaction messages that would be sent
683 // by the ShardTransaction.
685 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
686 cohort1, modification1, true), getRef());
687 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
689 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
690 cohort2, modification2, true), getRef());
691 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
693 // Send the CanCommitTransaction message for the first Tx.
695 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
696 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
697 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
698 assertEquals("Can commit", true, canCommitReply.getCanCommit());
700 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
701 // processed after the first Tx completes.
703 Future<Object> canCommitFuture = Patterns.ask(shard,
704 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
706 // Send the CommitTransaction message for the first Tx. This should send back an error
707 // and trigger the 2nd Tx to proceed.
709 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
710 expectMsgClass(duration, akka.actor.Status.Failure.class);
712 // Wait for the 2nd Tx to complete the canCommit phase.
// The callback body is not visible in this excerpt; presumably it counts down the latch.
714 final CountDownLatch latch = new CountDownLatch(1);
715 canCommitFuture.onComplete(new OnComplete<Object>() {
717 public void onComplete(final Throwable t, final Object resp) {
720 }, getSystem().dispatcher());
722 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// cohort1 must have been taken through all three phases before cohort2's canCommit ran.
724 InOrder inOrder = inOrder(cohort1, cohort2);
725 inOrder.verify(cohort1).canCommit();
726 inOrder.verify(cohort1).preCommit();
727 inOrder.verify(cohort1).commit();
728 inOrder.verify(cohort2).canCommit();
// Stop the shard actor created by this test.
730 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies that when the cohort's preCommit() fails, the shard answers the CommitTransaction
 * message with an {@code akka.actor.Status.Failure}.
 *
 * @throws Throwable declared by the test; propagated from the helpers called below
 */
735 public void testPreCommitPhaseFailure() throws Throwable {
736 new ShardTestKit(getSystem()) {{
737 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
738 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
739 "testPreCommitPhaseFailure");
741 waitUntilLeader(shard);
// Mock cohort: canCommit succeeds, preCommit fails with IllegalStateException("mock").
743 String transactionID = "tx1";
744 MutableCompositeModification modification = new MutableCompositeModification();
745 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
746 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
747 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
749 FiniteDuration duration = duration("5 seconds");
751 // Simulate the ForwardedReadyTransaction messages that would be sent
752 // by the ShardTransaction.
754 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
755 cohort, modification, true), getRef());
756 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
758 // Send the CanCommitTransaction message.
760 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
761 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
762 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
763 assertEquals("Can commit", true, canCommitReply.getCanCommit());
765 // Send the CommitTransaction message. This should send back an error
766 // for preCommit failure.
768 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
769 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Only canCommit and preCommit are verified; commit is not part of the ordered check.
771 InOrder inOrder = inOrder(cohort);
772 inOrder.verify(cohort).canCommit();
773 inOrder.verify(cohort).preCommit();
// Stop the shard actor created by this test.
775 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a failure in the canCommit phase is reported back to the sender: the mock
// cohort's canCommit returns a failed future, so the test expects an
// akka.actor.Status.Failure in reply to CanCommitTransaction.
780 public void testCanCommitPhaseFailure() throws Throwable {
781 new ShardTestKit(getSystem()) {{
782 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
783 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
784 "testCanCommitPhaseFailure");
786 waitUntilLeader(shard);
788 final FiniteDuration duration = duration("5 seconds");
790 String transactionID = "tx1";
791 MutableCompositeModification modification = new MutableCompositeModification();
// Mock cohort whose canCommit yields a future failed with IllegalStateException.
792 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
793 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
795 // Simulate the ForwardedReadyTransaction messages that would be sent
796 // by the ShardTransaction.
798 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
799 cohort, modification, true), getRef());
800 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
802 // Send the CanCommitTransaction message.
804 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
805 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Stop the shard actor created for this test.
807 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that an abort issued after preCommit has been invoked, but before the commit
// finishes, does not stop the data from being written: the test still expects a
// CommitTransactionReply and a non-null read of TestModel.TEST_PATH afterwards.
812 public void testAbortBeforeFinishCommit() throws Throwable {
813 new ShardTestKit(getSystem()) {{
814 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
815 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
816 "testAbortBeforeFinishCommit");
818 waitUntilLeader(shard);
820 final FiniteDuration duration = duration("5 seconds");
821 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
823 final String transactionID = "tx1";
// Hook passed to setupMockWriteTransaction below: it calls the cohort's preCommit, aborts the
// transaction directly on the shard, then returns the preCommit future.
// NOTE(review): presumably the mock cohort runs this in place of its own preCommit - confirm
// against setupMockWriteTransaction.
824 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
825 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
827 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
828 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
830 // Simulate an AbortTransaction message occurring during replication, after
831 // persisting and before finishing the commit to the in-memory store.
832 // We have no followers so due to optimizations in the RaftActor, it does not
833 // attempt replication and thus we can't send an AbortTransaction message b/c
834 // it would be processed too late after CommitTransaction completes. So we'll
835 // simulate an AbortTransaction message occurring during replication by calling
836 // the shard directly.
838 shard.underlyingActor().doAbortTransaction(transactionID, null);
840 return preCommitFuture;
844 MutableCompositeModification modification = new MutableCompositeModification();
845 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
846 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
847 modification, preCommit);
// Drive the transaction through ready, canCommit and commit, expecting a success reply each time.
849 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
850 cohort, modification, true), getRef());
851 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
853 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
854 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
855 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
856 assertEquals("Can commit", true, canCommitReply.getCanCommit());
858 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
859 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
861 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
863 // Since we're simulating an abort occurring during replication and before finish commit,
864 // the data should still get written to the in-memory store since we've gotten past
865 // canCommit and preCommit and persisted the data.
866 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
// Stop the shard actor created for this test.
868 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a transaction which completes canCommit but is never committed times out, so
// that a second, queued transaction can complete canCommit and commit. The commit timeout is
// set to 1 second on the shared context builder before the shard is created.
873 public void testTransactionCommitTimeout() throws Throwable {
874 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
876 new ShardTestKit(getSystem()) {{
877 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
878 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
879 "testTransactionCommitTimeout");
881 waitUntilLeader(shard);
883 final FiniteDuration duration = duration("5 seconds");
885 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Seed the store with the test container and an empty outer list before creating the transactions.
887 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
888 writeToStore(shard, TestModel.OUTER_LIST_PATH,
889 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
891 // Create 1st Tx - will timeout
893 String transactionID1 = "tx1";
894 MutableCompositeModification modification1 = new MutableCompositeModification();
895 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
896 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
897 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
898 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// NOTE(review): the 2nd Tx is labelled "tx3" / "cohort3" although only cohort1 and cohort2
// are used in this test.
903 String transactionID2 = "tx3";
904 MutableCompositeModification modification2 = new MutableCompositeModification();
905 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
906 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
907 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
909 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
// Ready both transactions, expecting a ReadyTransactionReply for each.
914 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
915 cohort1, modification1, true), getRef());
916 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
918 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
919 cohort2, modification2, true), getRef());
920 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
922 // canCommit 1st Tx. We don't send the commit so it should timeout.
924 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
925 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
927 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
929 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
930 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
932 // Commit the 2nd Tx.
934 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
935 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The list entry targeted by the 2nd Tx must now be readable from the shard.
937 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
938 assertNotNull(listNodePath + " not found", node);
// Stop the shard actor created for this test.
940 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a canCommit request is rejected once the commit queue is full. The queue
// capacity is set to 1 on the shared context builder; three transactions are readied, and the
// third CanCommitTransaction is expected to be answered with akka.actor.Status.Failure.
945 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
946 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
948 new ShardTestKit(getSystem()) {{
949 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
950 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
951 "testTransactionCommitQueueCapacityExceeded");
953 waitUntilLeader(shard);
955 final FiniteDuration duration = duration("5 seconds");
957 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Set up three mock write transactions.
959 String transactionID1 = "tx1";
960 MutableCompositeModification modification1 = new MutableCompositeModification();
961 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
962 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
964 String transactionID2 = "tx2";
965 MutableCompositeModification modification2 = new MutableCompositeModification();
966 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
967 TestModel.OUTER_LIST_PATH,
968 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
971 String transactionID3 = "tx3";
972 MutableCompositeModification modification3 = new MutableCompositeModification();
973 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
974 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
// Ready all three transactions, expecting a ReadyTransactionReply for each.
978 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
979 cohort1, modification1, true), getRef());
980 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
982 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
983 cohort2, modification2, true), getRef());
984 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
986 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
987 cohort3, modification3, true), getRef());
988 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// canCommit the 1st Tx and wait for its reply; no CommitTransaction is sent for it in this test.
992 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
993 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
995 // canCommit the 2nd Tx - it should get queued.
// No reply is awaited for the 2nd Tx; the next expected message is the 3rd Tx's failure.
997 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
999 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1001 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1002 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Stop the shard actor created for this test.
1004 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a CanCommitTransaction for a transaction id ("tx") that was never readied -
// no ForwardedReadyTransaction is sent in this test - is answered with
// akka.actor.Status.Failure.
1009 public void testCanCommitBeforeReadyFailure() throws Throwable {
1010 new ShardTestKit(getSystem()) {{
1011 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1012 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1013 "testCanCommitBeforeReadyFailure");
// Send canCommit straight away, without a preceding ready message.
1015 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1016 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// Stop the shard actor created for this test.
1018 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that aborting the first transaction after its canCommit produces an
// AbortTransactionReply and lets the second transaction's pending canCommit complete.
// Both cohorts are plain mocks; the expected call order is cohort1.canCommit, then
// cohort2.canCommit.
1023 public void testAbortTransaction() throws Throwable {
1024 new ShardTestKit(getSystem()) {{
1025 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1026 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1027 "testAbortTransaction");
1029 waitUntilLeader(shard);
1031 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1033 String transactionID1 = "tx1";
1034 MutableCompositeModification modification1 = new MutableCompositeModification();
1035 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1036 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1037 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1039 String transactionID2 = "tx2";
1040 MutableCompositeModification modification2 = new MutableCompositeModification();
1041 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1042 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1044 FiniteDuration duration = duration("5 seconds");
1045 final Timeout timeout = new Timeout(duration);
1047 // Simulate the ForwardedReadyTransaction messages that would be sent
1048 // by the ShardTransaction.
1050 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1051 cohort1, modification1, true), getRef());
1052 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1054 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1055 cohort2, modification2, true), getRef());
1056 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1058 // Send the CanCommitTransaction message for the first Tx.
1060 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1061 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1062 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1063 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1065 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1066 // processed after the first Tx completes.
// Sent with ask so the reply completes canCommitFuture, which is awaited further down.
1068 Future<Object> canCommitFuture = Patterns.ask(shard,
1069 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1071 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1074 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1075 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1077 // Wait for the 2nd Tx to complete the canCommit phase.
1079 Await.ready(canCommitFuture, duration);
// canCommit must have been invoked on cohort1 before cohort2.
1081 InOrder inOrder = inOrder(cohort1, cohort2);
1082 inOrder.verify(cohort1).canCommit();
1083 inOrder.verify(cohort2).canCommit();
// Stop the shard actor created for this test.
1085 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Runs the shared snapshot scenario with persistent = true, using "testCreateSnapshot" as the
// shard actor name.
1090 public void testCreateSnapshot() throws Exception {
1091 testCreateSnapshot(true, "testCreateSnapshot");
// Runs the shared snapshot scenario with persistent = false, using
// "testCreateSnapshotWithNonPersistentData" as the shard actor name.
1095 public void testCreateSnapshotWithNonPersistentData() throws Exception {
1096 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
// Shared body of the snapshot tests. Creates a shard whose persistence provider is wrapped so
// that saved snapshots can be captured, sends CaptureSnapshot twice, and after each capture
// checks that a Snapshot was saved whose state equals the store root read before the capture.
//
// persistent     - value passed to dataStoreContextBuilder.persistent(...)
// shardActorName - name given to the shard's TestActorRef
1099 @SuppressWarnings("serial")
1100 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
// Holds the most recent object passed to saveSnapshot.
1102 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
// Forwards every call to the wrapped provider; saveSnapshot also records its argument in
// savedSnapshot before delegating.
1103 class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1104 DataPersistenceProvider delegate;
1106 DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1107 this.delegate = delegate;
1111 public boolean isRecoveryApplicable() {
1112 return delegate.isRecoveryApplicable();
1116 public <T> void persist(T o, Procedure<T> procedure) {
1117 delegate.persist(o, procedure);
1121 public void saveSnapshot(Object o) {
1122 savedSnapshot.set(o);
1123 delegate.saveSnapshot(o);
1127 public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1128 delegate.deleteSnapshots(criteria);
1132 public void deleteMessages(long sequenceNumber) {
1133 delegate.deleteMessages(sequenceNumber);
1137 dataStoreContextBuilder.persistent(persistent);
1139 new ShardTestKit(getSystem()) {{
// Kept in an AtomicReference so a fresh latch can be installed before the second capture.
1140 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1141 Creator<Shard> creator = new Creator<Shard>() {
1143 public Shard create() throws Exception {
1144 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1145 newDatastoreContext(), SCHEMA_CONTEXT) {
1147 DelegatingPersistentDataProvider delegating;
// Lazily wraps super.persistence() in the delegating provider on first use.
1150 protected DataPersistenceProvider persistence() {
1151 if(delegating == null) {
1152 delegating = new DelegatingPersistentDataProvider(super.persistence());
// Releases the waiting test thread after the superclass commitSnapshot call has returned.
1159 protected void commitSnapshot(final long sequenceNumber) {
1160 super.commitSnapshot(sequenceNumber);
1161 latch.get().countDown();
1167 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1168 Props.create(new DelegatingShardCreator(creator)), shardActorName);
1170 waitUntilLeader(shard);
// Write the test container, then read the whole tree as the expected snapshot content.
1172 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1174 NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
// First capture: wait up to 5 seconds for commitSnapshot, then verify the saved snapshot.
1176 CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
1177 shard.tell(capture, getRef());
1179 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1181 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1182 savedSnapshot.get() instanceof Snapshot);
1184 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
// Reset the latch and the captured snapshot, then repeat the capture with the same message.
1186 latch.set(new CountDownLatch(1));
1187 savedSnapshot.set(null);
1189 shard.tell(capture, getRef());
1191 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1193 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1194 savedSnapshot.get() instanceof Snapshot);
1196 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
// Stop the shard actor created for this test.
1198 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Deserializes the snapshot's state into a NormalizedNode and asserts that it equals
// expectedRoot.
1201 private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1203 NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1204 assertEquals("Root node", expectedRoot, actual);
1210 * This test simply verifies that the applySnapShot logic will work.
1211 * @throws ReadFailedException if reading from the in-memory data store fails
1214 public void testInMemoryDataStoreRestore() throws ReadFailedException {
// Works directly against an InMemoryDOMDataStore; no shard actor is created in this test.
1215 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1217 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
// Write the test container and commit it so the store has content to read back.
1219 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1220 putTransaction.write(TestModel.TEST_PATH,
1221 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1222 commitTransaction(putTransaction);
// Read the store's content to use as the expected value.
1225 NormalizedNode<?, ?> expected = readStore(store);
// In a second transaction, delete at the empty (root) path and write the previously read
// content back at the same path.
1227 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1229 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1230 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1232 commitTransaction(writeTransaction);
// After the delete and re-write, the store content must equal what was read before.
1234 NormalizedNode<?, ?> actual = readStore(store);
1236 assertEquals(expected, actual);
// Verifies that isRecoveryApplicable() on the shard's data persistence provider follows the
// persistent flag of the DatastoreContext the shard was created with: true for
// persistent(true), false for persistent(false).
1240 public void testRecoveryApplicable(){
// Two contexts that differ only in the persistent flag, and the shard Props built from each.
1242 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1243 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1245 final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1246 persistentContext, SCHEMA_CONTEXT);
1248 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1249 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1251 final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1252 nonPersistentContext, SCHEMA_CONTEXT);
1254 new ShardTestKit(getSystem()) {{
// Shard created with the persistent context.
1255 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1256 persistentProps, "testPersistence1");
1258 assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1260 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Shard created with the non-persistent context.
1262 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1263 nonPersistentProps, "testPersistence2");
1265 assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1267 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that sending a new DatastoreContext to a running shard changes what
// isRecoveryApplicable() reports on its data persistence provider: true initially, false after
// a persistent(false) context is sent, and true again after a persistent(true) context is sent.
1273 public void testOnDatastoreContext() {
1274 new ShardTestKit(getSystem()) {{
1275 dataStoreContextBuilder.persistent(true);
1277 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testOnDatastoreContext");
1279 assertEquals("isRecoveryApplicable", true,
1280 shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1282 waitUntilLeader(shard);
// NOTE(review): the assertions below run immediately after tell(), with no reply awaited;
// this presumably relies on the TestActorRef processing the message synchronously - confirm.
1284 shard.tell(dataStoreContextBuilder.persistent(false).build(), ActorRef.noSender());
1286 assertEquals("isRecoveryApplicable", false,
1287 shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1289 shard.tell(dataStoreContextBuilder.persistent(true).build(), ActorRef.noSender());
1291 assertEquals("isRecoveryApplicable", true,
1292 shard.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
// Stop the shard actor created for this test.
1294 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that sending RegisterRoleChangeListener to the shard, with a MessageCollectorActor
// as sender, results in exactly one RegisterRoleChangeListenerReply being collected by that
// actor.
// NOTE(review): no PoisonPill for the shard is visible in this test, unlike the neighbouring
// tests - confirm whether the shard should be stopped here.
1299 public void testRegisterRoleChangeListener() throws Exception {
1300 new ShardTestKit(getSystem()) {
1302 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1303 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1304 "testRegisterRoleChangeListener");
1306 waitUntilLeader(shard);
// The listener actor collects the messages it receives so they can be inspected below.
1308 TestActorRef<MessageCollectorActor> listener =
1309 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
1311 shard.tell(new RegisterRoleChangeListener(), listener);
1313 // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
1314 // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
1316 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
1318 List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
1320 assertEquals(1, allMatching.size());
// Verifies that a FollowerInitialSyncUpStatus message handed directly to onReceiveCommand
// updates the follower initial sync status reported by the shard's MBean, first to false and
// then to true.
1326 public void testFollowerInitialSyncStatus() throws Exception {
1327 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1328 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1329 "testFollowerInitialSyncStatus");
// The messages are passed straight to the underlying actor rather than sent with tell().
1331 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(false, "member-1-shard-inventory-operational"));
1333 assertEquals(false, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
1335 shard.underlyingActor().onReceiveCommand(new FollowerInitialSyncUpStatus(true, "member-1-shard-inventory-operational"));
1337 assertEquals(true, shard.underlyingActor().getShardMBean().getFollowerInitialSyncStatus());
// Stop the shard actor created for this test.
1339 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1342 private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1343 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1344 ListenableFuture<Void> future =
1345 commitCohort.preCommit();
1348 future = commitCohort.commit();
1350 } catch (InterruptedException | ExecutionException e) {