1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.PoisonPill;
5 import akka.actor.Props;
6 import akka.dispatch.Dispatchers;
7 import akka.dispatch.OnComplete;
8 import akka.japi.Creator;
9 import akka.pattern.Patterns;
10 import akka.testkit.TestActorRef;
11 import akka.util.Timeout;
12 import com.google.common.base.Function;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.Uninterruptibles;
19 import org.junit.After;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.mockito.InOrder;
23 import org.mockito.invocation.InvocationOnMock;
24 import org.mockito.stubbing.Answer;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
37 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
38 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
39 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
40 import org.opendaylight.controller.cluster.datastore.modification.Modification;
41 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
44 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
45 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
46 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
47 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
48 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
49 import org.opendaylight.controller.cluster.raft.Snapshot;
50 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
54 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
55 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
56 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
57 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
58 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
59 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
60 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
61 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
62 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
63 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
64 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
65 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
66 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
67 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
68 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
69 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
70 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
71 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
72 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
73 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
74 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
75 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
76 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
77 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
78 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
79 import scala.concurrent.Await;
80 import scala.concurrent.Future;
81 import scala.concurrent.duration.FiniteDuration;
82 import java.io.IOException;
83 import java.util.Collections;
84 import java.util.HashSet;
87 import java.util.concurrent.CountDownLatch;
88 import java.util.concurrent.ExecutionException;
89 import java.util.concurrent.TimeUnit;
90 import java.util.concurrent.atomic.AtomicInteger;
91 import java.util.concurrent.atomic.AtomicReference;
92 import static org.junit.Assert.assertEquals;
93 import static org.junit.Assert.assertNotNull;
94 import static org.junit.Assert.assertNull;
95 import static org.junit.Assert.assertTrue;
96 import static org.junit.Assert.fail;
97 import static org.mockito.Mockito.mock;
98 import static org.mockito.Mockito.doReturn;
99 import static org.mockito.Mockito.doAnswer;
100 import static org.mockito.Mockito.inOrder;
// Test cases for the Shard actor.
102 public class ShardTest extends AbstractActorTest {
// Schema context handed to every Shard created by these tests.
104 private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
// Shared counter; each ShardTest instance takes the next value to build its shard "type".
106 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
// Identifier of the shard under test: member "member-1", shard "inventory",
// type "config<N>" where N is drawn from NEXT_SHARD_NUM when the instance is created.
108 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
109 .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
// Datastore settings used for the shards: journal recovery log batch size 3,
// snapshot batch count 5000, heartbeat interval 100 ms.
111 private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
112 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
113 shardHeartbeatIntervalInMillis(100).build();
// Sets the "shard.persistent" system property to "false" and empties the in-memory
// snapshot store and journal so a test starts from a clean state.
116 public void setUp() {
117 System.setProperty("shard.persistent", "false");
119 InMemorySnapshotStore.clear();
120 InMemoryJournal.clear();
// Empties the in-memory snapshot store and journal again so nothing leaks into later tests.
124 public void tearDown() {
125 InMemorySnapshotStore.clear();
126 InMemoryJournal.clear();
// Builds the Props for a Shard identified by shardID with an empty peer-address map,
// the test's DatastoreContext and the shared SCHEMA_CONTEXT.
129 private Props newShardProps() {
130 return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
131 dataStoreContext, SCHEMA_CONTEXT);
// Registers a data change listener on TestModel.TEST_PATH (BASE scope), checks the
// registration path returned in the reply, then writes to the store and waits for the
// listener to receive the change event.
135 public void testRegisterChangeListener() throws Exception {
136 new ShardTestKit(getSystem()) {{
137 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
138 newShardProps(), "testRegisterChangeListener");
140 waitUntilLeader(shard);
142 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// The mock listener is constructed with 1 and is wrapped in a DataChangeListener actor.
144 MockDataChangeListener listener = new MockDataChangeListener(1);
145 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
146 "testRegisterChangeListener-DataChangeListener");
148 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
149 dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
// The registration path must be a "$"-prefixed name under the shard actor's path.
151 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
152 RegisterChangeListenerReply.class);
153 String replyPath = reply.getListenerRegistrationPath().toString();
154 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
155 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// Write to the registered path and wait for the listener to see it.
157 YangInstanceIdentifier path = TestModel.TEST_PATH;
158 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
160 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
162 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
163 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a change listener registered before the shard is the leader is still
// notified of data already in the store once the shard becomes the leader.
167 @SuppressWarnings("serial")
169 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
170 // This test tests the timing window in which a change listener is registered before the
171 // shard becomes the leader. We verify that the listener is registered and notified of the
172 // existing data when the shard becomes the leader.
173 new ShardTestKit(getSystem()) {{
174 // For this test, we want to send the RegisterChangeListener message after the shard
175 // has recovered from persistence and before it becomes the leader. So we subclass
176 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
177 // we know that the shard has been initialized to a follower and has started the
178 // election process. The following 2 CountDownLatches are used to coordinate the
179 // ElectionTimeout with the sending of the RegisterChangeListener message.
180 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
181 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
182 Creator<Shard> creator = new Creator<Shard>() {
// Flag cleared once the first ElectionTimeout has been intercepted.
183 boolean firstElectionTimeout = true;
186 public Shard create() throws Exception {
187 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
188 dataStoreContext, SCHEMA_CONTEXT) {
190 public void onReceiveCommand(final Object message) {
191 if(message instanceof ElectionTimeout && firstElectionTimeout) {
192 // Got the first ElectionTimeout. We don't forward it to the
193 // base Shard yet until we've sent the RegisterChangeListener
194 // message. So we signal the onFirstElectionTimeout latch to tell
195 // the main thread to send the RegisterChangeListener message and
196 // start a thread to wait on the onChangeListenerRegistered latch,
197 // which the main thread signals after it has sent the message.
198 // After the onChangeListenerRegistered is triggered, we send the
199 // original ElectionTimeout message to proceed with the election.
200 firstElectionTimeout = false;
201 final ActorRef self = getSelf();
// Waits up to 5 seconds for the latch, then re-sends the held ElectionTimeout to self.
205 Uninterruptibles.awaitUninterruptibly(
206 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
207 self.tell(message, self);
211 onFirstElectionTimeout.countDown();
213 super.onReceiveCommand(message);
220 MockDataChangeListener listener = new MockDataChangeListener(1);
221 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
222 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
// Create the shard through the custom creator so the overridden onReceiveCommand is used.
224 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
225 Props.create(new DelegatingShardCreator(creator)),
226 "testRegisterChangeListenerWhenNotLeaderInitially");
228 // Write initial data into the in-memory store.
229 YangInstanceIdentifier path = TestModel.TEST_PATH;
230 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
232 // Wait until the shard receives the first ElectionTimeout message.
233 assertEquals("Got first ElectionTimeout", true,
234 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
236 // Now send the RegisterChangeListener and wait for the reply.
237 shard.tell(new RegisterChangeListener(path, dclActor.path(),
238 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
240 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
241 RegisterChangeListenerReply.class);
242 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
244 // Sanity check - verify the shard is not the leader yet.
245 shard.tell(new FindLeader(), getRef());
246 FindLeaderReply findLeadeReply =
247 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
248 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
250 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
251 // with the election process.
252 onChangeListenerRegistered.countDown();
254 // Wait for the shard to become the leader and notify our listener with the existing
255 // data in the store.
256 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
258 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
259 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Sends a serialized CreateTransaction for READ_ONLY transaction "txn-1" and checks that the
// transaction actor path in the reply lies under the shard actor and contains "shard-txn-1".
264 public void testCreateTransaction(){
265 new ShardTestKit(getSystem()) {{
266 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
268 waitUntilLeader(shard);
270 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// The transaction type is passed as the enum's ordinal value.
272 shard.tell(new CreateTransaction("txn-1",
273 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
275 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
276 CreateTransactionReply.class);
278 String path = reply.getTransactionActorPath().toString();
279 assertTrue("Unexpected transaction path " + path,
280 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
// Stop the shard actor.
282 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Same check as testCreateTransaction, but the CreateTransaction message carries an extra
// third argument "foobar" (presumably the transaction chain id, going by the test name — confirm
// against CreateTransaction).
287 public void testCreateTransactionOnChain(){
288 new ShardTestKit(getSystem()) {{
289 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
291 waitUntilLeader(shard);
293 shard.tell(new CreateTransaction("txn-1",
294 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
297 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
298 CreateTransactionReply.class);
// The created transaction actor must live under this test's shard actor.
300 String path = reply.getTransactionActorPath().toString();
301 assertTrue("Unexpected transaction path " + path,
302 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
// Stop the shard actor.
304 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Creates a Shard whose peer map holds a single entry (shardID mapped to null), waits for
// recovery to complete, delivers a PeerAddressResolved message directly to the actor and
// checks that the address is then reported for that peer.
309 public void testPeerAddressResolved(){
310 new ShardTestKit(getSystem()) {{
311 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Local Shard subclass that exposes the peer addresses and signals recovery completion.
312 class TestShard extends Shard {
314 super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
315 dataStoreContext, SCHEMA_CONTEXT);
// Accessor for the peer addresses held by the Raft actor context.
318 Map<String, String> getPeerAddresses() {
319 return getRaftActorContext().getPeerAddresses();
// Runs the base-class handling first, then releases the test's latch.
323 protected void onRecoveryComplete() {
325 super.onRecoveryComplete();
327 recoveryComplete.countDown();
332 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
333 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
335 public TestShard create() throws Exception {
336 return new TestShard();
338 })), "testPeerAddressResolved");
340 //waitUntilLeader(shard);
341 assertEquals("Recovery complete", true,
342 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// Invoke the handler directly on the underlying actor rather than via tell().
344 String address = "akka://foobar";
345 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
// The peer map is looked up by the string form of the shard identifier.
347 assertEquals("getPeerAddresses", address,
348 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
// Stop the shard actor.
350 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Writes a container to the shard's store, reads the root, encodes it into a Snapshot,
// delivers it as an ApplySnapshot message and checks that the root read afterwards equals
// the root read before.
355 public void testApplySnapshot() throws ExecutionException, InterruptedException {
356 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
357 "testApplySnapshot");
359 NormalizedNodeToNodeCodec codec =
360 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
362 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// An identifier built with no path arguments is used to read the whole tree.
364 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
365 NormalizedNode<?,?> expected = readStore(shard, root);
367 NormalizedNodeMessages.Container encode = codec.encode(expected);
// The snapshot carries the encoded tree as bytes and an empty list of log entries.
369 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
370 encode.getNormalizedNode().toByteString().toByteArray(),
371 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
// Invoke the handler directly on the underlying actor rather than via tell().
373 shard.underlyingActor().onReceiveCommand(applySnapshot);
375 NormalizedNode<?,?> actual = readStore(shard, root);
377 assertEquals(expected, actual);
// Stop the shard actor.
379 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Delivers an ApplyState message whose log entry payload is a single WriteModification and
// checks that the written node can then be read back from the shard's store.
383 public void testApplyState() throws Exception {
385 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
387 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Wrap the write in a composite modification and serialize it as the log entry payload.
389 MutableCompositeModification compMod = new MutableCompositeModification();
390 compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
391 Payload payload = new CompositeModificationPayload(compMod.toSerializable());
392 ApplyState applyState = new ApplyState(null, "test",
393 new ReplicatedLogImplEntry(1, 2, payload));
// Invoke the handler directly on the underlying actor rather than via tell().
395 shard.underlyingActor().onReceiveCommand(applyState);
397 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
398 assertEquals("Applied state", node, actual);
// Stop the shard actor.
400 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Seeds the in-memory snapshot store and journal for shardID, starts a Shard, waits for its
// recovery to complete and verifies both the recovered list entries and the log indexes
// reported by the shard MBean.
403 @SuppressWarnings("serial")
405 public void testRecovery() throws Exception {
407 // Set up the InMemorySnapshotStore.
// A separate in-memory data store is populated and committed to produce the snapshot content.
409 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
410 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
412 DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
413 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
414 DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
415 commitCohort.preCommit().get();
416 commitCohort.commit().get();
// Read back the whole tree so it can be encoded into the snapshot.
418 DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
419 NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
421 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
422 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
424 getNormalizedNode().toByteString().toByteArray(),
425 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
427 // Set up the InMemoryJournal.
// Journal entry 0 writes an empty outer list; entries 1..nListEntries each merge one list entry.
429 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
430 new WriteModification(TestModel.OUTER_LIST_PATH,
431 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
434 int nListEntries = 11;
// Keys expected after recovery; each is removed again as it is found during verification.
435 Set<Integer> listEntryKeys = new HashSet<>();
436 for(int i = 1; i <= nListEntries; i++) {
437 listEntryKeys.add(Integer.valueOf(i));
438 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
439 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
440 Modification mod = new MergeModification(path,
441 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
443 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
// The final journal record is an ApplyLogEntries message for index nListEntries.
447 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
448 new ApplyLogEntries(nListEntries));
450 // Create the actor and wait for recovery complete.
452 final CountDownLatch recoveryComplete = new CountDownLatch(1);
454 Creator<Shard> creator = new Creator<Shard>() {
456 public Shard create() throws Exception {
457 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
458 dataStoreContext, SCHEMA_CONTEXT) {
// Runs the base-class handling first, then releases the test's latch.
460 protected void onRecoveryComplete() {
462 super.onRecoveryComplete();
464 recoveryComplete.countDown();
471 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
472 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
474 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
476 // Verify data in the data store.
478 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
479 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
480 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
481 outerList.getValue() instanceof Iterable);
482 for(Object entry: (Iterable<?>) outerList.getValue()) {
483 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
484 entry instanceof MapEntryNode);
485 MapEntryNode mapEntry = (MapEntryNode)entry;
486 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
487 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
488 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
489 Object value = idLeaf.get().getValue();
// remove() returns false for an unexpected or already-seen key, which fails the assertion.
490 assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
491 listEntryKeys.remove(value));
// Any key still left in the set was never found in the recovered list.
494 if(!listEntryKeys.isEmpty()) {
495 fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
// Last log index, commit index and last applied must all equal nListEntries.
499 assertEquals("Last log index", nListEntries,
500 shard.underlyingActor().getShardMBean().getLastLogIndex());
501 assertEquals("Commit index", nListEntries,
502 shard.underlyingActor().getShardMBean().getCommitIndex());
503 assertEquals("Last applied", nListEntries,
504 shard.underlyingActor().getShardMBean().getLastApplied());
// Stop the shard actor.
506 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Collects the given modifications, in argument order, into a MutableCompositeModification
// and returns a CompositeModificationPayload built from its serializable form.
509 private CompositeModificationPayload newPayload(Modification... mods) {
510 MutableCompositeModification compMod = new MutableCompositeModification();
511 for(Modification mod: mods) {
512 compMod.addModification(mod);
515 return new CompositeModificationPayload(compMod.toSerializable());
// Convenience overload: delegates to the six-argument version with a null preCommit function,
// so the mock's preCommit() falls through to the real cohort.
518 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
519 InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
520 MutableCompositeModification modification) {
521 return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
// Prepares a real write-only transaction on dataStore that writes data at path, readies it,
// and creates a Mockito mock cohort (named cohortName) whose canCommit/preCommit/commit/abort
// calls are answered by the real cohort. If preCommit is non-null it is applied to the real
// cohort instead of calling realCohort.preCommit(). The same write is also recorded in
// the supplied modification.
524 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
525 InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
526 MutableCompositeModification modification,
527 final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
529 DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
530 tx.write(path, data);
531 final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
532 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
// canCommit() on the mock delegates to the real cohort.
534 doAnswer(new Answer<ListenableFuture<Boolean>>() {
536 public ListenableFuture<Boolean> answer(InvocationOnMock invocation) {
537 return realCohort.canCommit();
539 }).when(cohort).canCommit();
// preCommit() on the mock uses the caller-supplied function when one was given.
541 doAnswer(new Answer<ListenableFuture<Void>>() {
543 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
544 if(preCommit != null) {
545 return preCommit.apply(realCohort);
547 return realCohort.preCommit();
550 }).when(cohort).preCommit();
// commit() on the mock delegates to the real cohort.
552 doAnswer(new Answer<ListenableFuture<Void>>() {
554 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
555 return realCohort.commit();
557 }).when(cohort).commit();
// abort() on the mock delegates to the real cohort.
559 doAnswer(new Answer<ListenableFuture<Void>>() {
561 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
562 return realCohort.abort();
564 }).when(cohort).abort();
// Record the equivalent WriteModification in the caller's composite modification.
566 modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
// Drives three transactions through the shard's commit protocol: Tx1 is readied and
// can-committed first, Tx2 and Tx3 are readied and their CanCommit requests issued while Tx1
// is still pending, then Tx1 is committed. The test checks that all commits complete, that the
// cohorts were invoked in Tx1, Tx2, Tx3 order, that the data is in the store and that the
// last log index is 2.
571 @SuppressWarnings({ "unchecked" })
573 public void testConcurrentThreePhaseCommits() throws Throwable {
// Overrides the "false" value that setUp() assigns to this property.
574 System.setProperty("shard.persistent", "true");
575 new ShardTestKit(getSystem()) {{
576 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
577 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
578 "testConcurrentThreePhaseCommits");
580 waitUntilLeader(shard);
582 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
584 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Tx1 writes the test container.
586 String transactionID1 = "tx1";
587 MutableCompositeModification modification1 = new MutableCompositeModification();
588 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
589 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
// Tx2 writes an empty outer list.
591 String transactionID2 = "tx2";
592 MutableCompositeModification modification2 = new MutableCompositeModification();
593 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
594 TestModel.OUTER_LIST_PATH,
595 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
// Tx3 writes the outer list entry with id 1.
598 String transactionID3 = "tx3";
599 MutableCompositeModification modification3 = new MutableCompositeModification();
600 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
601 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
602 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
603 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// The same timeout (timeoutSec seconds) is used for expectMsgClass, ask() and the final latch wait.
607 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
608 final Timeout timeout = new Timeout(duration);
610 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
611 // by the ShardTransaction.
613 shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
614 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
615 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
616 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
618 // Send the CanCommitTransaction message for the first Tx.
620 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
621 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
622 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
623 assertEquals("Can commit", true, canCommitReply.getCanCommit());
625 // Send the ForwardedReadyTransaction for the next 2 Tx's.
627 shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
628 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
630 shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
631 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
633 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
634 // processed after the first Tx completes.
// ask() is used here so that each reply arrives on its own future rather than at the test kit.
636 Future<Object> canCommitFuture1 = Patterns.ask(shard,
637 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
639 Future<Object> canCommitFuture2 = Patterns.ask(shard,
640 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
642 // Send the CommitTransaction message for the first Tx. After it completes, it should
643 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
645 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
646 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
648 // Wait for the next 2 Tx's to complete.
// caughtEx carries a failure from a future callback back to the test thread;
// commitLatch counts the two remaining commits (Tx2 and Tx3).
650 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
651 final CountDownLatch commitLatch = new CountDownLatch(2);
// Base callback: records failures in caughtEx and checks the response type.
653 class OnFutureComplete extends OnComplete<Object> {
654 private final Class<?> expRespType;
656 OnFutureComplete(Class<?> expRespType) {
657 this.expRespType = expRespType;
661 public void onComplete(Throwable error, Object resp) {
663 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
666 assertEquals("Commit response type", expRespType, resp.getClass());
668 } catch (Exception e) {
// Hook for subclasses; overridden by OnCanCommitFutureComplete below.
674 void onSuccess(Object resp) throws Exception {
// Callback for commit replies: counts down commitLatch after the base handling.
678 class OnCommitFutureComplete extends OnFutureComplete {
679 OnCommitFutureComplete() {
680 super(CommitTransactionReply.SERIALIZABLE_CLASS);
684 public void onComplete(Throwable error, Object resp) {
685 super.onComplete(error, resp);
686 commitLatch.countDown();
// Callback for canCommit replies: on success, asks the shard to commit the same transaction.
690 class OnCanCommitFutureComplete extends OnFutureComplete {
691 private final String transactionID;
693 OnCanCommitFutureComplete(String transactionID) {
694 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
695 this.transactionID = transactionID;
699 void onSuccess(Object resp) throws Exception {
700 CanCommitTransactionReply canCommitReply =
701 CanCommitTransactionReply.fromSerializable(resp);
702 assertEquals("Can commit", true, canCommitReply.getCanCommit());
704 Future<Object> commitFuture = Patterns.ask(shard,
705 new CommitTransaction(transactionID).toSerializable(), timeout);
706 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
// Attach the callbacks; they run on the actor system's dispatcher.
710 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
711 getSystem().dispatcher());
713 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
714 getSystem().dispatcher());
716 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
// Rethrow any callback failure before checking whether the latch wait succeeded.
718 if(caughtEx.get() != null) {
719 throw caughtEx.get();
722 assertEquals("Commits complete", true, done);
// Each cohort must have gone through canCommit, preCommit and commit, in transaction order.
724 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
725 inOrder.verify(cohort1).canCommit();
726 inOrder.verify(cohort1).preCommit();
727 inOrder.verify(cohort1).commit();
728 inOrder.verify(cohort2).canCommit();
729 inOrder.verify(cohort2).preCommit();
730 inOrder.verify(cohort2).commit();
731 inOrder.verify(cohort3).canCommit();
732 inOrder.verify(cohort3).preCommit();
733 inOrder.verify(cohort3).commit();
735 // Verify data in the data store.
737 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
738 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
739 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
740 outerList.getValue() instanceof Iterable);
// Only the first list entry is inspected; it must carry id 1 (written by Tx3).
741 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
742 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
743 entry instanceof MapEntryNode);
744 MapEntryNode mapEntry = (MapEntryNode)entry;
745 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
746 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
747 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
748 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
// Poll the MBean (up to 100 iterations, 50 ms apart) checking for a last log index of 2.
750 for(int i = 0; i < 20 * 5; i++) {
751 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
752 if(lastLogIndex == 2) {
755 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
758 assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
// Stop the shard actor.
760 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a failure in the commit phase of one transaction is reported back to the
// sender and that a second, queued transaction can then proceed with its canCommit phase.
765 public void testCommitPhaseFailure() throws Throwable {
766 new ShardTestKit(getSystem()) {{
767 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
768 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
769 "testCommitPhaseFailure");
// waitUntilLeader is defined outside this section; presumably it blocks until the shard
// has become leader - TODO confirm against the helper's definition.
771 waitUntilLeader(shard);
773 // Setup 2 simulated transactions with mock cohorts. The first one fails in the commit phase.
// cohort1: canCommit and preCommit succeed, commit returns an already-failed future.
776 String transactionID1 = "tx1";
777 MutableCompositeModification modification1 = new MutableCompositeModification();
778 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
779 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
780 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
781 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
// cohort2: only canCommit is stubbed; the test never drives it past that phase.
783 String transactionID2 = "tx2";
784 MutableCompositeModification modification2 = new MutableCompositeModification();
785 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
786 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
// The same 5 second limit is used for expectMsgClass and for the ask timeout.
788 FiniteDuration duration = duration("5 seconds");
789 final Timeout timeout = new Timeout(duration);
791 // Simulate the ForwardedReadyTransaction messages that would be sent
792 // by the ShardTransaction.
794 shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
795 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
797 shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
798 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
800 // Send the CanCommitTransaction message for the first Tx.
802 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
803 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
804 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
805 assertEquals("Can commit", true, canCommitReply.getCanCommit());
807 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
808 // processed after the first Tx completes.
// Patterns.ask is used so the reply is delivered to a Future instead of to getRef(),
// keeping it out of the way of the expectMsgClass call below.
810 Future<Object> canCommitFuture = Patterns.ask(shard,
811 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
813 // Send the CommitTransaction message for the first Tx. This should send back an error
814 // and trigger the 2nd Tx to proceed.
816 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
817 expectMsgClass(duration, akka.actor.Status.Failure.class);
819 // Wait for the 2nd Tx to complete the canCommit phase.
// NOTE(review): the body of the onComplete callback is not visible in this section;
// presumably it counts down the latch - confirm.
821 final CountDownLatch latch = new CountDownLatch(1);
822 canCommitFuture.onComplete(new OnComplete<Object>() {
824 public void onComplete(Throwable t, Object resp) {
827 }, getSystem().dispatcher());
829 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// Check the call order: all three phases on cohort1, then canCommit on cohort2.
831 InOrder inOrder = inOrder(cohort1, cohort2);
832 inOrder.verify(cohort1).canCommit();
833 inOrder.verify(cohort1).preCommit();
834 inOrder.verify(cohort1).commit();
835 inOrder.verify(cohort2).canCommit();
// Stop the shard actor created for this test.
837 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that when a cohort's preCommit fails, the CommitTransaction request is answered
// with a Status.Failure.
842 public void testPreCommitPhaseFailure() throws Throwable {
843 new ShardTestKit(getSystem()) {{
844 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
845 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
846 "testPreCommitPhaseFailure");
// waitUntilLeader is defined outside this section; presumably it blocks until the shard
// has become leader - TODO confirm.
848 waitUntilLeader(shard);
// Mock cohort: canCommit succeeds, preCommit returns an already-failed future.
// commit() is not stubbed and is not verified below.
850 String transactionID = "tx1";
851 MutableCompositeModification modification = new MutableCompositeModification();
852 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
853 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
854 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
856 FiniteDuration duration = duration("5 seconds");
858 // Simulate the ForwardedReadyTransaction messages that would be sent
859 // by the ShardTransaction.
861 shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
862 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
864 // Send the CanCommitTransaction message.
866 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
867 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
868 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
869 assertEquals("Can commit", true, canCommitReply.getCanCommit());
871 // Send the CommitTransaction message. This should send back an error
872 // for preCommit failure.
874 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
875 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Check that canCommit was invoked before preCommit on the cohort.
877 InOrder inOrder = inOrder(cohort);
878 inOrder.verify(cohort).canCommit();
879 inOrder.verify(cohort).preCommit();
// Stop the shard actor created for this test.
881 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that when a cohort's canCommit fails, the CanCommitTransaction request is
// answered with a Status.Failure.
886 public void testCanCommitPhaseFailure() throws Throwable {
887 new ShardTestKit(getSystem()) {{
888 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
889 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
890 "testCanCommitPhaseFailure");
// waitUntilLeader is defined outside this section; presumably it blocks until the shard
// has become leader - TODO confirm.
892 waitUntilLeader(shard);
894 final FiniteDuration duration = duration("5 seconds");
// Mock cohort whose canCommit returns an already-failed future; no other phase is stubbed.
896 String transactionID = "tx1";
897 MutableCompositeModification modification = new MutableCompositeModification();
898 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
899 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
901 // Simulate the ForwardedReadyTransaction messages that would be sent
902 // by the ShardTransaction.
904 shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
905 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
907 // Send the CanCommitTransaction message.
// The reply is expected to be a failure rather than a CanCommitTransactionReply.
909 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
910 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Stop the shard actor created for this test.
912 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Sends an AbortTransaction for a transaction while its commit is already in progress
// (the abort is issued from inside the preCommit hook) and checks that the commit request
// still completes and the written node can be read back afterwards.
917 public void testAbortBeforeFinishCommit() throws Throwable {
// System.setProperty is JVM-wide and nothing in the visible lines restores the value.
// NOTE(review): confirm this cannot leak into other tests.
918 System.setProperty("shard.persistent", "true");
919 new ShardTestKit(getSystem()) {{
920 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
921 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
922 "testAbortBeforeFinishCommit");
// waitUntilLeader is defined outside this section; presumably it blocks until the shard
// has become leader - TODO confirm.
924 waitUntilLeader(shard);
926 final FiniteDuration duration = duration("5 seconds");
927 final Timeout timeout = new Timeout(duration);
929 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
931 final String transactionID = "tx1";
// Released when the abort request issued from the hook below has completed.
932 final CountDownLatch abortComplete = new CountDownLatch(1);
// Hook handed to setupMockWriteTransaction. It calls the supplied cohort's preCommit,
// then asks the shard to abort the same transaction, and finally returns the preCommit
// future. The onComplete callback ignores its arguments, so the latch is released
// whether the abort request succeeds or fails.
933 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
934 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
936 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
937 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
939 Future<Object> abortFuture = Patterns.ask(shard,
940 new AbortTransaction(transactionID).toSerializable(), timeout);
941 abortFuture.onComplete(new OnComplete<Object>() {
943 public void onComplete(Throwable e, Object resp) {
944 abortComplete.countDown();
946 }, getSystem().dispatcher());
948 return preCommitFuture;
// setupMockWriteTransaction is defined outside this section; presumably it builds a mock
// cohort around a write of the given node and invokes the hook when preCommit is called
// - TODO confirm.
952 MutableCompositeModification modification = new MutableCompositeModification();
953 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
954 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
955 modification, preCommit);
957 shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
958 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
960 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
961 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
962 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
963 assertEquals("Can commit", true, canCommitReply.getCanCommit());
// Start the commit via ask so its outcome can be awaited after the abort has completed.
965 Future<Object> commitFuture = Patterns.ask(shard,
966 new CommitTransaction(transactionID).toSerializable(), timeout);
968 assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
// Await.result rethrows if the commit future failed, so reaching the next line means
// the commit request completed successfully.
970 Await.result(commitFuture, duration);
// The node written by the transaction must be present in the shard's data store.
972 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
973 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
// Stop the shard actor created for this test.
975 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a transaction which passed canCommit but is never committed does not block
// the next transaction indefinitely: with a 1 second commit timeout configured, the second
// transaction is expected to get its canCommit reply and to commit successfully.
980 public void testTransactionCommitTimeout() throws Throwable {
// dataStoreContext is declared outside this section; presumably newShardProps() reads it
// - TODO confirm.
981 dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
983 new ShardTestKit(getSystem()) {{
984 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
985 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
986 "testTransactionCommitTimeout");
// waitUntilLeader is defined outside this section; presumably it blocks until the shard
// has become leader - TODO confirm.
988 waitUntilLeader(shard);
990 final FiniteDuration duration = duration("5 seconds");
992 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Seed the store directly with the test container and an empty outer list, so the
// list entries written by the transactions below have a parent.
994 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
995 writeToStore(shard, TestModel.OUTER_LIST_PATH,
996 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
998 // Create 1st Tx - will timeout
// Writes the outer-list entry with id 1.
1000 String transactionID1 = "tx1";
1001 MutableCompositeModification modification1 = new MutableCompositeModification();
1002 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1003 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1004 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1005 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// 2nd Tx: writes the outer-list entry with id 2. Note that its transaction id string is
// "tx3" and its mock is named "cohort3".
1010 String transactionID2 = "tx3";
1011 MutableCompositeModification modification2 = new MutableCompositeModification();
1012 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1013 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1014 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1016 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
// Ready both transactions.
1021 shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
1022 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1024 shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
1025 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1027 // canCommit 1st Tx. We don't send the commit so it should timeout.
1029 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1030 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1032 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
// The 5 second wait here is longer than the configured 1 second commit timeout.
1034 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1035 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1037 // Commit the 2nd Tx.
1039 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1040 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The entry written by the 2nd Tx must now be readable from the shard's data store.
1042 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1043 assertNotNull(listNodePath + " not found", node);
// Stop the shard actor created for this test.
1045 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that with the commit queue capacity configured to 1, a canCommit request that
// arrives while one transaction is in progress and another is already queued is rejected
// with a Status.Failure.
1050 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
// dataStoreContext is declared outside this section; presumably newShardProps() reads it
// - TODO confirm.
1051 dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
1053 new ShardTestKit(getSystem()) {{
1054 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1055 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1056 "testTransactionCommitQueueCapacityExceeded");
// waitUntilLeader is defined outside this section; presumably it blocks until the shard
// has become leader - TODO confirm.
1058 waitUntilLeader(shard);
1060 final FiniteDuration duration = duration("5 seconds");
1062 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Three write transactions built with setupMockWriteTransaction (defined outside this
// section): tx1 and tx3 write the test container, tx2 writes an empty outer list.
1064 String transactionID1 = "tx1";
1065 MutableCompositeModification modification1 = new MutableCompositeModification();
1066 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1067 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1069 String transactionID2 = "tx2";
1070 MutableCompositeModification modification2 = new MutableCompositeModification();
1071 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1072 TestModel.OUTER_LIST_PATH,
1073 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1076 String transactionID3 = "tx3";
1077 MutableCompositeModification modification3 = new MutableCompositeModification();
1078 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1079 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
// Ready all three transactions; each is expected to be acknowledged.
1083 shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
1084 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1086 shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
1087 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1089 shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
1090 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1092 // canCommit 1st Tx.
1094 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1095 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1097 // canCommit the 2nd Tx - it should get queued.
// No reply is awaited for this request; tx1 is never committed in this test.
1099 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1101 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1103 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1104 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Stop the shard actor created for this test.
1106 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a CanCommitTransaction for a transaction id that was never readied
// (no ForwardedReadyTransaction is sent first) is answered with a Status.Failure.
1111 public void testCanCommitBeforeReadyFailure() throws Throwable {
1112 new ShardTestKit(getSystem()) {{
1113 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1114 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1115 "testCanCommitBeforeReadyFailure");
// "tx" is unknown to the shard at this point; the failure reply is awaited for up to 5 seconds.
1117 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1118 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// Stop the shard actor created for this test.
1120 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that aborting a transaction which has passed canCommit yields an
// AbortTransactionReply and lets a second, queued transaction proceed with its canCommit.
1125 public void testAbortTransaction() throws Throwable {
1126 new ShardTestKit(getSystem()) {{
1127 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1128 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1129 "testAbortTransaction");
// waitUntilLeader is defined outside this section; presumably it blocks until the shard
// has become leader - TODO confirm.
1131 waitUntilLeader(shard);
1133 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
// cohort1: canCommit and abort both succeed immediately.
1135 String transactionID1 = "tx1";
1136 MutableCompositeModification modification1 = new MutableCompositeModification();
1137 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1138 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1139 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
// cohort2: only canCommit is stubbed; the test never drives it past that phase.
1141 String transactionID2 = "tx2";
1142 MutableCompositeModification modification2 = new MutableCompositeModification();
1143 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1144 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
// The same 5 second limit is used for expectMsgClass and for the ask timeout.
1146 FiniteDuration duration = duration("5 seconds");
1147 final Timeout timeout = new Timeout(duration);
1149 // Simulate the ForwardedReadyTransaction messages that would be sent
1150 // by the ShardTransaction.
1152 shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
1153 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1155 shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
1156 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1158 // Send the CanCommitTransaction message for the first Tx.
1160 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1161 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1162 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1163 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1165 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1166 // processed after the first Tx completes.
// Patterns.ask is used so the reply is delivered to a Future instead of to getRef(),
// keeping it out of the way of the expectMsgClass call below.
1168 Future<Object> canCommitFuture = Patterns.ask(shard,
1169 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1171 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd Tx to proceed.
1174 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1175 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1177 // Wait for the 2nd Tx to complete the canCommit phase.
// NOTE(review): the body of the onComplete callback is not visible in this section;
// presumably it counts down the latch - confirm.
1179 final CountDownLatch latch = new CountDownLatch(1);
1180 canCommitFuture.onComplete(new OnComplete<Object>() {
1182 public void onComplete(Throwable t, Object resp) {
1185 }, getSystem().dispatcher());
1187 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// Check that canCommit ran on cohort1 before it ran on cohort2.
1189 InOrder inOrder = inOrder(cohort1, cohort2);
1190 inOrder.verify(cohort1).canCommit();
1191 inOrder.verify(cohort2).canCommit();
// Stop the shard actor created for this test.
1193 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that each CaptureSnapshot message sent to the shard results in a call to
// saveSnapshot, checked twice in a row against the same shard instance.
1198 public void testCreateSnapshot() throws IOException, InterruptedException {
1199 new ShardTestKit(getSystem()) {{
// Held in an AtomicReference so the latch can be swapped for a fresh one between the
// two snapshot requests while the overridden saveSnapshot always sees the current latch.
1200 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
// Creates a Shard subclass whose saveSnapshot delegates to the real implementation and
// then counts down the current latch.
1201 Creator<Shard> creator = new Creator<Shard>() {
1203 public Shard create() throws Exception {
1204 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1205 dataStoreContext, SCHEMA_CONTEXT) {
1207 public void saveSnapshot(Object snapshot) {
1208 super.saveSnapshot(snapshot);
1209 latch.get().countDown();
// The creator is wrapped in DelegatingShardCreator before being handed to Props.create.
1215 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1216 Props.create(new DelegatingShardCreator(creator)), "testCreateSnapshot");
// waitUntilLeader is defined outside this section; presumably it blocks until the shard
// has become leader - TODO confirm.
1218 waitUntilLeader(shard);
// First snapshot request; saveSnapshot must be reached within 5 seconds.
1220 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1222 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
// Install a fresh latch and repeat the request to check a second snapshot is also saved.
1224 latch.set(new CountDownLatch(1));
1225 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1227 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
// Stop the shard actor created for this test.
1229 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1234 * This test simply verifies that the applySnapShot logic will work
1235 * @throws ReadFailedException
// Checks that deleting the root of an InMemoryDOMDataStore and writing a previously read
// root node back in the same transaction leaves the store with equal contents.
1238 public void testInMemoryDataStoreRestore() throws ReadFailedException {
// Both executors run tasks on the calling thread, so no actor system is involved here.
1239 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
1240 MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
1242 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
// Populate the store with the test container and commit it.
1244 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1245 putTransaction.write(TestModel.TEST_PATH,
1246 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1247 commitTransaction(putTransaction);
// Capture the whole tree (read at the empty, i.e. root, identifier).
1250 NormalizedNode expected = readStore(store);
// Delete the root and write the captured tree back at the root, then commit.
1252 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1254 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1255 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1257 commitTransaction(writeTransaction);
// The tree read after the delete-and-rewrite must equal the captured one.
1259 NormalizedNode actual = readStore(store);
1261 assertEquals(expected, actual);
// Reads the node at the root (empty) identifier of the given store through a read-only
// transaction and returns it. The transaction is closed before returning.
// Throws ReadFailedException if checkedGet() reports a failed read.
1265 private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
1266 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1267 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1268 transaction.read(YangInstanceIdentifier.builder().build());
1270 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
// get() is called without an isPresent() check, so the caller must ensure the root exists.
1272 NormalizedNode<?, ?> normalizedNode = optional.get();
1274 transaction.close();
1276 return normalizedNode;
// Readies the given write transaction and drives its cohort through preCommit and commit.
// canCommit is not invoked in the visible lines.
// NOTE(review): the try block opener, the waits on each future and the body of the catch
// clause are not visible in this section - confirm how failures are handled.
1279 private void commitTransaction(DOMStoreWriteTransaction transaction) {
1280 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1281 ListenableFuture<Void> future =
1282 commitCohort.preCommit();
// The same local is reused for the commit-phase future.
1285 future = commitCohort.commit();
1287 } catch (InterruptedException | ExecutionException e) {
// Returns a new anonymous AsyncDataChangeListener for use as a placeholder listener.
// NOTE(review): the body of onDataChanged is not visible in this section; going by the
// method name it is presumably empty - confirm.
1291 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1292 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1294 public void onDataChanged(
1295 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
// Reads the node at the given path directly from the shard's underlying data store,
// bypassing the actor's message handling. The local `node` is null when nothing is
// present at the path. The read-only transaction is closed afterwards.
// NOTE(review): the return statement is not visible in this section; presumably `node`
// is returned - confirm.
1301 private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
1302 throws ExecutionException, InterruptedException {
1303 DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1305 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1306 transaction.read(id);
// Blocks until the read completes; a failed read surfaces as ExecutionException.
1308 Optional<NormalizedNode<?, ?>> optional = future.get();
1309 NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1311 transaction.close();
// Writes the given node at the given path directly into the shard's underlying data store,
// bypassing the actor's message handling, and blocks until the write is committed.
1316 private void writeToStore(TestActorRef<Shard> shard, YangInstanceIdentifier id, NormalizedNode<?,?> node)
1317 throws ExecutionException, InterruptedException {
1318 DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1320 transaction.write(id, node);
// Ready the transaction, then wait for preCommit and commit in turn.
// canCommit is not invoked here.
1322 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1323 commitCohort.preCommit().get();
1324 commitCohort.commit().get();
1327 private static final class DelegatingShardCreator implements Creator<Shard> {
1328 private final Creator<Shard> delegate;
1330 DelegatingShardCreator(Creator<Shard> delegate) {
1331 this.delegate = delegate;
1335 public Shard create() throws Exception {
1336 return delegate.create();