1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.PoisonPill;
5 import akka.actor.Props;
6 import akka.dispatch.Dispatchers;
7 import akka.dispatch.OnComplete;
8 import akka.japi.Creator;
9 import akka.pattern.Patterns;
10 import akka.testkit.TestActorRef;
11 import akka.util.Timeout;
12 import com.google.common.base.Function;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.Uninterruptibles;
19 import org.junit.After;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.mockito.InOrder;
23 import org.mockito.invocation.InvocationOnMock;
24 import org.mockito.stubbing.Answer;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
37 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
38 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
39 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
40 import org.opendaylight.controller.cluster.datastore.modification.Modification;
41 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
44 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
45 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
46 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
47 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
48 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
49 import org.opendaylight.controller.cluster.raft.Snapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
55 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
56 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
57 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
58 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
59 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
60 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
61 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
62 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
63 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
64 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
65 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
66 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
67 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
68 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
69 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
72 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
73 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
74 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
75 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
76 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
77 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
78 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
79 import scala.concurrent.Await;
80 import scala.concurrent.Future;
81 import scala.concurrent.duration.FiniteDuration;
83 import java.io.IOException;
84 import java.util.Collections;
85 import java.util.HashSet;
88 import java.util.concurrent.CountDownLatch;
89 import java.util.concurrent.ExecutionException;
90 import java.util.concurrent.TimeUnit;
91 import java.util.concurrent.atomic.AtomicInteger;
92 import java.util.concurrent.atomic.AtomicReference;
94 import static org.junit.Assert.assertEquals;
95 import static org.junit.Assert.assertFalse;
96 import static org.junit.Assert.assertNotNull;
97 import static org.junit.Assert.assertNull;
98 import static org.junit.Assert.assertTrue;
99 import static org.junit.Assert.fail;
100 import static org.mockito.Mockito.doAnswer;
101 import static org.mockito.Mockito.doReturn;
102 import static org.mockito.Mockito.inOrder;
103 import static org.mockito.Mockito.mock;
104 import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
107 public class ShardTest extends AbstractActorTest {
109 private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
111 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
113 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
114 .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
116 private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
117 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
118 shardHeartbeatIntervalInMillis(100).build();
121 public void setUp() {
122 InMemorySnapshotStore.clear();
123 InMemoryJournal.clear();
127 public void tearDown() {
128 InMemorySnapshotStore.clear();
129 InMemoryJournal.clear();
132 private Props newShardProps() {
133 return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
134 dataStoreContext, SCHEMA_CONTEXT);
138 public void testRegisterChangeListener() throws Exception {
139 new ShardTestKit(getSystem()) {{
140 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
141 newShardProps(), "testRegisterChangeListener");
143 waitUntilLeader(shard);
145 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
147 MockDataChangeListener listener = new MockDataChangeListener(1);
148 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
149 "testRegisterChangeListener-DataChangeListener");
151 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
152 dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
154 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
155 RegisterChangeListenerReply.class);
156 String replyPath = reply.getListenerRegistrationPath().toString();
157 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
158 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
160 YangInstanceIdentifier path = TestModel.TEST_PATH;
161 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
163 listener.waitForChangeEvents(path);
165 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
166 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a change listener registered while the shard is not yet the leader still gets a
// registration reply and is notified of pre-existing data once the shard becomes leader.
// NOTE(review): in this extract the @Test annotation, the thread wrapper around the latch wait
// (described in the inline comment below), the else-branch structure and all closing braces are
// not visible - presumably lost during extraction; confirm against the full file.
170 @SuppressWarnings("serial")
172 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
173 // This test tests the timing window in which a change listener is registered before the
174 // shard becomes the leader. We verify that the listener is registered and notified of the
175 // existing data when the shard becomes the leader.
176 new ShardTestKit(getSystem()) {{
177 // For this test, we want to send the RegisterChangeListener message after the shard
178 // has recovered from persistence and before it becomes the leader. So we subclass
179 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
180 // we know that the shard has been initialized to a follower and has started the
181 // election process. The following 2 CountDownLatches are used to coordinate the
182 // ElectionTimeout with the sending of the RegisterChangeListener message.
183 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
184 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
185 Creator<Shard> creator = new Creator<Shard>() {
// Flag consulted in onReceiveCommand so that only the first ElectionTimeout is intercepted.
186 boolean firstElectionTimeout = true;
189 public Shard create() throws Exception {
190 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
191 dataStoreContext, SCHEMA_CONTEXT) {
193 public void onReceiveCommand(final Object message) throws Exception {
194 if(message instanceof ElectionTimeout && firstElectionTimeout) {
195 // Got the first ElectionTimeout. We don't forward it to the
196 // base Shard yet until we've sent the RegisterChangeListener
197 // message. So we signal the onFirstElectionTimeout latch to tell
198 // the main thread to send the RegisterChangeListener message and
199 // start a thread to wait on the onChangeListenerRegistered latch,
200 // which the main thread signals after it has sent the message.
201 // After the onChangeListenerRegistered is triggered, we send the
202 // original ElectionTimeout message to proceed with the election.
203 firstElectionTimeout = false;
204 final ActorRef self = getSelf();
// Waits at most 5 seconds for the main thread's signal, then re-sends the intercepted
// ElectionTimeout to the shard itself.
208 Uninterruptibles.awaitUninterruptibly(
209 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
210 self.tell(message, self);
214 onFirstElectionTimeout.countDown();
// Messages that are not intercepted are handled by the real Shard implementation.
216 super.onReceiveCommand(message);
223 MockDataChangeListener listener = new MockDataChangeListener(1);
224 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
225 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
227 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
228 Props.create(new DelegatingShardCreator(creator)),
229 "testRegisterChangeListenerWhenNotLeaderInitially");
231 // Write initial data into the in-memory store.
232 YangInstanceIdentifier path = TestModel.TEST_PATH;
233 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
235 // Wait until the shard receives the first ElectionTimeout message.
236 assertEquals("Got first ElectionTimeout", true,
237 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
239 // Now send the RegisterChangeListener and wait for the reply.
240 shard.tell(new RegisterChangeListener(path, dclActor.path(),
241 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
243 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
244 RegisterChangeListenerReply.class);
245 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
247 // Sanity check - verify the shard is not the leader yet.
248 shard.tell(new FindLeader(), getRef());
249 FindLeaderReply findLeadeReply =
250 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
251 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
253 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
254 // with the election process.
255 onChangeListenerRegistered.countDown();
257 // Wait for the shard to become the leader and notify our listener with the existing
258 // data in the store.
259 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
261 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
262 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
267 public void testCreateTransaction(){
268 new ShardTestKit(getSystem()) {{
269 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
271 waitUntilLeader(shard);
273 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
275 shard.tell(new CreateTransaction("txn-1",
276 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
278 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
279 CreateTransactionReply.class);
281 String path = reply.getTransactionActorPath().toString();
282 assertTrue("Unexpected transaction path " + path,
283 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
285 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
290 public void testCreateTransactionOnChain(){
291 new ShardTestKit(getSystem()) {{
292 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
294 waitUntilLeader(shard);
296 shard.tell(new CreateTransaction("txn-1",
297 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
300 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
301 CreateTransactionReply.class);
303 String path = reply.getTransactionActorPath().toString();
304 assertTrue("Unexpected transaction path " + path,
305 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
307 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
312 public void testPeerAddressResolved() throws Exception {
313 new ShardTestKit(getSystem()) {{
314 final CountDownLatch recoveryComplete = new CountDownLatch(1);
315 class TestShard extends Shard {
317 super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
318 dataStoreContext, SCHEMA_CONTEXT);
321 Map<String, String> getPeerAddresses() {
322 return getRaftActorContext().getPeerAddresses();
326 protected void onRecoveryComplete() {
328 super.onRecoveryComplete();
330 recoveryComplete.countDown();
335 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
336 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
338 public TestShard create() throws Exception {
339 return new TestShard();
341 })), "testPeerAddressResolved");
343 //waitUntilLeader(shard);
344 assertEquals("Recovery complete", true,
345 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
347 String address = "akka://foobar";
348 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
350 assertEquals("getPeerAddresses", address,
351 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
353 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
358 public void testApplySnapshot() throws Exception {
359 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
360 "testApplySnapshot");
362 NormalizedNodeToNodeCodec codec =
363 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
365 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
367 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
368 NormalizedNode<?,?> expected = readStore(shard, root);
370 NormalizedNodeMessages.Container encode = codec.encode(expected);
372 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
373 encode.getNormalizedNode().toByteString().toByteArray(),
374 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
376 shard.underlyingActor().onReceiveCommand(applySnapshot);
378 NormalizedNode<?,?> actual = readStore(shard, root);
380 assertEquals(expected, actual);
382 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
386 public void testApplyState() throws Exception {
388 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
390 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
392 MutableCompositeModification compMod = new MutableCompositeModification();
393 compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
394 Payload payload = new CompositeModificationPayload(compMod.toSerializable());
395 ApplyState applyState = new ApplyState(null, "test",
396 new ReplicatedLogImplEntry(1, 2, payload));
398 shard.underlyingActor().onReceiveCommand(applyState);
400 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
401 assertEquals("Applied state", node, actual);
403 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies shard recovery from persisted state: seeds InMemorySnapshotStore with a snapshot of a
// store holding the TEST_PATH container, seeds InMemoryJournal with an outer-list write at
// index 0, eleven list-entry merges at indices 1..11 and an ApplyLogEntries(11) record, then
// starts the shard, waits for onRecoveryComplete and checks the recovered list entries and the
// MBean's last log index, commit index and last applied values.
// NOTE(review): this extract shows no @Test annotation, no java.util.Set import, and several
// statement tails and closing braces are missing (e.g. the argument to encode(...), the ends of
// the addEntry(...) calls, the fail(...) message) - presumably lost in extraction; confirm
// against the full file.
406 @SuppressWarnings("serial")
408 public void testRecovery() throws Exception {
410 // Set up the InMemorySnapshotStore.
412 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
413 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
// Commit a write of the TEST_PATH container to the scratch store.
415 DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
416 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
417 DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
418 commitCohort.preCommit().get();
419 commitCohort.commit().get();
// Read the whole tree back from the root (empty identifier).
421 DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
422 NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
// Store an encoded snapshot under this shard's id.
424 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
425 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
427 getNormalizedNode().toByteString().toByteArray(),
428 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
430 // Set up the InMemoryJournal.
432 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
433 new WriteModification(TestModel.OUTER_LIST_PATH,
434 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
// 11 entries is not a multiple of the recovery batch size of 3 configured in dataStoreContext -
// presumably chosen to exercise a partial final batch; TODO confirm.
437 int nListEntries = 11;
// Keys expected after recovery; each is removed below as it is found in the store.
438 Set<Integer> listEntryKeys = new HashSet<>();
439 for(int i = 1; i <= nListEntries; i++) {
440 listEntryKeys.add(Integer.valueOf(i));
441 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
442 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
443 Modification mod = new MergeModification(path,
444 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
446 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
// Final journal record, stored one position after the last log entry.
450 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
451 new ApplyLogEntries(nListEntries));
453 // Create the actor and wait for recovery complete.
455 final CountDownLatch recoveryComplete = new CountDownLatch(1);
457 Creator<Shard> creator = new Creator<Shard>() {
459 public Shard create() throws Exception {
460 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
461 dataStoreContext, SCHEMA_CONTEXT) {
// Hook that releases the test thread once the base class has finished recovery.
463 protected void onRecoveryComplete() {
465 super.onRecoveryComplete();
467 recoveryComplete.countDown();
474 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
475 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
477 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
479 // Verify data in the data store.
481 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
482 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
483 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
484 outerList.getValue() instanceof Iterable);
485 for(Object entry: (Iterable<?>) outerList.getValue()) {
486 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
487 entry instanceof MapEntryNode);
488 MapEntryNode mapEntry = (MapEntryNode)entry;
489 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
490 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
491 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
492 Object value = idLeaf.get().getValue();
// remove() returns false for an unknown or duplicate key, which fails the assertion.
493 assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
494 listEntryKeys.remove(value));
// Any key still present was never recovered.
497 if(!listEntryKeys.isEmpty()) {
498 fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
// All three indexes are expected to equal the index of the last journalled log entry.
502 assertEquals("Last log index", nListEntries,
503 shard.underlyingActor().getShardMBean().getLastLogIndex());
504 assertEquals("Commit index", nListEntries,
505 shard.underlyingActor().getShardMBean().getCommitIndex());
506 assertEquals("Last applied", nListEntries,
507 shard.underlyingActor().getShardMBean().getLastApplied());
509 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
512 private CompositeModificationPayload newPayload(Modification... mods) {
513 MutableCompositeModification compMod = new MutableCompositeModification();
514 for(Modification mod: mods) {
515 compMod.addModification(mod);
518 return new CompositeModificationPayload(compMod.toSerializable());
521 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
522 InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
523 MutableCompositeModification modification) {
524 return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
527 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
528 InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
529 MutableCompositeModification modification,
530 final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
532 DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
533 tx.write(path, data);
534 final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
535 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
537 doAnswer(new Answer<ListenableFuture<Boolean>>() {
539 public ListenableFuture<Boolean> answer(InvocationOnMock invocation) {
540 return realCohort.canCommit();
542 }).when(cohort).canCommit();
544 doAnswer(new Answer<ListenableFuture<Void>>() {
546 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
547 if(preCommit != null) {
548 return preCommit.apply(realCohort);
550 return realCohort.preCommit();
553 }).when(cohort).preCommit();
555 doAnswer(new Answer<ListenableFuture<Void>>() {
557 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
558 return realCohort.commit();
560 }).when(cohort).commit();
562 doAnswer(new Answer<ListenableFuture<Void>>() {
564 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
565 return realCohort.abort();
567 }).when(cohort).abort();
569 modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
// Verifies that three transactions readied on the same shard are committed strictly one after
// another. tx1 is driven through canCommit and commit with direct messages; the
// CanCommitTransaction requests for tx2 and tx3 are issued with ask() before tx1 commits and
// are completed through future callbacks that then send the matching CommitTransaction.
// Afterwards the mock cohorts are verified in order, the written list entry is read back, and
// the MBean's last log index is polled until it reaches 2.
// NOTE(review): this extract shows no @Test annotation, no declaration of timeoutSec, and is
// missing the trailing arguments of the cohort2/cohort3 setup calls, most of the control flow in
// OnFutureComplete.onComplete, the loop's exit statement and all closing braces - presumably
// lost in extraction; confirm against the full file.
574 @SuppressWarnings({ "unchecked" })
576 public void testConcurrentThreePhaseCommits() throws Throwable {
577 new ShardTestKit(getSystem()) {{
// The shard is created with the default dispatcher rather than whatever TestActorRef would
// otherwise use - presumably so messages are processed asynchronously; TODO confirm.
578 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
579 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
580 "testConcurrentThreePhaseCommits");
582 waitUntilLeader(shard);
584 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
586 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// tx1 writes the TEST_PATH container.
588 String transactionID1 = "tx1";
589 MutableCompositeModification modification1 = new MutableCompositeModification();
590 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
591 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
// tx2 writes an empty outer list.
593 String transactionID2 = "tx2";
594 MutableCompositeModification modification2 = new MutableCompositeModification();
595 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
596 TestModel.OUTER_LIST_PATH,
597 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
// tx3 writes the outer-list entry with id 1.
600 String transactionID3 = "tx3";
601 MutableCompositeModification modification3 = new MutableCompositeModification();
602 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
603 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
604 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
605 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// One timeout value is shared by expectMsgClass, the ask() calls and the final latch wait.
609 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
610 final Timeout timeout = new Timeout(duration);
612 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
613 // by the ShardTransaction.
615 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
616 cohort1, modification1, true), getRef());
617 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
618 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
619 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
621 // Send the CanCommitTransaction message for the first Tx.
623 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
624 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
625 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
626 assertEquals("Can commit", true, canCommitReply.getCanCommit());
628 // Send the ForwardedReadyTransaction for the next 2 Tx's.
630 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
631 cohort2, modification2, true), getRef());
632 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
634 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
635 cohort3, modification3, true), getRef());
636 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
638 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
639 // processed after the first Tx completes.
641 Future<Object> canCommitFuture1 = Patterns.ask(shard,
642 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
644 Future<Object> canCommitFuture2 = Patterns.ask(shard,
645 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
647 // Send the CommitTransaction message for the first Tx. After it completes, it should
648 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
650 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
651 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
653 // Wait for the next 2 Tx's to complete.
// caughtEx carries a failure from the callback threads back to the test thread; commitLatch
// counts the two commit replies expected for tx2 and tx3.
655 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
656 final CountDownLatch commitLatch = new CountDownLatch(2);
// Base callback: records a failure in caughtEx or checks the response type.
658 class OnFutureComplete extends OnComplete<Object> {
659 private final Class<?> expRespType;
661 OnFutureComplete(Class<?> expRespType) {
662 this.expRespType = expRespType;
666 public void onComplete(Throwable error, Object resp) {
668 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
671 assertEquals("Commit response type", expRespType, resp.getClass());
673 } catch (Exception e) {
// Hook for subclasses that need to act on a successful response.
679 void onSuccess(Object resp) throws Exception {
// Commit callback: counts down commitLatch once the base checks have run.
683 class OnCommitFutureComplete extends OnFutureComplete {
684 OnCommitFutureComplete() {
685 super(CommitTransactionReply.SERIALIZABLE_CLASS);
689 public void onComplete(Throwable error, Object resp) {
690 super.onComplete(error, resp);
691 commitLatch.countDown();
// CanCommit callback: on a positive reply, asks the shard to commit the same transaction.
695 class OnCanCommitFutureComplete extends OnFutureComplete {
696 private final String transactionID;
698 OnCanCommitFutureComplete(String transactionID) {
699 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
700 this.transactionID = transactionID;
704 void onSuccess(Object resp) throws Exception {
705 CanCommitTransactionReply canCommitReply =
706 CanCommitTransactionReply.fromSerializable(resp);
707 assertEquals("Can commit", true, canCommitReply.getCanCommit());
709 Future<Object> commitFuture = Patterns.ask(shard,
710 new CommitTransaction(transactionID).toSerializable(), timeout);
711 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
// Attach the callbacks; they run on the actor system's dispatcher.
715 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
716 getSystem().dispatcher());
718 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
719 getSystem().dispatcher());
721 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
// Rethrow a callback failure before the latch assertion so its cause is what gets reported.
723 if(caughtEx.get() != null) {
724 throw caughtEx.get();
727 assertEquals("Commits complete", true, done);
// Each cohort must complete canCommit, preCommit and commit before the next cohort starts.
729 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
730 inOrder.verify(cohort1).canCommit();
731 inOrder.verify(cohort1).preCommit();
732 inOrder.verify(cohort1).commit();
733 inOrder.verify(cohort2).canCommit();
734 inOrder.verify(cohort2).preCommit();
735 inOrder.verify(cohort2).commit();
736 inOrder.verify(cohort3).canCommit();
737 inOrder.verify(cohort3).preCommit();
738 inOrder.verify(cohort3).commit();
740 // Verify data in the data store.
742 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
743 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
744 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
745 outerList.getValue() instanceof Iterable);
// Only the first entry is inspected; it is expected to be the one with id 1 written by tx3.
746 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
747 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
748 entry instanceof MapEntryNode);
749 MapEntryNode mapEntry = (MapEntryNode)entry;
750 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
751 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
752 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
753 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
// Poll up to 100 times at 50 ms intervals (about 5 s) for the last log index to reach 2 -
// presumably because the index is updated asynchronously after the commit replies; confirm.
755 for(int i = 0; i < 20 * 5; i++) {
756 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
757 if(lastLogIndex == 2) {
760 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
// Three committed transactions correspond to log indexes 0, 1 and 2.
763 assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
765 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies shard behavior when the commit phase of a transaction fails.
 * <p>
 * Two transactions backed by Mockito mock cohorts are readied. The first cohort's commit()
 * returns a failed future, so the CommitTransaction message sent for it must be answered with
 * an akka Status.Failure. Afterwards the CanCommitTransaction that was asked for the second
 * transaction is expected to complete, and the cohort invocations are verified in order.
 *
 * @throws Throwable if any step of the test fails
 */
770 public void testCommitPhaseFailure() throws Throwable {
771 new ShardTestKit(getSystem()) {{
772 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
773 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
774 "testCommitPhaseFailure");
776 waitUntilLeader(shard);
778 // Setup 2 simulated transactions with mock cohorts. The first one fails in the commit phase.
781 String transactionID1 = "tx1";
782 MutableCompositeModification modification1 = new MutableCompositeModification();
783 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
// cohort1 succeeds in canCommit and preCommit but returns a failed future from commit.
784 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
785 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
786 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
788 String transactionID2 = "tx2";
789 MutableCompositeModification modification2 = new MutableCompositeModification();
790 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
// Only canCommit is stubbed for cohort2; this test never sends a commit for the 2nd Tx.
791 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
793 FiniteDuration duration = duration("5 seconds");
794 final Timeout timeout = new Timeout(duration);
796 // Simulate the ForwardedReadyTransaction messages that would be sent
797 // by the ShardTransaction.
799 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
800 cohort1, modification1, true), getRef());
801 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
803 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
804 cohort2, modification2, true), getRef());
805 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
807 // Send the CanCommitTransaction message for the first Tx.
809 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
810 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
811 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
812 assertEquals("Can commit", true, canCommitReply.getCanCommit());
814 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
815 // processed after the first Tx completes.
// Patterns.ask is used instead of tell so the reply can be awaited separately via a future.
817 Future<Object> canCommitFuture = Patterns.ask(shard,
818 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
820 // Send the CommitTransaction message for the first Tx. This should send back an error
821 // and trigger the 2nd Tx to proceed.
823 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
824 expectMsgClass(duration, akka.actor.Status.Failure.class);
826 // Wait for the 2nd Tx to complete the canCommit phase.
828 final CountDownLatch latch = new CountDownLatch(1);
829 canCommitFuture.onComplete(new OnComplete<Object>() {
// NOTE(review): the callback body is not visible in this chunk; presumably it counts down
// the latch awaited below - confirm.
831 public void onComplete(Throwable t, Object resp) {
834 }, getSystem().dispatcher());
// Fails the test if the latch has not been released within 5 seconds.
836 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// cohort1 must have gone through all three phases before cohort2's canCommit was invoked.
838 InOrder inOrder = inOrder(cohort1, cohort2);
839 inOrder.verify(cohort1).canCommit();
840 inOrder.verify(cohort1).preCommit();
841 inOrder.verify(cohort1).commit();
842 inOrder.verify(cohort2).canCommit();
// Stop the shard actor created for this test.
844 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies shard behavior when the preCommit phase of a transaction fails.
 * <p>
 * A single transaction with a Mockito mock cohort is readied and passes canCommit. The cohort's
 * preCommit() returns a failed future, so the subsequent CommitTransaction message must be
 * answered with an akka Status.Failure. canCommit and preCommit are verified to have been
 * called in that order.
 *
 * @throws Throwable if any step of the test fails
 */
849 public void testPreCommitPhaseFailure() throws Throwable {
850 new ShardTestKit(getSystem()) {{
851 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
852 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
853 "testPreCommitPhaseFailure");
855 waitUntilLeader(shard);
857 String transactionID = "tx1";
858 MutableCompositeModification modification = new MutableCompositeModification();
859 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
// canCommit succeeds, preCommit returns a failed future.
860 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
861 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
863 FiniteDuration duration = duration("5 seconds");
865 // Simulate the ForwardedReadyTransaction messages that would be sent
866 // by the ShardTransaction.
868 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
869 cohort, modification, true), getRef());
870 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
872 // Send the CanCommitTransaction message.
874 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
875 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
876 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
877 assertEquals("Can commit", true, canCommitReply.getCanCommit());
879 // Send the CommitTransaction message. This should send back an error
880 // for preCommit failure.
882 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
883 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Only canCommit and preCommit are verified; no verification of commit() is made here.
885 InOrder inOrder = inOrder(cohort);
886 inOrder.verify(cohort).canCommit();
887 inOrder.verify(cohort).preCommit();
// Stop the shard actor created for this test.
889 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies shard behavior when the canCommit phase of a transaction fails.
 * <p>
 * The mock cohort's canCommit() returns a failed future, so the CanCommitTransaction message
 * must be answered with an akka Status.Failure instead of a CanCommitTransactionReply.
 *
 * @throws Throwable if any step of the test fails
 */
894 public void testCanCommitPhaseFailure() throws Throwable {
895 new ShardTestKit(getSystem()) {{
896 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
897 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
898 "testCanCommitPhaseFailure");
900 waitUntilLeader(shard);
902 final FiniteDuration duration = duration("5 seconds");
904 String transactionID = "tx1";
905 MutableCompositeModification modification = new MutableCompositeModification();
906 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
// The only stubbed phase: canCommit returns a failed future.
907 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
909 // Simulate the ForwardedReadyTransaction messages that would be sent
910 // by the ShardTransaction.
912 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
913 cohort, modification, true), getRef());
914 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
916 // Send the CanCommitTransaction message.
918 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
919 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Stop the shard actor created for this test.
921 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies that an AbortTransaction arriving while a commit is in progress does not prevent the
 * commit from finishing.
 * <p>
 * A preCommit hook is handed to setupMockWriteTransaction. When applied, the hook calls the
 * cohort's preCommit() and then asks the shard to abort the same transaction. The test waits
 * for the abort request to complete, awaits the commit future, and finally asserts that the
 * written test container can be read from the shard's data store.
 *
 * @throws Throwable if any step of the test fails
 */
926 public void testAbortBeforeFinishCommit() throws Throwable {
927 new ShardTestKit(getSystem()) {{
928 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
929 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
930 "testAbortBeforeFinishCommit");
932 waitUntilLeader(shard);
934 final FiniteDuration duration = duration("5 seconds");
935 final Timeout timeout = new Timeout(duration);
937 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
939 final String transactionID = "tx1";
// Released once the abort request issued from inside the preCommit hook has completed.
940 final CountDownLatch abortComplete = new CountDownLatch(1);
// NOTE(review): presumably setupMockWriteTransaction wires this function in as the mock
// cohort's preCommit behavior - confirm against that helper.
941 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
942 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
944 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
// Run the cohort's preCommit first, then fire the abort for the same transaction.
945 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
947 Future<Object> abortFuture = Patterns.ask(shard,
948 new AbortTransaction(transactionID).toSerializable(), timeout);
949 abortFuture.onComplete(new OnComplete<Object>() {
951 public void onComplete(Throwable e, Object resp) {
// Counts down on both success and failure of the abort request.
952 abortComplete.countDown();
954 }, getSystem().dispatcher());
956 return preCommitFuture;
960 MutableCompositeModification modification = new MutableCompositeModification();
961 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
962 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
963 modification, preCommit);
965 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
966 cohort, modification, true), getRef());
967 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
969 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
970 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
971 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
972 assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Ask (rather than tell) so the commit result can be awaited after the abort has completed.
974 Future<Object> commitFuture = Patterns.ask(shard,
975 new CommitTransaction(transactionID).toSerializable(), timeout);
977 assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
// Await.result rethrows if the commit future failed or did not complete within the duration.
979 Await.result(commitFuture, duration);
// The data written by the transaction must be present in the store.
981 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
982 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
// Stop the shard actor created for this test.
984 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies that a transaction which passed canCommit but is never committed does not block a
 * following transaction indefinitely.
 * <p>
 * The datastore context is built with shardTransactionCommitTimeoutInSeconds(1). The 1st
 * transaction is sent canCommit only; the 2nd transaction's canCommit reply is then expected
 * within the 5 second test duration, after which it is committed and its list entry is read
 * back from the store.
 *
 * @throws Throwable if any step of the test fails
 */
989 public void testTransactionCommitTimeout() throws Throwable {
// NOTE(review): dataStoreContext is assigned before the shard is created; presumably
// newShardProps() reads it - confirm.
990 dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
992 new ShardTestKit(getSystem()) {{
993 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
994 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
995 "testTransactionCommitTimeout");
997 waitUntilLeader(shard);
999 final FiniteDuration duration = duration("5 seconds");
1001 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Pre-populate the store with the test container and an empty outer list.
1003 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1004 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1005 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1007 // Create 1st Tx - will timeout
1009 String transactionID1 = "tx1";
1010 MutableCompositeModification modification1 = new MutableCompositeModification();
// NOTE(review): the trailing argument(s) of this call are not visible in this chunk.
1011 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1012 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1013 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1014 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// NOTE(review): the 2nd transaction is identified as "tx3" with a mock named "cohort3".
1019 String transactionID2 = "tx3";
1020 MutableCompositeModification modification2 = new MutableCompositeModification();
1021 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1022 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
// NOTE(review): some arguments of this call are not visible in this chunk.
1023 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1025 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
// Ready both transactions.
1030 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1031 cohort1, modification1, true), getRef());
1032 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1034 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1035 cohort2, modification2, true), getRef());
1036 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1038 // canCommit 1st Tx. We don't send the commit so it should timeout.
1040 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1041 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1043 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1045 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1046 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1048 // Commit the 2nd Tx.
1050 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1051 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The 2nd Tx's list entry (key 2) must now be readable from the store.
1053 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1054 assertNotNull(listNodePath + " not found", node);
// Stop the shard actor created for this test.
1056 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies that a CanCommitTransaction is rejected once the shard's commit queue is full.
 * <p>
 * The datastore context is built with shardTransactionCommitQueueCapacity(1). Three
 * transactions are readied; the 1st is sent canCommit and never committed, the 2nd canCommit
 * is sent without awaiting a reply, and the 3rd canCommit must be answered with an akka
 * Status.Failure.
 *
 * @throws Throwable if any step of the test fails
 */
1061 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
// NOTE(review): dataStoreContext is assigned before the shard is created; presumably
// newShardProps() reads it - confirm.
1062 dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
1064 new ShardTestKit(getSystem()) {{
1065 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1066 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1067 "testTransactionCommitQueueCapacityExceeded");
1069 waitUntilLeader(shard);
1071 final FiniteDuration duration = duration("5 seconds");
1073 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1075 String transactionID1 = "tx1";
1076 MutableCompositeModification modification1 = new MutableCompositeModification();
1077 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1078 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1080 String transactionID2 = "tx2";
1081 MutableCompositeModification modification2 = new MutableCompositeModification();
// NOTE(review): the trailing argument(s) of this call are not visible in this chunk.
1082 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1083 TestModel.OUTER_LIST_PATH,
1084 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1087 String transactionID3 = "tx3";
1088 MutableCompositeModification modification3 = new MutableCompositeModification();
1089 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1090 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
// Ready all three transactions.
1094 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1095 cohort1, modification1, true), getRef());
1096 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1098 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1099 cohort2, modification2, true), getRef());
1100 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1102 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1103 cohort3, modification3, true), getRef());
1104 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1106 // canCommit 1st Tx.
1108 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1109 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1111 // canCommit the 2nd Tx - it should get queued.
// No reply is awaited for the 2nd Tx here; the next expected message is the 3rd Tx's failure.
1113 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1115 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1117 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1118 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Stop the shard actor created for this test.
1120 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies that a CanCommitTransaction for a transaction ID that was never readied is answered
 * with an akka Status.Failure.
 *
 * @throws Throwable if any step of the test fails
 */
1125 public void testCanCommitBeforeReadyFailure() throws Throwable {
1126 new ShardTestKit(getSystem()) {{
1127 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1128 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1129 "testCanCommitBeforeReadyFailure");
// No ForwardedReadyTransaction was sent for "tx", so the shard has no cohort entry for it.
1131 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1132 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// Stop the shard actor created for this test.
1134 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Verifies that aborting the current transaction lets the next queued transaction proceed.
 * <p>
 * Two transactions backed by Mockito mock cohorts are readied. The 1st passes canCommit and is
 * then aborted, which must be answered with an AbortTransactionReply. The CanCommitTransaction
 * asked for the 2nd transaction is then expected to complete, and the canCommit calls on the
 * two cohorts are verified in order.
 *
 * @throws Throwable if any step of the test fails
 */
1139 public void testAbortTransaction() throws Throwable {
1140 new ShardTestKit(getSystem()) {{
1141 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1142 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1143 "testAbortTransaction");
1145 waitUntilLeader(shard);
1147 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1149 String transactionID1 = "tx1";
1150 MutableCompositeModification modification1 = new MutableCompositeModification();
1151 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
// cohort1 succeeds in canCommit and in abort.
1152 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1153 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1155 String transactionID2 = "tx2";
1156 MutableCompositeModification modification2 = new MutableCompositeModification();
1157 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1158 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1160 FiniteDuration duration = duration("5 seconds");
1161 final Timeout timeout = new Timeout(duration);
1163 // Simulate the ForwardedReadyTransaction messages that would be sent
1164 // by the ShardTransaction.
1166 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1167 cohort1, modification1, true), getRef());
1168 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1170 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1171 cohort2, modification2, true), getRef());
1172 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1174 // Send the CanCommitTransaction message for the first Tx.
1176 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1177 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1178 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1179 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1181 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1182 // processed after the first Tx completes.
1184 Future<Object> canCommitFuture = Patterns.ask(shard,
1185 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1187 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd Tx to proceed.
1190 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1191 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1193 // Wait for the 2nd Tx to complete the canCommit phase.
1195 final CountDownLatch latch = new CountDownLatch(1);
1196 canCommitFuture.onComplete(new OnComplete<Object>() {
// NOTE(review): the callback body is not visible in this chunk; presumably it counts down
// the latch awaited below - confirm.
1198 public void onComplete(Throwable t, Object resp) {
1201 }, getSystem().dispatcher());
// Fails the test if the latch has not been released within 5 seconds.
1203 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// cohort1's canCommit must have been invoked before cohort2's.
1205 InOrder inOrder = inOrder(cohort1, cohort2);
1206 inOrder.verify(cohort1).canCommit();
1207 inOrder.verify(cohort2).canCommit();
// Stop the shard actor created for this test.
1209 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Runs the snapshot-capture scenario with the datastore context's persistent flag set to true,
 * using "testCreateSnapshot" as the shard actor name.
 *
 * @throws IOException declared by the shared scenario method
 * @throws InterruptedException if waiting for a snapshot to be saved is interrupted
 */
1214 public void testCreateSnapshot() throws IOException, InterruptedException {
1215 testCreateSnapshot(true, "testCreateSnapshot");
/**
 * Runs the snapshot-capture scenario with the datastore context's persistent flag set to false,
 * using "testCreateSnapshotWithNonPersistentData" as the shard actor name.
 *
 * @throws IOException declared by the shared scenario method
 * @throws InterruptedException if waiting for a snapshot to be saved is interrupted
 */
1219 public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
1220 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
/**
 * Shared snapshot-capture scenario: sends CaptureSnapshot to a shard twice and, each time,
 * waits up to 5 seconds for the shard's commitSnapshot to be invoked.
 * <p>
 * The shard is created through a Creator that returns a Shard subclass whose commitSnapshot
 * override calls the super implementation and then counts down the current latch.
 *
 * @param persistent value passed to the datastore context builder's persistent(...) setting
 * @param shardActorName name under which the test shard actor is created
 * @throws IOException declared by this method; no visible statement throws it directly
 * @throws InterruptedException if waiting on the latch is interrupted
 */
1223 public void testCreateSnapshot(boolean persistent, final String shardActorName) throws IOException, InterruptedException {
// A local context is used here rather than a context shared with other tests.
1224 final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
1225 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
1227 new ShardTestKit(getSystem()) {{
// Held in an AtomicReference so that a fresh latch can be swapped in for the second capture.
1228 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1229 Creator<Shard> creator = new Creator<Shard>() {
1231 public Shard create() throws Exception {
1232 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1233 dataStoreContext, SCHEMA_CONTEXT) {
1235 protected void commitSnapshot(long sequenceNumber) {
// Signal the test only after the real commitSnapshot has run.
1236 super.commitSnapshot(sequenceNumber);
1237 latch.get().countDown();
1243 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1244 Props.create(new DelegatingShardCreator(creator)), shardActorName);
1246 waitUntilLeader(shard);
// First capture.
1248 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1250 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
// Second capture, with a new latch because the first one is already at zero.
1252 latch.set(new CountDownLatch(1));
1253 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1255 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
// Stop the shard actor created for this test.
1257 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * This test simply verifies that the applySnapShot logic will work.
 *
 * @throws ReadFailedException if reading the root of the data store fails
 */
1266 public void testInMemoryDataStoreRestore() throws ReadFailedException {
// Works directly against an InMemoryDOMDataStore (no shard actor): writes a container, reads
// the whole tree, replaces the root with that tree, and checks the content is unchanged.
// Same-thread executors are supplied to the store's constructor.
1267 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
1268 MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
1270 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
// Seed the store with the test container.
1272 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1273 putTransaction.write(TestModel.TEST_PATH,
1274 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1275 commitTransaction(putTransaction);
// Capture the full tree from the root as the expected state.
1278 NormalizedNode expected = readStore(store);
1280 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
// Delete the root and write the captured tree back in the same transaction.
1282 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1283 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1285 commitTransaction(writeTransaction);
1287 NormalizedNode actual = readStore(store);
// The store content after the delete-and-rewrite must equal the content before it.
1289 assertEquals(expected, actual);
/**
 * Verifies that a shard's data persistence provider reports recovery as applicable when the
 * datastore context is built with persistent(true), and as not applicable with
 * persistent(false).
 */
1294 public void testRecoveryApplicable(){
// The two contexts differ only in the persistent flag.
1296 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1297 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1299 final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1300 persistentContext, SCHEMA_CONTEXT);
1302 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1303 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1305 final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1306 nonPersistentContext, SCHEMA_CONTEXT);
1308 new ShardTestKit(getSystem()) {{
// Shard created from the persistent context.
1309 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1310 persistentProps, "testPersistence1");
1312 assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1314 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Shard created from the non-persistent context.
1316 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1317 nonPersistentProps, "testPersistence2");
1319 assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1321 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1328 private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
1329 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1330 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1331 transaction.read(YangInstanceIdentifier.builder().build());
1333 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1335 NormalizedNode<?, ?> normalizedNode = optional.get();
1337 transaction.close();
1339 return normalizedNode;
1342 private void commitTransaction(DOMStoreWriteTransaction transaction) {
1343 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1344 ListenableFuture<Void> future =
1345 commitCohort.preCommit();
1348 future = commitCohort.commit();
1350 } catch (InterruptedException | ExecutionException e) {
/**
 * Creates a new anonymous data change listener for use where a listener instance is required.
 * <p>
 * NOTE(review): the body of onDataChanged is not visible in this chunk; judging by the method
 * name it is presumably empty - confirm.
 *
 * @return a new listener instance
 */
1354 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1355 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1357 public void onDataChanged(
1358 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1364 static NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
1365 throws ExecutionException, InterruptedException {
1366 DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1368 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1369 transaction.read(id);
1371 Optional<NormalizedNode<?, ?>> optional = future.get();
1372 NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1374 transaction.close();
1379 private void writeToStore(TestActorRef<Shard> shard, YangInstanceIdentifier id, NormalizedNode<?,?> node)
1380 throws ExecutionException, InterruptedException {
1381 DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1383 transaction.write(id, node);
1385 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1386 commitCohort.preCommit().get();
1387 commitCohort.commit().get();
/**
 * A Creator of Shard instances that forwards creation to another Creator supplied at
 * construction time. It is passed to Props.create in the snapshot test with an anonymous
 * Creator as the delegate.
 */
1390 private static final class DelegatingShardCreator implements Creator<Shard> {
// The creator that actually builds the Shard.
1391 private final Creator<Shard> delegate;
1393 DelegatingShardCreator(Creator<Shard> delegate) {
1394 this.delegate = delegate;
// Returns whatever the delegate creates; exceptions from the delegate propagate unchanged.
1398 public Shard create() throws Exception {
1399 return delegate.create();