1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_CLIENT_VERSION;
14 import java.io.IOException;
15 import java.util.Collections;
16 import java.util.HashSet;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.atomic.AtomicReference;
24 import org.junit.After;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.InOrder;
28 import org.mockito.invocation.InvocationOnMock;
29 import org.mockito.stubbing.Answer;
30 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
31 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
42 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
43 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
44 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
45 import org.opendaylight.controller.cluster.datastore.modification.Modification;
46 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
47 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
48 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
49 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
50 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
51 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
52 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
53 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
54 import org.opendaylight.controller.cluster.raft.Snapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
56 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
57 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
58 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
59 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
60 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
61 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
62 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
63 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
64 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
65 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
66 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
67 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
68 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
69 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
70 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
71 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
72 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
73 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
74 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
75 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
76 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
77 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
78 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
79 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
80 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
81 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
82 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
83 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
84 import scala.concurrent.Await;
85 import scala.concurrent.Future;
86 import scala.concurrent.duration.FiniteDuration;
87 import akka.actor.ActorRef;
88 import akka.actor.PoisonPill;
89 import akka.actor.Props;
90 import akka.dispatch.Dispatchers;
91 import akka.dispatch.OnComplete;
92 import akka.japi.Creator;
93 import akka.pattern.Patterns;
94 import akka.testkit.TestActorRef;
95 import akka.util.Timeout;
96 import com.google.common.base.Function;
97 import com.google.common.base.Optional;
98 import com.google.common.util.concurrent.CheckedFuture;
99 import com.google.common.util.concurrent.Futures;
100 import com.google.common.util.concurrent.ListenableFuture;
101 import com.google.common.util.concurrent.MoreExecutors;
102 import com.google.common.util.concurrent.Uninterruptibles;
105 public class ShardTest extends AbstractActorTest {
107 private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
109 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
111 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
112 .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
114 private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
115 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
116 shardHeartbeatIntervalInMillis(100).build();
119 public void setUp() {
120 InMemorySnapshotStore.clear();
121 InMemoryJournal.clear();
125 public void tearDown() {
126 InMemorySnapshotStore.clear();
127 InMemoryJournal.clear();
130 private Props newShardProps() {
131 return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
132 dataStoreContext, SCHEMA_CONTEXT);
136 public void testRegisterChangeListener() throws Exception {
137 new ShardTestKit(getSystem()) {{
138 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
139 newShardProps(), "testRegisterChangeListener");
141 waitUntilLeader(shard);
143 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
145 MockDataChangeListener listener = new MockDataChangeListener(1);
146 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
147 "testRegisterChangeListener-DataChangeListener");
149 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
150 dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
152 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
153 RegisterChangeListenerReply.class);
154 String replyPath = reply.getListenerRegistrationPath().toString();
155 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
156 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
158 YangInstanceIdentifier path = TestModel.TEST_PATH;
159 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
161 listener.waitForChangeEvents(path);
163 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
164 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
168 @SuppressWarnings("serial")
170 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
171 // This test tests the timing window in which a change listener is registered before the
172 // shard becomes the leader. We verify that the listener is registered and notified of the
173 // existing data when the shard becomes the leader.
174 new ShardTestKit(getSystem()) {{
175 // For this test, we want to send the RegisterChangeListener message after the shard
176 // has recovered from persistence and before it becomes the leader. So we subclass
177 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
178 // we know that the shard has been initialized to a follower and has started the
179 // election process. The following 2 CountDownLatches are used to coordinate the
180 // ElectionTimeout with the sending of the RegisterChangeListener message.
181 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
182 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
183 Creator<Shard> creator = new Creator<Shard>() {
184 boolean firstElectionTimeout = true;
187 public Shard create() throws Exception {
188 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
189 dataStoreContext, SCHEMA_CONTEXT) {
191 public void onReceiveCommand(final Object message) throws Exception {
192 if(message instanceof ElectionTimeout && firstElectionTimeout) {
193 // Got the first ElectionTimeout. We don't forward it to the
194 // base Shard yet until we've sent the RegisterChangeListener
195 // message. So we signal the onFirstElectionTimeout latch to tell
196 // the main thread to send the RegisterChangeListener message and
197 // start a thread to wait on the onChangeListenerRegistered latch,
198 // which the main thread signals after it has sent the message.
199 // After the onChangeListenerRegistered is triggered, we send the
200 // original ElectionTimeout message to proceed with the election.
201 firstElectionTimeout = false;
202 final ActorRef self = getSelf();
206 Uninterruptibles.awaitUninterruptibly(
207 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
208 self.tell(message, self);
212 onFirstElectionTimeout.countDown();
214 super.onReceiveCommand(message);
221 MockDataChangeListener listener = new MockDataChangeListener(1);
222 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
223 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
225 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
226 Props.create(new DelegatingShardCreator(creator)),
227 "testRegisterChangeListenerWhenNotLeaderInitially");
229 // Write initial data into the in-memory store.
230 YangInstanceIdentifier path = TestModel.TEST_PATH;
231 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
233 // Wait until the shard receives the first ElectionTimeout message.
234 assertEquals("Got first ElectionTimeout", true,
235 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
237 // Now send the RegisterChangeListener and wait for the reply.
238 shard.tell(new RegisterChangeListener(path, dclActor.path(),
239 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
241 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
242 RegisterChangeListenerReply.class);
243 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
245 // Sanity check - verify the shard is not the leader yet.
246 shard.tell(new FindLeader(), getRef());
247 FindLeaderReply findLeadeReply =
248 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
249 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
251 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
252 // with the election process.
253 onChangeListenerRegistered.countDown();
255 // Wait for the shard to become the leader and notify our listener with the existing
256 // data in the store.
257 listener.waitForChangeEvents(path);
259 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
260 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
265 public void testCreateTransaction(){
266 new ShardTestKit(getSystem()) {{
267 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
269 waitUntilLeader(shard);
271 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
273 shard.tell(new CreateTransaction("txn-1",
274 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
276 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
277 CreateTransactionReply.class);
279 String path = reply.getTransactionActorPath().toString();
280 assertTrue("Unexpected transaction path " + path,
281 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
283 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
288 public void testCreateTransactionOnChain(){
289 new ShardTestKit(getSystem()) {{
290 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
292 waitUntilLeader(shard);
294 shard.tell(new CreateTransaction("txn-1",
295 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
298 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
299 CreateTransactionReply.class);
301 String path = reply.getTransactionActorPath().toString();
302 assertTrue("Unexpected transaction path " + path,
303 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
305 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
310 public void testPeerAddressResolved() throws Exception {
311 new ShardTestKit(getSystem()) {{
312 final CountDownLatch recoveryComplete = new CountDownLatch(1);
313 class TestShard extends Shard {
315 super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
316 dataStoreContext, SCHEMA_CONTEXT);
319 Map<String, String> getPeerAddresses() {
320 return getRaftActorContext().getPeerAddresses();
324 protected void onRecoveryComplete() {
326 super.onRecoveryComplete();
328 recoveryComplete.countDown();
333 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
334 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
336 public TestShard create() throws Exception {
337 return new TestShard();
339 })), "testPeerAddressResolved");
341 //waitUntilLeader(shard);
342 assertEquals("Recovery complete", true,
343 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
345 String address = "akka://foobar";
346 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
348 assertEquals("getPeerAddresses", address,
349 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
351 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
356 public void testApplySnapshot() throws Exception {
357 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
358 "testApplySnapshot");
360 NormalizedNodeToNodeCodec codec =
361 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
363 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
365 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
366 NormalizedNode<?,?> expected = readStore(shard, root);
368 NormalizedNodeMessages.Container encode = codec.encode(expected);
370 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
371 encode.getNormalizedNode().toByteString().toByteArray(),
372 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
374 shard.underlyingActor().onReceiveCommand(applySnapshot);
376 NormalizedNode<?,?> actual = readStore(shard, root);
378 assertEquals(expected, actual);
380 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
384 public void testApplyState() throws Exception {
386 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
388 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
390 MutableCompositeModification compMod = new MutableCompositeModification();
391 compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
392 Payload payload = new CompositeModificationPayload(compMod.toSerializable());
393 ApplyState applyState = new ApplyState(null, "test",
394 new ReplicatedLogImplEntry(1, 2, payload));
396 shard.underlyingActor().onReceiveCommand(applyState);
398 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
399 assertEquals("Applied state", node, actual);
401 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
404 @SuppressWarnings("serial")
406 public void testRecovery() throws Exception {
408 // Set up the InMemorySnapshotStore.
410 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
411 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
413 DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
414 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
415 DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
416 commitCohort.preCommit().get();
417 commitCohort.commit().get();
419 DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
420 NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
422 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
423 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
425 getNormalizedNode().toByteString().toByteArray(),
426 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
428 // Set up the InMemoryJournal.
430 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
431 new WriteModification(TestModel.OUTER_LIST_PATH,
432 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
435 int nListEntries = 11;
436 Set<Integer> listEntryKeys = new HashSet<>();
437 for(int i = 1; i <= nListEntries; i++) {
438 listEntryKeys.add(Integer.valueOf(i));
439 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
440 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
441 Modification mod = new MergeModification(path,
442 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
444 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
448 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
449 new ApplyLogEntries(nListEntries));
451 // Create the actor and wait for recovery complete.
453 final CountDownLatch recoveryComplete = new CountDownLatch(1);
455 Creator<Shard> creator = new Creator<Shard>() {
457 public Shard create() throws Exception {
458 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
459 dataStoreContext, SCHEMA_CONTEXT) {
461 protected void onRecoveryComplete() {
463 super.onRecoveryComplete();
465 recoveryComplete.countDown();
472 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
473 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
475 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
477 // Verify data in the data store.
479 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
480 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
481 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
482 outerList.getValue() instanceof Iterable);
483 for(Object entry: (Iterable<?>) outerList.getValue()) {
484 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
485 entry instanceof MapEntryNode);
486 MapEntryNode mapEntry = (MapEntryNode)entry;
487 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
488 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
489 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
490 Object value = idLeaf.get().getValue();
491 assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
492 listEntryKeys.remove(value));
495 if(!listEntryKeys.isEmpty()) {
496 fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
500 assertEquals("Last log index", nListEntries,
501 shard.underlyingActor().getShardMBean().getLastLogIndex());
502 assertEquals("Commit index", nListEntries,
503 shard.underlyingActor().getShardMBean().getCommitIndex());
504 assertEquals("Last applied", nListEntries,
505 shard.underlyingActor().getShardMBean().getLastApplied());
507 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
510 private CompositeModificationPayload newPayload(Modification... mods) {
511 MutableCompositeModification compMod = new MutableCompositeModification();
512 for(Modification mod: mods) {
513 compMod.addModification(mod);
516 return new CompositeModificationPayload(compMod.toSerializable());
519 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
520 InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
521 MutableCompositeModification modification) {
522 return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
525 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
526 InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
527 MutableCompositeModification modification,
528 final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
530 DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
531 tx.write(path, data);
532 final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
533 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
535 doAnswer(new Answer<ListenableFuture<Boolean>>() {
537 public ListenableFuture<Boolean> answer(InvocationOnMock invocation) {
538 return realCohort.canCommit();
540 }).when(cohort).canCommit();
542 doAnswer(new Answer<ListenableFuture<Void>>() {
544 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
545 if(preCommit != null) {
546 return preCommit.apply(realCohort);
548 return realCohort.preCommit();
551 }).when(cohort).preCommit();
553 doAnswer(new Answer<ListenableFuture<Void>>() {
555 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
556 return realCohort.commit();
558 }).when(cohort).commit();
560 doAnswer(new Answer<ListenableFuture<Void>>() {
562 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
563 return realCohort.abort();
565 }).when(cohort).abort();
567 modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
572 @SuppressWarnings({ "unchecked" })
574 public void testConcurrentThreePhaseCommits() throws Throwable {
575 new ShardTestKit(getSystem()) {{
576 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
577 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
578 "testConcurrentThreePhaseCommits");
580 waitUntilLeader(shard);
582 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
584 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
586 String transactionID1 = "tx1";
587 MutableCompositeModification modification1 = new MutableCompositeModification();
588 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
589 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
591 String transactionID2 = "tx2";
592 MutableCompositeModification modification2 = new MutableCompositeModification();
593 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
594 TestModel.OUTER_LIST_PATH,
595 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
598 String transactionID3 = "tx3";
599 MutableCompositeModification modification3 = new MutableCompositeModification();
600 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
601 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
602 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
603 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
607 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
608 final Timeout timeout = new Timeout(duration);
610 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
611 // by the ShardTransaction.
613 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
614 cohort1, modification1, true), getRef());
615 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
616 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
617 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
619 // Send the CanCommitTransaction message for the first Tx.
621 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
622 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
623 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
624 assertEquals("Can commit", true, canCommitReply.getCanCommit());
626 // Send the ForwardedReadyTransaction for the next 2 Tx's.
628 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
629 cohort2, modification2, true), getRef());
630 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
632 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_CLIENT_VERSION,
633 cohort3, modification3, true), getRef());
634 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
636 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
637 // processed after the first Tx completes.
639 Future<Object> canCommitFuture1 = Patterns.ask(shard,
640 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
642 Future<Object> canCommitFuture2 = Patterns.ask(shard,
643 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
645 // Send the CommitTransaction message for the first Tx. After it completes, it should
646 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
648 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
649 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
651 // Wait for the next 2 Tx's to complete.
653 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
654 final CountDownLatch commitLatch = new CountDownLatch(2);
656 class OnFutureComplete extends OnComplete<Object> {
657 private final Class<?> expRespType;
659 OnFutureComplete(Class<?> expRespType) {
660 this.expRespType = expRespType;
664 public void onComplete(Throwable error, Object resp) {
666 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
669 assertEquals("Commit response type", expRespType, resp.getClass());
671 } catch (Exception e) {
677 void onSuccess(Object resp) throws Exception {
681 class OnCommitFutureComplete extends OnFutureComplete {
682 OnCommitFutureComplete() {
683 super(CommitTransactionReply.SERIALIZABLE_CLASS);
687 public void onComplete(Throwable error, Object resp) {
688 super.onComplete(error, resp);
689 commitLatch.countDown();
693 class OnCanCommitFutureComplete extends OnFutureComplete {
694 private final String transactionID;
696 OnCanCommitFutureComplete(String transactionID) {
697 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
698 this.transactionID = transactionID;
702 void onSuccess(Object resp) throws Exception {
703 CanCommitTransactionReply canCommitReply =
704 CanCommitTransactionReply.fromSerializable(resp);
705 assertEquals("Can commit", true, canCommitReply.getCanCommit());
707 Future<Object> commitFuture = Patterns.ask(shard,
708 new CommitTransaction(transactionID).toSerializable(), timeout);
709 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
713 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
714 getSystem().dispatcher());
716 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
717 getSystem().dispatcher());
719 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
721 if(caughtEx.get() != null) {
722 throw caughtEx.get();
725 assertEquals("Commits complete", true, done);
727 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
728 inOrder.verify(cohort1).canCommit();
729 inOrder.verify(cohort1).preCommit();
730 inOrder.verify(cohort1).commit();
731 inOrder.verify(cohort2).canCommit();
732 inOrder.verify(cohort2).preCommit();
733 inOrder.verify(cohort2).commit();
734 inOrder.verify(cohort3).canCommit();
735 inOrder.verify(cohort3).preCommit();
736 inOrder.verify(cohort3).commit();
738 // Verify data in the data store.
740 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
741 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
742 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
743 outerList.getValue() instanceof Iterable);
744 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
745 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
746 entry instanceof MapEntryNode);
747 MapEntryNode mapEntry = (MapEntryNode)entry;
748 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
749 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
750 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
751 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
753 for(int i = 0; i < 20 * 5; i++) {
754 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
755 if(lastLogIndex == 2) {
758 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
761 assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
763 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
768 public void testCommitPhaseFailure() throws Throwable {
769 new ShardTestKit(getSystem()) {{
770 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
771 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
772 "testCommitPhaseFailure");
774 waitUntilLeader(shard);
776 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
779 String transactionID1 = "tx1";
780 MutableCompositeModification modification1 = new MutableCompositeModification();
781 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
782 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
783 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
784 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
786 String transactionID2 = "tx2";
787 MutableCompositeModification modification2 = new MutableCompositeModification();
788 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
789 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
791 FiniteDuration duration = duration("5 seconds");
792 final Timeout timeout = new Timeout(duration);
794 // Simulate the ForwardedReadyTransaction messages that would be sent
795 // by the ShardTransaction.
797 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
798 cohort1, modification1, true), getRef());
799 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
801 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
802 cohort2, modification2, true), getRef());
803 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
805 // Send the CanCommitTransaction message for the first Tx.
807 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
808 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
809 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
810 assertEquals("Can commit", true, canCommitReply.getCanCommit());
812 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
813 // processed after the first Tx completes.
815 Future<Object> canCommitFuture = Patterns.ask(shard,
816 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
818 // Send the CommitTransaction message for the first Tx. This should send back an error
819 // and trigger the 2nd Tx to proceed.
821 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
822 expectMsgClass(duration, akka.actor.Status.Failure.class);
824 // Wait for the 2nd Tx to complete the canCommit phase.
826 final CountDownLatch latch = new CountDownLatch(1);
827 canCommitFuture.onComplete(new OnComplete<Object>() {
829 public void onComplete(Throwable t, Object resp) {
832 }, getSystem().dispatcher());
834 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
836 InOrder inOrder = inOrder(cohort1, cohort2);
837 inOrder.verify(cohort1).canCommit();
838 inOrder.verify(cohort1).preCommit();
839 inOrder.verify(cohort1).commit();
840 inOrder.verify(cohort2).canCommit();
842 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
847 public void testPreCommitPhaseFailure() throws Throwable {
848 new ShardTestKit(getSystem()) {{
849 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
850 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
851 "testPreCommitPhaseFailure");
853 waitUntilLeader(shard);
855 String transactionID = "tx1";
856 MutableCompositeModification modification = new MutableCompositeModification();
857 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
858 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
859 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
861 FiniteDuration duration = duration("5 seconds");
863 // Simulate the ForwardedReadyTransaction messages that would be sent
864 // by the ShardTransaction.
866 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
867 cohort, modification, true), getRef());
868 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
870 // Send the CanCommitTransaction message.
872 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
873 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
874 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
875 assertEquals("Can commit", true, canCommitReply.getCanCommit());
877 // Send the CommitTransaction message. This should send back an error
878 // for preCommit failure.
880 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
881 expectMsgClass(duration, akka.actor.Status.Failure.class);
883 InOrder inOrder = inOrder(cohort);
884 inOrder.verify(cohort).canCommit();
885 inOrder.verify(cohort).preCommit();
887 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
892 public void testCanCommitPhaseFailure() throws Throwable {
893 new ShardTestKit(getSystem()) {{
894 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
895 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
896 "testCanCommitPhaseFailure");
898 waitUntilLeader(shard);
900 final FiniteDuration duration = duration("5 seconds");
902 String transactionID = "tx1";
903 MutableCompositeModification modification = new MutableCompositeModification();
904 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
905 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
907 // Simulate the ForwardedReadyTransaction messages that would be sent
908 // by the ShardTransaction.
910 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
911 cohort, modification, true), getRef());
912 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
914 // Send the CanCommitTransaction message.
916 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
917 expectMsgClass(duration, akka.actor.Status.Failure.class);
919 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
924 public void testAbortBeforeFinishCommit() throws Throwable {
925 new ShardTestKit(getSystem()) {{
926 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
927 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
928 "testAbortBeforeFinishCommit");
930 waitUntilLeader(shard);
932 final FiniteDuration duration = duration("5 seconds");
933 final Timeout timeout = new Timeout(duration);
935 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
937 final String transactionID = "tx1";
938 final CountDownLatch abortComplete = new CountDownLatch(1);
939 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
940 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
942 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
943 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
945 Future<Object> abortFuture = Patterns.ask(shard,
946 new AbortTransaction(transactionID).toSerializable(), timeout);
947 abortFuture.onComplete(new OnComplete<Object>() {
949 public void onComplete(Throwable e, Object resp) {
950 abortComplete.countDown();
952 }, getSystem().dispatcher());
954 return preCommitFuture;
958 MutableCompositeModification modification = new MutableCompositeModification();
959 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
960 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
961 modification, preCommit);
963 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_CLIENT_VERSION,
964 cohort, modification, true), getRef());
965 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
967 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
968 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
969 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
970 assertEquals("Can commit", true, canCommitReply.getCanCommit());
972 Future<Object> commitFuture = Patterns.ask(shard,
973 new CommitTransaction(transactionID).toSerializable(), timeout);
975 assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
977 Await.result(commitFuture, duration);
979 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
980 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
982 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
987 public void testTransactionCommitTimeout() throws Throwable {
988 dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
990 new ShardTestKit(getSystem()) {{
991 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
992 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
993 "testTransactionCommitTimeout");
995 waitUntilLeader(shard);
997 final FiniteDuration duration = duration("5 seconds");
999 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1001 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1002 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1003 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1005 // Create 1st Tx - will timeout
1007 String transactionID1 = "tx1";
1008 MutableCompositeModification modification1 = new MutableCompositeModification();
1009 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1010 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1011 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1012 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1017 String transactionID2 = "tx3";
1018 MutableCompositeModification modification2 = new MutableCompositeModification();
1019 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1020 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1021 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1023 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1028 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
1029 cohort1, modification1, true), getRef());
1030 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1032 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
1033 cohort2, modification2, true), getRef());
1034 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1036 // canCommit 1st Tx. We don't send the commit so it should timeout.
1038 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1039 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1041 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1043 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1044 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1046 // Commit the 2nd Tx.
1048 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1049 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1051 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1052 assertNotNull(listNodePath + " not found", node);
1054 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1059 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1060 dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
1062 new ShardTestKit(getSystem()) {{
1063 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1064 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1065 "testTransactionCommitQueueCapacityExceeded");
1067 waitUntilLeader(shard);
1069 final FiniteDuration duration = duration("5 seconds");
1071 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1073 String transactionID1 = "tx1";
1074 MutableCompositeModification modification1 = new MutableCompositeModification();
1075 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1076 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1078 String transactionID2 = "tx2";
1079 MutableCompositeModification modification2 = new MutableCompositeModification();
1080 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1081 TestModel.OUTER_LIST_PATH,
1082 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1085 String transactionID3 = "tx3";
1086 MutableCompositeModification modification3 = new MutableCompositeModification();
1087 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1088 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1092 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
1093 cohort1, modification1, true), getRef());
1094 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1096 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
1097 cohort2, modification2, true), getRef());
1098 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1100 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_CLIENT_VERSION,
1101 cohort3, modification3, true), getRef());
1102 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1104 // canCommit 1st Tx.
1106 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1107 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1109 // canCommit the 2nd Tx - it should get queued.
1111 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1113 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1115 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1116 expectMsgClass(duration, akka.actor.Status.Failure.class);
1118 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1123 public void testCanCommitBeforeReadyFailure() throws Throwable {
1124 new ShardTestKit(getSystem()) {{
1125 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1126 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1127 "testCanCommitBeforeReadyFailure");
1129 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1130 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1132 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1137 public void testAbortTransaction() throws Throwable {
1138 new ShardTestKit(getSystem()) {{
1139 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1140 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1141 "testAbortTransaction");
1143 waitUntilLeader(shard);
1145 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1147 String transactionID1 = "tx1";
1148 MutableCompositeModification modification1 = new MutableCompositeModification();
1149 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1150 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1151 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1153 String transactionID2 = "tx2";
1154 MutableCompositeModification modification2 = new MutableCompositeModification();
1155 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1156 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1158 FiniteDuration duration = duration("5 seconds");
1159 final Timeout timeout = new Timeout(duration);
1161 // Simulate the ForwardedReadyTransaction messages that would be sent
1162 // by the ShardTransaction.
1164 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_CLIENT_VERSION,
1165 cohort1, modification1, true), getRef());
1166 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1168 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_CLIENT_VERSION,
1169 cohort2, modification2, true), getRef());
1170 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1172 // Send the CanCommitTransaction message for the first Tx.
1174 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1175 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1176 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1177 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1179 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1180 // processed after the first Tx completes.
1182 Future<Object> canCommitFuture = Patterns.ask(shard,
1183 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1185 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1188 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1189 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1191 // Wait for the 2nd Tx to complete the canCommit phase.
1193 final CountDownLatch latch = new CountDownLatch(1);
1194 canCommitFuture.onComplete(new OnComplete<Object>() {
1196 public void onComplete(Throwable t, Object resp) {
1199 }, getSystem().dispatcher());
1201 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
1203 InOrder inOrder = inOrder(cohort1, cohort2);
1204 inOrder.verify(cohort1).canCommit();
1205 inOrder.verify(cohort2).canCommit();
1207 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1212 public void testCreateSnapshot() throws IOException, InterruptedException {
1213 testCreateSnapshot(true, "testCreateSnapshot");
1217 public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
1218 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1221 public void testCreateSnapshot(boolean persistent, final String shardActorName) throws IOException, InterruptedException {
1222 final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
1223 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
1225 new ShardTestKit(getSystem()) {{
1226 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1227 Creator<Shard> creator = new Creator<Shard>() {
1229 public Shard create() throws Exception {
1230 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1231 dataStoreContext, SCHEMA_CONTEXT) {
1233 protected void commitSnapshot(long sequenceNumber) {
1234 super.commitSnapshot(sequenceNumber);
1235 latch.get().countDown();
1241 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1242 Props.create(new DelegatingShardCreator(creator)), shardActorName);
1244 waitUntilLeader(shard);
1246 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1248 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1250 latch.set(new CountDownLatch(1));
1251 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1253 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1255 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1260 * This test simply verifies that the applySnapShot logic will work
1261 * @throws ReadFailedException
1264 public void testInMemoryDataStoreRestore() throws ReadFailedException {
1265 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
1266 MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
1268 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1270 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1271 putTransaction.write(TestModel.TEST_PATH,
1272 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1273 commitTransaction(putTransaction);
1276 NormalizedNode expected = readStore(store);
1278 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1280 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1281 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1283 commitTransaction(writeTransaction);
1285 NormalizedNode actual = readStore(store);
1287 assertEquals(expected, actual);
1292 public void testRecoveryApplicable(){
1294 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1295 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1297 final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1298 persistentContext, SCHEMA_CONTEXT);
1300 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1301 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1303 final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1304 nonPersistentContext, SCHEMA_CONTEXT);
1306 new ShardTestKit(getSystem()) {{
1307 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1308 persistentProps, "testPersistence1");
1310 assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1312 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1314 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1315 nonPersistentProps, "testPersistence2");
1317 assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1319 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1326 private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
1327 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1328 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1329 transaction.read(YangInstanceIdentifier.builder().build());
1331 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1333 NormalizedNode<?, ?> normalizedNode = optional.get();
1335 transaction.close();
1337 return normalizedNode;
1340 private void commitTransaction(DOMStoreWriteTransaction transaction) {
1341 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1342 ListenableFuture<Void> future =
1343 commitCohort.preCommit();
1346 future = commitCohort.commit();
1348 } catch (InterruptedException | ExecutionException e) {
1352 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1353 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1355 public void onDataChanged(
1356 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1362 static NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
1363 throws ExecutionException, InterruptedException {
1364 DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1366 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1367 transaction.read(id);
1369 Optional<NormalizedNode<?, ?>> optional = future.get();
1370 NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1372 transaction.close();
1377 private void writeToStore(TestActorRef<Shard> shard, YangInstanceIdentifier id, NormalizedNode<?,?> node)
1378 throws ExecutionException, InterruptedException {
1379 DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1381 transaction.write(id, node);
1383 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1384 commitCohort.preCommit().get();
1385 commitCohort.commit().get();
1388 private static final class DelegatingShardCreator implements Creator<Shard> {
1389 private final Creator<Shard> delegate;
1391 DelegatingShardCreator(Creator<Shard> delegate) {
1392 this.delegate = delegate;
1396 public Shard create() throws Exception {
1397 return delegate.create();