1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.pattern.Patterns;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Function;
24 import com.google.common.base.Optional;
25 import com.google.common.util.concurrent.CheckedFuture;
26 import com.google.common.util.concurrent.Futures;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import com.google.common.util.concurrent.MoreExecutors;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.io.IOException;
31 import java.util.Collections;
32 import java.util.HashSet;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicReference;
40 import org.junit.After;
41 import org.junit.Before;
42 import org.junit.Test;
43 import org.mockito.InOrder;
44 import org.mockito.invocation.InvocationOnMock;
45 import org.mockito.stubbing.Answer;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
47 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
48 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
49 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
50 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
51 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
55 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
59 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
60 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
61 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
62 import org.opendaylight.controller.cluster.datastore.modification.Modification;
63 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
64 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
65 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
66 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
67 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
68 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
69 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
70 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
71 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
72 import org.opendaylight.controller.cluster.raft.Snapshot;
73 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
74 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
75 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
76 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
77 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
78 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
79 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
80 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
81 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
82 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
83 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
84 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
85 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
86 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
87 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
88 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
89 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
90 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
91 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
92 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
93 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
94 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
95 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
96 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
97 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
98 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
99 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
100 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
101 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
102 import scala.concurrent.Await;
103 import scala.concurrent.Future;
104 import scala.concurrent.duration.FiniteDuration;
106 public class ShardTest extends AbstractActorTest {
// Schema context built once from the test model and shared by the tests in this class.
108 private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
// Counter whose value is appended to the shard "type" so each test instance gets a distinct shard id.
110 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
// Identifier of the shard under test: member "member-1", shard "inventory", type "config<N>".
112 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
113 .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
// Builder for the DatastoreContext handed to shards created by these tests
// (recovery log batch size 3, snapshot batch count 5000, heartbeat interval 100 ms).
115 private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
116 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
117 shardHeartbeatIntervalInMillis(100);
// Fixture set-up: empties the in-memory snapshot store and journal so that
// persisted state from another test cannot be seen by the shard under test.
120 public void setUp() {
122 InMemorySnapshotStore.clear();
123 InMemoryJournal.clear();
// Fixture tear-down: empties the in-memory snapshot store and journal again.
127 public void tearDown() {
128 InMemorySnapshotStore.clear();
129 InMemoryJournal.clear();
// Builds a DatastoreContext from the shared dataStoreContextBuilder field.
132 private DatastoreContext newDatastoreContext() {
133 return dataStoreContextBuilder.build();
// Returns Props for a Shard with this test's shardID, an empty peer-address map,
// a freshly built DatastoreContext and the shared SCHEMA_CONTEXT.
136 private Props newShardProps() {
137 return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
138 newDatastoreContext(), SCHEMA_CONTEXT);
// Registers a data change listener on a leader shard, checks the registration reply path,
// then writes to TEST_PATH and waits for the listener to be notified.
142 public void testRegisterChangeListener() throws Exception {
143 new ShardTestKit(getSystem()) {{
144 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
145 newShardProps(), "testRegisterChangeListener");
147 waitUntilLeader(shard);
149 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// The listener is constructed with 1, presumably the number of change events to expect - TODO confirm.
151 MockDataChangeListener listener = new MockDataChangeListener(1);
152 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
153 "testRegisterChangeListener-DataChangeListener");
// Register with BASE scope; the reply is delivered to this test kit (getRef()).
155 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
156 dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
158 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
159 RegisterChangeListenerReply.class);
// The registration path must be a "$"-prefixed child of the shard actor's path.
160 String replyPath = reply.getListenerRegistrationPath().toString();
161 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
162 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
164 YangInstanceIdentifier path = TestModel.TEST_PATH;
165 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
167 listener.waitForChangeEvents(path);
// Stop the actors created by this test.
169 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
170 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a change listener registered before the shard becomes leader is still
// registered and is notified of already-existing data once the shard is the leader.
174 @SuppressWarnings("serial")
176 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
177 // This test tests the timing window in which a change listener is registered before the
178 // shard becomes the leader. We verify that the listener is registered and notified of the
179 // existing data when the shard becomes the leader.
180 new ShardTestKit(getSystem()) {{
181 // For this test, we want to send the RegisterChangeListener message after the shard
182 // has recovered from persistence and before it becomes the leader. So we subclass
183 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
184 // we know that the shard has been initialized to a follower and has started the
185 // election process. The following 2 CountDownLatches are used to coordinate the
186 // ElectionTimeout with the sending of the RegisterChangeListener message.
187 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
188 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
189 Creator<Shard> creator = new Creator<Shard>() {
// Ensures that only the first ElectionTimeout is intercepted.
190 boolean firstElectionTimeout = true;
193 public Shard create() throws Exception {
194 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
195 newDatastoreContext(), SCHEMA_CONTEXT) {
197 public void onReceiveCommand(final Object message) throws Exception {
198 if(message instanceof ElectionTimeout && firstElectionTimeout) {
199 // Got the first ElectionTimeout. We don't forward it to the
200 // base Shard yet until we've sent the RegisterChangeListener
201 // message. So we signal the onFirstElectionTimeout latch to tell
202 // the main thread to send the RegisterChangeListener message and
203 // start a thread to wait on the onChangeListenerRegistered latch,
204 // which the main thread signals after it has sent the message.
205 // After the onChangeListenerRegistered is triggered, we send the
206 // original ElectionTimeout message to proceed with the election.
207 firstElectionTimeout = false;
208 final ActorRef self = getSelf();
// Waits up to 5 seconds for the registration signal, then re-sends the withheld
// ElectionTimeout to the shard itself.
212 Uninterruptibles.awaitUninterruptibly(
213 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
214 self.tell(message, self);
218 onFirstElectionTimeout.countDown();
// Every other message goes straight to the base Shard.
220 super.onReceiveCommand(message);
227 MockDataChangeListener listener = new MockDataChangeListener(1);
228 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
229 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
231 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
232 Props.create(new DelegatingShardCreator(creator)),
233 "testRegisterChangeListenerWhenNotLeaderInitially");
235 // Write initial data into the in-memory store.
236 YangInstanceIdentifier path = TestModel.TEST_PATH;
237 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
239 // Wait until the shard receives the first ElectionTimeout message.
240 assertEquals("Got first ElectionTimeout", true,
241 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
243 // Now send the RegisterChangeListener and wait for the reply.
244 shard.tell(new RegisterChangeListener(path, dclActor.path(),
245 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
247 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
248 RegisterChangeListenerReply.class);
249 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
251 // Sanity check - verify the shard is not the leader yet.
252 shard.tell(new FindLeader(), getRef());
253 FindLeaderReply findLeadeReply =
254 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
255 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
257 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
258 // with the election process.
259 onChangeListenerRegistered.countDown();
261 // Wait for the shard to become the leader and notify our listener with the existing
262 // data in the store.
263 listener.waitForChangeEvents(path);
// Stop the actors created by this test.
265 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
266 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Sends a serialized READ_ONLY CreateTransaction for "txn-1" to a leader shard and checks
// that the reply carries a transaction actor path under the shard actor.
271 public void testCreateTransaction(){
272 new ShardTestKit(getSystem()) {{
273 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
275 waitUntilLeader(shard);
277 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// The transaction type is passed as the enum ordinal.
279 shard.tell(new CreateTransaction("txn-1",
280 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
282 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
283 CreateTransactionReply.class);
285 String path = reply.getTransactionActorPath().toString();
286 assertTrue("Unexpected transaction path " + path,
287 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
// Stop the shard actor created by this test.
289 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Same check as testCreateTransaction, but CreateTransaction gets the extra argument "foobar"
// (presumably the transaction chain id - TODO confirm against CreateTransaction).
294 public void testCreateTransactionOnChain(){
295 new ShardTestKit(getSystem()) {{
296 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
298 waitUntilLeader(shard);
300 shard.tell(new CreateTransaction("txn-1",
301 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
304 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
305 CreateTransactionReply.class);
// The transaction actor path must sit under this test's shard actor.
307 String path = reply.getTransactionActorPath().toString();
308 assertTrue("Unexpected transaction path " + path,
309 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
// Stop the shard actor created by this test.
311 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a PeerAddressResolved message updates the peer address held in the shard's
// raft actor context.
315 @SuppressWarnings("serial")
317 public void testPeerAddressResolved() throws Exception {
318 new ShardTestKit(getSystem()) {{
// Counted down from onRecoveryComplete so the test can wait for recovery to finish.
319 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Shard subclass that starts with shardID mapped to a null address and exposes the peer map.
320 class TestShard extends Shard {
322 super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
323 newDatastoreContext(), SCHEMA_CONTEXT);
326 Map<String, String> getPeerAddresses() {
327 return getRaftActorContext().getPeerAddresses();
331 protected void onRecoveryComplete() {
333 super.onRecoveryComplete();
335 recoveryComplete.countDown();
340 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
341 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
343 public TestShard create() throws Exception {
344 return new TestShard();
346 })), "testPeerAddressResolved");
348 //waitUntilLeader(shard);
349 assertEquals("Recovery complete", true,
350 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
// Deliver the message directly to the underlying actor, bypassing the mailbox.
352 String address = "akka://foobar";
353 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
// The peer map is keyed by the string form of the shard identifier.
355 assertEquals("getPeerAddresses", address,
356 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
358 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Writes data to the shard's store, encodes the root node into a snapshot, applies that
// snapshot to the shard and checks the store content read back equals the original.
363 public void testApplySnapshot() throws Exception {
364 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
365 "testApplySnapshot");
367 NormalizedNodeToNodeCodec codec =
368 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
370 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// An identifier built with no path arguments is used to read the whole tree.
372 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
373 NormalizedNode<?,?> expected = readStore(shard, root);
375 NormalizedNodeMessages.Container encode = codec.encode(expected);
// The snapshot state is the byte form of the encoded root node, with no unapplied log entries.
377 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
378 encode.getNormalizedNode().toByteString().toByteArray(),
379 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
// Deliver the message directly to the underlying actor, bypassing the mailbox.
381 shard.underlyingActor().onReceiveCommand(applySnapshot);
383 NormalizedNode<?,?> actual = readStore(shard, root);
385 assertEquals(expected, actual);
387 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Applies an ApplyState carrying a ModificationPayload with one write to TEST_PATH and checks
// that the written node can be read back from the shard's store.
391 public void testApplyState() throws Exception {
393 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
395 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// The first ApplyState argument is null and the identifier is "test".
397 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
398 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
// Deliver the message directly to the underlying actor, bypassing the mailbox.
400 shard.underlyingActor().onReceiveCommand(applyState);
402 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
403 assertEquals("Applied state", node, actual);
405 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Same as testApplyState, but the log entry carries the legacy
// CompositeModificationByteStringPayload built by newLegacyByteStringPayload.
409 public void testApplyStateLegacy() throws Exception {
411 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
413 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
415 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
416 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
// Deliver the message directly to the underlying actor, bypassing the mailbox.
418 shard.underlyingActor().onReceiveCommand(applyState);
420 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
421 assertEquals("Applied state", node, actual);
423 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Seeds the in-memory snapshot store and journal with a snapshot plus log entries in three
// payload formats, starts a shard, waits for recovery to complete and then verifies the
// recovered outer-list entries and the shard MBean's log indexes.
426 @SuppressWarnings("serial")
428 public void testRecovery() throws Exception {
430 // Set up the InMemorySnapshotStore.
// A separate in-memory store is used only to produce the root node for the snapshot.
432 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
433 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
435 DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
436 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
437 DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
438 commitCohort.preCommit().get();
439 commitCohort.commit().get();
441 DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
442 NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
444 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
445 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
447 getNormalizedNode().toByteString().toByteArray(),
448 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
450 // Set up the InMemoryJournal.
// Journal entry 0 writes an empty outer list using the legacy payload.
452 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
453 new WriteModification(TestModel.OUTER_LIST_PATH,
454 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
// listEntryKeys collects every key added below; the verification loop removes each key it finds.
456 int nListEntries = 16;
457 Set<Integer> listEntryKeys = new HashSet<>();
460 // Add some of the legacy CompositeModificationPayload
462 listEntryKeys.add(Integer.valueOf(i));
463 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
464 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
465 Modification mod = new MergeModification(path,
466 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
467 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
468 newLegacyPayload(mod)));
471 // Add some of the legacy CompositeModificationByteStringPayload
473 listEntryKeys.add(Integer.valueOf(i));
474 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
475 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
476 Modification mod = new MergeModification(path,
477 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
478 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
479 newLegacyByteStringPayload(mod)));
482 // Add some of the ModificationPayload
483 for(; i <= nListEntries; i++) {
484 listEntryKeys.add(Integer.valueOf(i));
485 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
486 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
487 Modification mod = new MergeModification(path,
488 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
489 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
490 newModificationPayload(mod)));
// The last journal record is an ApplyLogEntries for index nListEntries.
493 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
494 new ApplyLogEntries(nListEntries));
496 // Create the actor and wait for recovery complete.
498 final CountDownLatch recoveryComplete = new CountDownLatch(1);
500 Creator<Shard> creator = new Creator<Shard>() {
502 public Shard create() throws Exception {
503 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
504 newDatastoreContext(), SCHEMA_CONTEXT) {
506 protected void onRecoveryComplete() {
508 super.onRecoveryComplete();
510 recoveryComplete.countDown();
517 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
518 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
520 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
522 // Verify data in the data store.
524 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
525 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
526 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
527 outerList.getValue() instanceof Iterable);
528 for(Object entry: (Iterable<?>) outerList.getValue()) {
529 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
530 entry instanceof MapEntryNode);
531 MapEntryNode mapEntry = (MapEntryNode)entry;
532 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
533 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
534 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
535 Object value = idLeaf.get().getValue();
// remove() returns false for a key that was never added or was already seen.
536 assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
537 listEntryKeys.remove(value));
// Any key still in the set was not found in the recovered list.
540 if(!listEntryKeys.isEmpty()) {
541 fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
545 assertEquals("Last log index", nListEntries,
546 shard.underlyingActor().getShardMBean().getLastLogIndex());
547 assertEquals("Commit index", nListEntries,
548 shard.underlyingActor().getShardMBean().getCommitIndex());
549 assertEquals("Last applied", nListEntries,
550 shard.underlyingActor().getShardMBean().getLastApplied());
552 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Wraps the given modifications in a MutableCompositeModification and returns a legacy
// CompositeModificationPayload built from its serializable form.
555 private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
556 MutableCompositeModification compMod = new MutableCompositeModification();
557 for(Modification mod: mods) {
558 compMod.addModification(mod);
561 return new CompositeModificationPayload(compMod.toSerializable());
// Wraps the given modifications in a MutableCompositeModification and returns a legacy
// CompositeModificationByteStringPayload built from its serializable form.
564 private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
565 MutableCompositeModification compMod = new MutableCompositeModification();
566 for(Modification mod: mods) {
567 compMod.addModification(mod);
570 return new CompositeModificationByteStringPayload(compMod.toSerializable());
// Wraps the given modifications in a MutableCompositeModification and returns a
// ModificationPayload constructed directly from it; declared to throw IOException.
573 private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
574 MutableCompositeModification compMod = new MutableCompositeModification();
575 for(Modification mod: mods) {
576 compMod.addModification(mod);
579 return new ModificationPayload(compMod);
// Convenience overload: delegates to the six-argument setupMockWriteTransaction with a null
// preCommit function.
582 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
583 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
584 final MutableCompositeModification modification) {
585 return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
// Readies a real write transaction for (path, data) on dataStore and builds a Mockito mock
// cohort named cohortName whose canCommit/preCommit/commit/abort calls are answered by the
// real cohort. When preCommit is non-null it is applied to the real cohort instead of calling
// realCohort.preCommit(). A matching WriteModification is also added to modification.
// NOTE(review): the return statement is not visible in this view; presumably the mock cohort
// is returned - confirm.
588 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
589 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
590 final MutableCompositeModification modification,
591 final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
593 DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
594 tx.write(path, data);
595 final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
596 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
// canCommit is forwarded to the real cohort.
598 doAnswer(new Answer<ListenableFuture<Boolean>>() {
600 public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
601 return realCohort.canCommit();
603 }).when(cohort).canCommit();
// preCommit uses the caller-supplied function when one was given.
605 doAnswer(new Answer<ListenableFuture<Void>>() {
607 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
608 if(preCommit != null) {
609 return preCommit.apply(realCohort);
611 return realCohort.preCommit();
614 }).when(cohort).preCommit();
// commit is forwarded to the real cohort.
616 doAnswer(new Answer<ListenableFuture<Void>>() {
618 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
619 return realCohort.commit();
621 }).when(cohort).commit();
// abort is forwarded to the real cohort.
623 doAnswer(new Answer<ListenableFuture<Void>>() {
625 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
626 return realCohort.abort();
628 }).when(cohort).abort();
630 modification.addModification(new WriteModification(path, data));
// Readies three transactions against one shard, drives their canCommit/commit messages
// concurrently and verifies that the cohorts were invoked strictly in transaction order,
// that the expected data is in the store and that the last log index is 2.
635 @SuppressWarnings({ "unchecked" })
637 public void testConcurrentThreePhaseCommits() throws Throwable {
638 new ShardTestKit(getSystem()) {{
// The shard is created with the default dispatcher id.
639 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
640 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
641 "testConcurrentThreePhaseCommits");
643 waitUntilLeader(shard);
645 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
647 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
649 String transactionID1 = "tx1";
650 MutableCompositeModification modification1 = new MutableCompositeModification();
651 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
652 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
654 String transactionID2 = "tx2";
655 MutableCompositeModification modification2 = new MutableCompositeModification();
656 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
657 TestModel.OUTER_LIST_PATH,
658 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
661 String transactionID3 = "tx3";
662 MutableCompositeModification modification3 = new MutableCompositeModification();
663 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
664 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
665 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
666 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// NOTE(review): the declaration of timeoutSec is not visible in this view.
670 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
671 final Timeout timeout = new Timeout(duration);
673 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
674 // by the ShardTransaction.
676 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
677 cohort1, modification1, true), getRef());
678 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
679 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
680 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
682 // Send the CanCommitTransaction message for the first Tx.
684 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
685 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
686 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
687 assertEquals("Can commit", true, canCommitReply.getCanCommit());
689 // Send the ForwardedReadyTransaction for the next 2 Tx's.
691 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
692 cohort2, modification2, true), getRef());
693 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
695 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
696 cohort3, modification3, true), getRef());
697 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
699 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
700 // processed after the first Tx completes.
702 Future<Object> canCommitFuture1 = Patterns.ask(shard,
703 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
705 Future<Object> canCommitFuture2 = Patterns.ask(shard,
706 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
708 // Send the CommitTransaction message for the first Tx. After it completes, it should
709 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
711 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
712 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
714 // Wait for the next 2 Tx's to complete.
// caughtEx carries a failure from the future callbacks back to the test thread;
// commitLatch is counted down once per completed commit future (two are expected).
716 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
717 final CountDownLatch commitLatch = new CountDownLatch(2);
// Base callback: records a failure in caughtEx and checks the response class.
719 class OnFutureComplete extends OnComplete<Object> {
720 private final Class<?> expRespType;
722 OnFutureComplete(final Class<?> expRespType) {
723 this.expRespType = expRespType;
727 public void onComplete(final Throwable error, final Object resp) {
729 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
732 assertEquals("Commit response type", expRespType, resp.getClass());
734 } catch (Exception e) {
740 void onSuccess(final Object resp) throws Exception {
// Commit callback: expects a CommitTransactionReply and counts down commitLatch.
744 class OnCommitFutureComplete extends OnFutureComplete {
745 OnCommitFutureComplete() {
746 super(CommitTransactionReply.SERIALIZABLE_CLASS);
750 public void onComplete(final Throwable error, final Object resp) {
751 super.onComplete(error, resp);
752 commitLatch.countDown();
// CanCommit callback: on success asserts canCommit is true, then asks the shard to commit
// the same transaction.
756 class OnCanCommitFutureComplete extends OnFutureComplete {
757 private final String transactionID;
759 OnCanCommitFutureComplete(final String transactionID) {
760 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
761 this.transactionID = transactionID;
765 void onSuccess(final Object resp) throws Exception {
766 CanCommitTransactionReply canCommitReply =
767 CanCommitTransactionReply.fromSerializable(resp);
768 assertEquals("Can commit", true, canCommitReply.getCanCommit());
770 Future<Object> commitFuture = Patterns.ask(shard,
771 new CommitTransaction(transactionID).toSerializable(), timeout);
772 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
776 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
777 getSystem().dispatcher());
779 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
780 getSystem().dispatcher());
782 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
// Rethrow a callback failure first so it is reported instead of a latch timeout.
784 if(caughtEx.get() != null) {
785 throw caughtEx.get();
788 assertEquals("Commits complete", true, done);
// Each cohort must run canCommit, preCommit and commit before the next cohort starts.
790 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
791 inOrder.verify(cohort1).canCommit();
792 inOrder.verify(cohort1).preCommit();
793 inOrder.verify(cohort1).commit();
794 inOrder.verify(cohort2).canCommit();
795 inOrder.verify(cohort2).preCommit();
796 inOrder.verify(cohort2).commit();
797 inOrder.verify(cohort3).canCommit();
798 inOrder.verify(cohort3).preCommit();
799 inOrder.verify(cohort3).commit();
801 // Verify data in the data store.
803 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
804 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
805 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
806 outerList.getValue() instanceof Iterable);
// Only the first list entry is inspected; its id leaf must be 1.
807 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
808 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
809 entry instanceof MapEntryNode);
810 MapEntryNode mapEntry = (MapEntryNode)entry;
811 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
812 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
813 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
814 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
816 verifyLastLogIndex(shard, 2);
818 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
822 private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
823 for(int i = 0; i < 20 * 5; i++) {
824 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
825 if(lastLogIndex == expectedValue) {
828 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
831 assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
835 public void testCommitWithPersistenceDisabled() throws Throwable {
836 dataStoreContextBuilder.persistent(false);
837 new ShardTestKit(getSystem()) {{
838 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
839 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
840 "testCommitPhaseFailure");
842 waitUntilLeader(shard);
844 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
846 // Setup a simulated transactions with a mock cohort.
848 String transactionID = "tx";
849 MutableCompositeModification modification = new MutableCompositeModification();
850 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
851 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
852 TestModel.TEST_PATH, containerNode, modification);
854 FiniteDuration duration = duration("5 seconds");
856 // Simulate the ForwardedReadyTransaction messages that would be sent
857 // by the ShardTransaction.
859 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
860 cohort, modification, true), getRef());
861 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
863 // Send the CanCommitTransaction message.
865 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
866 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
867 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
868 assertEquals("Can commit", true, canCommitReply.getCanCommit());
870 // Send the CanCommitTransaction message.
872 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
873 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
875 InOrder inOrder = inOrder(cohort);
876 inOrder.verify(cohort).canCommit();
877 inOrder.verify(cohort).preCommit();
878 inOrder.verify(cohort).commit();
880 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
881 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
883 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
888 public void testCommitPhaseFailure() throws Throwable {
889 new ShardTestKit(getSystem()) {{
890 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
891 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
892 "testCommitPhaseFailure");
894 waitUntilLeader(shard);
896 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
899 String transactionID1 = "tx1";
900 MutableCompositeModification modification1 = new MutableCompositeModification();
901 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
902 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
903 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
904 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
906 String transactionID2 = "tx2";
907 MutableCompositeModification modification2 = new MutableCompositeModification();
908 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
909 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
911 FiniteDuration duration = duration("5 seconds");
912 final Timeout timeout = new Timeout(duration);
914 // Simulate the ForwardedReadyTransaction messages that would be sent
915 // by the ShardTransaction.
917 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
918 cohort1, modification1, true), getRef());
919 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
921 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
922 cohort2, modification2, true), getRef());
923 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
925 // Send the CanCommitTransaction message for the first Tx.
927 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
928 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
929 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
930 assertEquals("Can commit", true, canCommitReply.getCanCommit());
932 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
933 // processed after the first Tx completes.
935 Future<Object> canCommitFuture = Patterns.ask(shard,
936 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
938 // Send the CommitTransaction message for the first Tx. This should send back an error
939 // and trigger the 2nd Tx to proceed.
941 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
942 expectMsgClass(duration, akka.actor.Status.Failure.class);
944 // Wait for the 2nd Tx to complete the canCommit phase.
946 final CountDownLatch latch = new CountDownLatch(1);
947 canCommitFuture.onComplete(new OnComplete<Object>() {
949 public void onComplete(final Throwable t, final Object resp) {
952 }, getSystem().dispatcher());
954 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
956 InOrder inOrder = inOrder(cohort1, cohort2);
957 inOrder.verify(cohort1).canCommit();
958 inOrder.verify(cohort1).preCommit();
959 inOrder.verify(cohort1).commit();
960 inOrder.verify(cohort2).canCommit();
962 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
967 public void testPreCommitPhaseFailure() throws Throwable {
968 new ShardTestKit(getSystem()) {{
969 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
970 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
971 "testPreCommitPhaseFailure");
973 waitUntilLeader(shard);
975 String transactionID = "tx1";
976 MutableCompositeModification modification = new MutableCompositeModification();
977 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
978 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
979 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
981 FiniteDuration duration = duration("5 seconds");
983 // Simulate the ForwardedReadyTransaction messages that would be sent
984 // by the ShardTransaction.
986 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
987 cohort, modification, true), getRef());
988 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
990 // Send the CanCommitTransaction message.
992 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
993 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
994 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
995 assertEquals("Can commit", true, canCommitReply.getCanCommit());
997 // Send the CommitTransaction message. This should send back an error
998 // for preCommit failure.
1000 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1001 expectMsgClass(duration, akka.actor.Status.Failure.class);
1003 InOrder inOrder = inOrder(cohort);
1004 inOrder.verify(cohort).canCommit();
1005 inOrder.verify(cohort).preCommit();
1007 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1012 public void testCanCommitPhaseFailure() throws Throwable {
1013 new ShardTestKit(getSystem()) {{
1014 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1015 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1016 "testCanCommitPhaseFailure");
1018 waitUntilLeader(shard);
1020 final FiniteDuration duration = duration("5 seconds");
1022 String transactionID = "tx1";
1023 MutableCompositeModification modification = new MutableCompositeModification();
1024 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1025 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1027 // Simulate the ForwardedReadyTransaction messages that would be sent
1028 // by the ShardTransaction.
1030 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1031 cohort, modification, true), getRef());
1032 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1034 // Send the CanCommitTransaction message.
1036 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1037 expectMsgClass(duration, akka.actor.Status.Failure.class);
1039 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1044 public void testAbortBeforeFinishCommit() throws Throwable {
1045 new ShardTestKit(getSystem()) {{
1046 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1047 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1048 "testAbortBeforeFinishCommit");
1050 waitUntilLeader(shard);
1052 final FiniteDuration duration = duration("5 seconds");
1053 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1055 final String transactionID = "tx1";
1056 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1057 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1059 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1060 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1062 // Simulate an AbortTransaction message occurring during replication, after
1063 // persisting and before finishing the commit to the in-memory store.
1064 // We have no followers so due to optimizations in the RaftActor, it does not
1065 // attempt replication and thus we can't send an AbortTransaction message b/c
1066 // it would be processed too late after CommitTransaction completes. So we'll
1067 // simulate an AbortTransaction message occurring during replication by calling
1068 // the shard directly.
1070 shard.underlyingActor().doAbortTransaction(transactionID, null);
1072 return preCommitFuture;
1076 MutableCompositeModification modification = new MutableCompositeModification();
1077 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1078 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1079 modification, preCommit);
1081 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1082 cohort, modification, true), getRef());
1083 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1085 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1086 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1087 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1088 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1090 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1091 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1093 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1095 // Since we're simulating an abort occurring during replication and before finish commit,
1096 // the data should still get written to the in-memory store since we've gotten past
1097 // canCommit and preCommit and persisted the data.
1098 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1100 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1105 public void testTransactionCommitTimeout() throws Throwable {
1106 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1108 new ShardTestKit(getSystem()) {{
1109 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1110 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1111 "testTransactionCommitTimeout");
1113 waitUntilLeader(shard);
1115 final FiniteDuration duration = duration("5 seconds");
1117 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1119 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1120 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1121 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1123 // Create 1st Tx - will timeout
1125 String transactionID1 = "tx1";
1126 MutableCompositeModification modification1 = new MutableCompositeModification();
1127 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1128 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1129 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1130 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
1135 String transactionID2 = "tx3";
1136 MutableCompositeModification modification2 = new MutableCompositeModification();
1137 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1138 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1139 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1141 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1146 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1147 cohort1, modification1, true), getRef());
1148 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1150 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1151 cohort2, modification2, true), getRef());
1152 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1154 // canCommit 1st Tx. We don't send the commit so it should timeout.
1156 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1157 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1159 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1161 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1162 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1164 // Commit the 2nd Tx.
1166 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1167 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1169 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1170 assertNotNull(listNodePath + " not found", node);
1172 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1177 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1178 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1180 new ShardTestKit(getSystem()) {{
1181 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1182 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1183 "testTransactionCommitQueueCapacityExceeded");
1185 waitUntilLeader(shard);
1187 final FiniteDuration duration = duration("5 seconds");
1189 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1191 String transactionID1 = "tx1";
1192 MutableCompositeModification modification1 = new MutableCompositeModification();
1193 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1194 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1196 String transactionID2 = "tx2";
1197 MutableCompositeModification modification2 = new MutableCompositeModification();
1198 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1199 TestModel.OUTER_LIST_PATH,
1200 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1203 String transactionID3 = "tx3";
1204 MutableCompositeModification modification3 = new MutableCompositeModification();
1205 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1206 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
1210 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1211 cohort1, modification1, true), getRef());
1212 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1214 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1215 cohort2, modification2, true), getRef());
1216 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1218 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1219 cohort3, modification3, true), getRef());
1220 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1222 // canCommit 1st Tx.
1224 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1225 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1227 // canCommit the 2nd Tx - it should get queued.
1229 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1231 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1233 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1234 expectMsgClass(duration, akka.actor.Status.Failure.class);
1236 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1241 public void testCanCommitBeforeReadyFailure() throws Throwable {
1242 new ShardTestKit(getSystem()) {{
1243 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1244 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1245 "testCanCommitBeforeReadyFailure");
1247 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1248 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1250 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1255 public void testAbortTransaction() throws Throwable {
1256 new ShardTestKit(getSystem()) {{
1257 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1258 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1259 "testAbortTransaction");
1261 waitUntilLeader(shard);
1263 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1265 String transactionID1 = "tx1";
1266 MutableCompositeModification modification1 = new MutableCompositeModification();
1267 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1268 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1269 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1271 String transactionID2 = "tx2";
1272 MutableCompositeModification modification2 = new MutableCompositeModification();
1273 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1274 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1276 FiniteDuration duration = duration("5 seconds");
1277 final Timeout timeout = new Timeout(duration);
1279 // Simulate the ForwardedReadyTransaction messages that would be sent
1280 // by the ShardTransaction.
1282 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1283 cohort1, modification1, true), getRef());
1284 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1286 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1287 cohort2, modification2, true), getRef());
1288 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1290 // Send the CanCommitTransaction message for the first Tx.
1292 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1293 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1294 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1295 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1297 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1298 // processed after the first Tx completes.
1300 Future<Object> canCommitFuture = Patterns.ask(shard,
1301 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1303 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1306 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1307 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1309 // Wait for the 2nd Tx to complete the canCommit phase.
1311 Await.ready(canCommitFuture, duration);
1313 InOrder inOrder = inOrder(cohort1, cohort2);
1314 inOrder.verify(cohort1).canCommit();
1315 inOrder.verify(cohort2).canCommit();
1317 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1322 public void testCreateSnapshot() throws IOException, InterruptedException {
1323 testCreateSnapshot(true, "testCreateSnapshot");
1327 public void testCreateSnapshotWithNonPersistentData() throws IOException, InterruptedException {
1328 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
1331 @SuppressWarnings("serial")
1332 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws IOException, InterruptedException {
1333 final DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
1334 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(persistent).build();
1336 new ShardTestKit(getSystem()) {{
1337 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1338 Creator<Shard> creator = new Creator<Shard>() {
1340 public Shard create() throws Exception {
1341 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1342 newDatastoreContext(), SCHEMA_CONTEXT) {
1344 protected void commitSnapshot(final long sequenceNumber) {
1345 super.commitSnapshot(sequenceNumber);
1346 latch.get().countDown();
1352 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1353 Props.create(new DelegatingShardCreator(creator)), shardActorName);
1355 waitUntilLeader(shard);
1357 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1359 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1361 latch.set(new CountDownLatch(1));
1362 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1364 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1366 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
1371 * This test simply verifies that the applySnapShot logic will work
1372 * @throws ReadFailedException
1375 public void testInMemoryDataStoreRestore() throws ReadFailedException {
1376 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1378 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
1380 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1381 putTransaction.write(TestModel.TEST_PATH,
1382 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1383 commitTransaction(putTransaction);
1386 NormalizedNode<?, ?> expected = readStore(store);
1388 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
1390 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1391 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1393 commitTransaction(writeTransaction);
1395 NormalizedNode<?, ?> actual = readStore(store);
1397 assertEquals(expected, actual);
1401 public void testRecoveryApplicable(){
1403 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1404 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1406 final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1407 persistentContext, SCHEMA_CONTEXT);
1409 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1410 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1412 final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1413 nonPersistentContext, SCHEMA_CONTEXT);
1415 new ShardTestKit(getSystem()) {{
1416 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1417 persistentProps, "testPersistence1");
1419 assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1421 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1423 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1424 nonPersistentProps, "testPersistence2");
1426 assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1428 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
1435 private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1436 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1437 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1438 transaction.read(YangInstanceIdentifier.builder().build());
1440 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1442 NormalizedNode<?, ?> normalizedNode = optional.get();
1444 transaction.close();
1446 return normalizedNode;
1449 private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1450 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1451 ListenableFuture<Void> future =
1452 commitCohort.preCommit();
1455 future = commitCohort.commit();
1457 } catch (InterruptedException | ExecutionException e) {
1461 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1462 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1464 public void onDataChanged(
1465 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1471 static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1472 throws ExecutionException, InterruptedException {
1473 DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1475 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1476 transaction.read(id);
1478 Optional<NormalizedNode<?, ?>> optional = future.get();
1479 NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1481 transaction.close();
1486 private void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id, final NormalizedNode<?,?> node)
1487 throws ExecutionException, InterruptedException {
1488 DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1490 transaction.write(id, node);
1492 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1493 commitCohort.preCommit().get();
1494 commitCohort.commit().get();
1497 @SuppressWarnings("serial")
1498 private static final class DelegatingShardCreator implements Creator<Shard> {
1499 private final Creator<Shard> delegate;
1501 DelegatingShardCreator(final Creator<Shard> delegate) {
1502 this.delegate = delegate;
1506 public Shard create() throws Exception {
1507 return delegate.create();