1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doAnswer;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.inOrder;
12 import static org.mockito.Mockito.mock;
13 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
14 import akka.actor.ActorRef;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.japi.Creator;
20 import akka.japi.Procedure;
21 import akka.pattern.Patterns;
22 import akka.persistence.SnapshotSelectionCriteria;
23 import akka.testkit.TestActorRef;
24 import akka.util.Timeout;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.io.IOException;
33 import java.util.Collections;
34 import java.util.HashSet;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.concurrent.atomic.AtomicReference;
42 import org.junit.After;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.mockito.InOrder;
46 import org.mockito.invocation.InvocationOnMock;
47 import org.mockito.stubbing.Answer;
48 import org.opendaylight.controller.cluster.DataPersistenceProvider;
49 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
50 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
51 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
52 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
53 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
54 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
56 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
58 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
60 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
62 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
63 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
64 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
65 import org.opendaylight.controller.cluster.datastore.modification.Modification;
66 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
67 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
68 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
69 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
70 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
71 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
72 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
73 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
74 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
75 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
76 import org.opendaylight.controller.cluster.raft.Snapshot;
77 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
78 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
79 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
80 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
81 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
82 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
83 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
84 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
85 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
86 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
87 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
88 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
89 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
90 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
91 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
92 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
93 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
94 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
95 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
96 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
97 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
98 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
99 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
100 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
101 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
102 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
103 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
104 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
105 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
106 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
107 import scala.concurrent.Await;
108 import scala.concurrent.Future;
109 import scala.concurrent.duration.FiniteDuration;
111 public class ShardTest extends AbstractActorTest {
// Schema context passed to every shard created by this class.
113 private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
// Incremented once per ShardTest instance; its value is appended to the shard type below.
115 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
// Identifier of the shard under test: member "member-1", shard "inventory", type "config<N>".
117 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName("member-1")
118 .shardName("inventory").type("config" + NEXT_SHARD_NUM.getAndIncrement()).build();
// Datastore context settings built by newDatastoreContext(): journal recovery batch size 3,
// snapshot batch count 5000, heartbeat interval 100 ms.
120 private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().
121 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
122 shardHeartbeatIntervalInMillis(100);
/**
 * Test fixture set-up: calls {@code clear()} on the in-memory snapshot store and the
 * in-memory journal so a test does not start with entries added by earlier code.
 */
125 public void setUp() {
127 InMemorySnapshotStore.clear();
128 InMemoryJournal.clear();
/**
 * Test fixture tear-down: calls {@code clear()} on the in-memory snapshot store and the
 * in-memory journal, mirroring {@code setUp()}.
 */
132 public void tearDown() {
133 InMemorySnapshotStore.clear();
134 InMemoryJournal.clear();
/**
 * Builds a DatastoreContext from the current state of {@code dataStoreContextBuilder}.
 */
137 private DatastoreContext newDatastoreContext() {
138 return dataStoreContextBuilder.build();
/**
 * Creates the Props for a Shard identified by {@code shardID} with an empty peer-address map,
 * a freshly built datastore context and the shared {@code SCHEMA_CONTEXT}.
 */
141 private Props newShardProps() {
142 return Shard.props(shardID, Collections.<ShardIdentifier,String>emptyMap(),
143 newDatastoreContext(), SCHEMA_CONTEXT);
/**
 * Sends a RegisterChangeListener message to a shard after waitUntilLeader() returns, checks the
 * registration path in the reply, then writes data under {@code TestModel.TEST_PATH} and waits
 * for the listener to receive a change event for that path.
 */
147 public void testRegisterChangeListener() throws Exception {
148 new ShardTestKit(getSystem()) {{
149 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
150 newShardProps(), "testRegisterChangeListener");
152 waitUntilLeader(shard);
154 shard.tell(new UpdateSchemaContext(SchemaContextHelper.full()), ActorRef.noSender());
// The listener is constructed with an argument of 1 and wrapped in a DataChangeListener actor.
156 MockDataChangeListener listener = new MockDataChangeListener(1);
157 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
158 "testRegisterChangeListener-DataChangeListener");
// Register the listener actor for TEST_PATH with BASE scope; the reply goes to this test kit.
160 shard.tell(new RegisterChangeListener(TestModel.TEST_PATH,
161 dclActor.path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
163 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
164 RegisterChangeListenerReply.class);
// The registration path must be a "$"-prefixed child of the shard actor.
165 String replyPath = reply.getListenerRegistrationPath().toString();
166 assertTrue("Incorrect reply path: " + replyPath, replyPath.matches(
167 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// Write to the shard's store and wait for the resulting change notification.
169 YangInstanceIdentifier path = TestModel.TEST_PATH;
170 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
172 listener.waitForChangeEvents(path);
// Stop both actors created by this test.
174 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
175 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Registers a change listener while the shard has not yet become leader and verifies that the
 * listener is still notified of data already in the store once the election proceeds. Two
 * latches coordinate the test thread with the shard's handling of its first ElectionTimeout.
 */
179 @SuppressWarnings("serial")
181 public void testChangeListenerNotifiedWhenNotTheLeaderOnRegistration() throws Exception {
182 // This test tests the timing window in which a change listener is registered before the
183 // shard becomes the leader. We verify that the listener is registered and notified of the
184 // existing data when the shard becomes the leader.
185 new ShardTestKit(getSystem()) {{
186 // For this test, we want to send the RegisterChangeListener message after the shard
187 // has recovered from persistence and before it becomes the leader. So we subclass
188 // Shard to override onReceiveCommand and, when the first ElectionTimeout is received,
189 // we know that the shard has been initialized to a follower and has started the
190 // election process. The following 2 CountDownLatches are used to coordinate the
191 // ElectionTimeout with the sending of the RegisterChangeListener message.
192 final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
193 final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
194 Creator<Shard> creator = new Creator<Shard>() {
// Flipped to false once the first ElectionTimeout has been intercepted.
195 boolean firstElectionTimeout = true;
198 public Shard create() throws Exception {
199 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
200 newDatastoreContext(), SCHEMA_CONTEXT) {
202 public void onReceiveCommand(final Object message) throws Exception {
203 if(message instanceof ElectionTimeout && firstElectionTimeout) {
204 // Got the first ElectionTimeout. We don't forward it to the
205 // base Shard yet until we've sent the RegisterChangeListener
206 // message. So we signal the onFirstElectionTimeout latch to tell
207 // the main thread to send the RegisterChangeListener message and
208 // start a thread to wait on the onChangeListenerRegistered latch,
209 // which the main thread signals after it has sent the message.
210 // After the onChangeListenerRegistered is triggered, we send the
211 // original ElectionTimeout message to proceed with the election.
212 firstElectionTimeout = false;
213 final ActorRef self = getSelf();
// Wait at most 5 seconds for the test thread, then re-deliver the intercepted message to the shard.
217 Uninterruptibles.awaitUninterruptibly(
218 onChangeListenerRegistered, 5, TimeUnit.SECONDS);
219 self.tell(message, self);
223 onFirstElectionTimeout.countDown();
// Every other message is handled by the base Shard.
225 super.onReceiveCommand(message);
232 MockDataChangeListener listener = new MockDataChangeListener(1);
233 ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
234 "testRegisterChangeListenerWhenNotLeaderInitially-DataChangeListener");
236 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
237 Props.create(new DelegatingShardCreator(creator)),
238 "testRegisterChangeListenerWhenNotLeaderInitially");
240 // Write initial data into the in-memory store.
241 YangInstanceIdentifier path = TestModel.TEST_PATH;
242 writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
244 // Wait until the shard receives the first ElectionTimeout message.
245 assertEquals("Got first ElectionTimeout", true,
246 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
248 // Now send the RegisterChangeListener and wait for the reply.
249 shard.tell(new RegisterChangeListener(path, dclActor.path(),
250 AsyncDataBroker.DataChangeScope.SUBTREE), getRef());
252 RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
253 RegisterChangeListenerReply.class);
254 assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
256 // Sanity check - verify the shard is not the leader yet.
257 shard.tell(new FindLeader(), getRef());
258 FindLeaderReply findLeadeReply =
259 expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
260 assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
262 // Signal the onChangeListenerRegistered latch to tell the thread above to proceed
263 // with the election process.
264 onChangeListenerRegistered.countDown();
266 // Wait for the shard to become the leader and notify our listener with the existing
267 // data in the store.
268 listener.waitForChangeEvents(path);
270 dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
271 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Sends a read-only CreateTransaction for "txn-1" to the shard and asserts that the transaction
 * actor path in the reply contains the expected child path of the shard actor.
 */
276 public void testCreateTransaction(){
277 new ShardTestKit(getSystem()) {{
278 ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransaction");
280 waitUntilLeader(shard);
282 shard.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
// The transaction type is passed as the enum ordinal and the message is sent in its serializable form.
284 shard.tell(new CreateTransaction("txn-1",
285 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
287 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
288 CreateTransactionReply.class);
290 String path = reply.getTransactionActorPath().toString();
291 assertTrue("Unexpected transaction path " + path,
292 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
294 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Same check as testCreateTransaction, but the CreateTransaction message additionally carries
 * the string "foobar" as its third constructor argument.
 */
299 public void testCreateTransactionOnChain(){
300 new ShardTestKit(getSystem()) {{
301 final ActorRef shard = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
303 waitUntilLeader(shard);
305 shard.tell(new CreateTransaction("txn-1",
306 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
309 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
310 CreateTransactionReply.class);
// The transaction actor is expected to be a child of the shard actor.
312 String path = reply.getTransactionActorPath().toString();
313 assertTrue("Unexpected transaction path " + path,
314 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
316 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Delivers a PeerAddressResolved message directly to the shard and asserts that the peer-address
 * map of the raft actor context then holds the new address under {@code shardID.toString()}.
 */
320 @SuppressWarnings("serial")
322 public void testPeerAddressResolved() throws Exception {
323 new ShardTestKit(getSystem()) {{
324 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Shard subclass that exposes the peer addresses and signals the latch when recovery completes.
325 class TestShard extends Shard {
// The peer map starts with a single entry for shardID whose address is null.
327 super(shardID, Collections.<ShardIdentifier, String>singletonMap(shardID, null),
328 newDatastoreContext(), SCHEMA_CONTEXT);
331 Map<String, String> getPeerAddresses() {
332 return getRaftActorContext().getPeerAddresses();
336 protected void onRecoveryComplete() {
338 super.onRecoveryComplete();
340 recoveryComplete.countDown();
345 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
346 Props.create(new DelegatingShardCreator(new Creator<Shard>() {
348 public TestShard create() throws Exception {
349 return new TestShard();
351 })), "testPeerAddressResolved");
353 //waitUntilLeader(shard);
// Wait up to 5 seconds for onRecoveryComplete() before sending the message.
354 assertEquals("Recovery complete", true,
355 Uninterruptibles.awaitUninterruptibly(recoveryComplete, 5, TimeUnit.SECONDS));
357 String address = "akka://foobar";
// Invoke the handler on the underlying actor directly instead of going through the mailbox.
358 shard.underlyingActor().onReceiveCommand(new PeerAddressResolved(shardID, address));
360 assertEquals("getPeerAddresses", address,
361 ((TestShard)shard.underlyingActor()).getPeerAddresses().get(shardID.toString()));
363 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Applies a snapshot whose state was produced by SerializationUtils.serializeNormalizedNode and
 * asserts that the shard's root node afterwards equals the node that was serialized.
 */
368 public void testApplySnapshot() throws Exception {
369 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
370 "testApplySnapshot");
// Build the expected root node in a separate in-memory store.
372 InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
373 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
375 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
377 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
378 NormalizedNode<?,?> expected = readStore(store, root);
// Wrap the serialized root in a Snapshot with an empty list of unapplied entries.
380 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
381 SerializationUtils.serializeNormalizedNode(expected),
382 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
// Invoke the handler on the underlying actor directly.
384 shard.underlyingActor().onReceiveCommand(applySnapshot);
386 NormalizedNode<?,?> actual = readStore(shard, root);
388 assertEquals("Root node", expected, actual);
390 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Applies a snapshot whose state bytes come from NormalizedNodeToNodeCodec.encode (rather than
 * SerializationUtils) and asserts that the shard's root node afterwards equals the encoded node.
 */
394 public void testApplyHelium2VersionSnapshot() throws Exception {
// Actor names must be unique within the actor system, so the shard is created under this
// test's own name instead of sharing one with another test whose actor may still be stopping.
395 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
396 "testApplyHelium2VersionSnapshot");
398 NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
// Build the expected root node in a separate in-memory store.
400 InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
401 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
403 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
405 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
406 NormalizedNode<?,?> expected = readStore(store, root);
408 NormalizedNodeMessages.Container encode = codec.encode(expected);
// The snapshot state is the byte form of the encoded node message.
410 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
411 encode.getNormalizedNode().toByteString().toByteArray(),
412 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
// Invoke the handler on the underlying actor directly.
414 shard.underlyingActor().onReceiveCommand(applySnapshot);
416 NormalizedNode<?,?> actual = readStore(shard, root);
418 assertEquals("Root node", expected, actual);
420 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Hands the shard an ApplyState message carrying a ModificationPayload with one
 * WriteModification and asserts that the written node can be read back from the shard.
 */
424 public void testApplyState() throws Exception {
426 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
428 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// No client actor (null), identifier "test", and a log entry built with the arguments (1, 2, payload).
430 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
431 newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
433 shard.underlyingActor().onReceiveCommand(applyState);
435 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
436 assertEquals("Applied state", node, actual);
438 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Same as testApplyState, but the log entry carries a CompositeModificationByteStringPayload
 * built by newLegacyByteStringPayload.
 */
442 public void testApplyStateLegacy() throws Exception {
444 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
446 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
448 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
449 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
// Invoke the handler on the underlying actor directly, then read the node back.
451 shard.underlyingActor().onReceiveCommand(applyState);
453 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
454 assertEquals("Applied state", node, actual);
456 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Seeds the in-memory snapshot store and journal for {@code shardID} and then runs the shared
 * recovery verification in testRecovery(Set). The journal holds one legacy payload entry at
 * index 0, ModificationPayload entries at indexes 1..nListEntries and a final
 * ApplyJournalEntries message.
 */
460 public void testRecovery() throws Exception {
462 // Set up the InMemorySnapshotStore.
464 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
465 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
467 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
469 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
471 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
472 SerializationUtils.serializeNormalizedNode(root),
473 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
475 // Set up the InMemoryJournal.
// Entry 0 writes an empty outer list using the legacy CompositeModificationPayload.
477 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
478 new WriteModification(TestModel.OUTER_LIST_PATH,
479 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
481 int nListEntries = 16;
// Keys of the list entries that recovery is expected to restore.
482 Set<Integer> listEntryKeys = new HashSet<>();
484 // Add some ModificationPayload entries
485 for(int i = 1; i <= nListEntries; i++) {
486 listEntryKeys.add(Integer.valueOf(i));
487 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
488 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
489 Modification mod = new MergeModification(path,
490 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
491 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
492 newModificationPayload(mod)));
// The last journal record is an ApplyJournalEntries message for index nListEntries.
495 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
496 new ApplyJournalEntries(nListEntries));
498 testRecovery(listEntryKeys);
/**
 * Variant of testRecovery() that seeds persistence with older formats: the snapshot bytes come
 * from NormalizedNodeToNodeCodec.encode, the journal entries use CompositeModificationPayload
 * and CompositeModificationByteStringPayload, and the final record is an ApplyLogEntries message.
 */
502 public void testHelium2VersionRecovery() throws Exception {
504 // Set up the InMemorySnapshotStore.
506 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
507 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
509 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
511 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
513 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
514 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
515 getNormalizedNode().toByteString().toByteArray(),
516 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
518 // Set up the InMemoryJournal.
// Entry 0 writes an empty outer list using the legacy CompositeModificationPayload.
520 InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
521 new WriteModification(TestModel.OUTER_LIST_PATH,
522 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
524 int nListEntries = 16;
// Keys of the list entries that recovery is expected to restore.
525 Set<Integer> listEntryKeys = new HashSet<>();
528 // Add some CompositeModificationPayload entries
530 listEntryKeys.add(Integer.valueOf(i));
531 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
532 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
533 Modification mod = new MergeModification(path,
534 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
535 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
536 newLegacyPayload(mod)));
539 // Add some CompositeModificationByteStringPayload entries
// Continues from the current value of i up to nListEntries.
540 for(; i <= nListEntries; i++) {
541 listEntryKeys.add(Integer.valueOf(i));
542 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
543 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
544 Modification mod = new MergeModification(path,
545 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
546 InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
547 newLegacyByteStringPayload(mod)));
// The last journal record is an ApplyLogEntries message for index nListEntries.
550 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
552 testRecovery(listEntryKeys);
/**
 * Creates a shard, waits for its onRecoveryComplete() callback and verifies the recovered state:
 * every entry of the outer list must carry an id leaf whose value is one of the expected keys,
 * no expected key may be left over, and the shard MBean's last log index, commit index and
 * last applied index must all equal the number of expected keys.
 *
 * @param listEntryKeys expected list-entry keys; matched keys are removed from this set, so the
 *        caller's set is modified
 */
555 private void testRecovery(Set<Integer> listEntryKeys) throws Exception {
556 // Create the actor and wait for recovery complete.
// Captured up front because the set shrinks during verification below.
558 int nListEntries = listEntryKeys.size();
560 final CountDownLatch recoveryComplete = new CountDownLatch(1);
562 @SuppressWarnings("serial")
563 Creator<Shard> creator = new Creator<Shard>() {
565 public Shard create() throws Exception {
566 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
567 newDatastoreContext(), SCHEMA_CONTEXT) {
569 protected void onRecoveryComplete() {
571 super.onRecoveryComplete();
573 recoveryComplete.countDown();
580 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
581 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
583 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
585 // Verify data in the data store.
587 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
588 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
589 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
590 outerList.getValue() instanceof Iterable);
591 for(Object entry: (Iterable<?>) outerList.getValue()) {
592 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
593 entry instanceof MapEntryNode);
594 MapEntryNode mapEntry = (MapEntryNode)entry;
595 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
596 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
597 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
598 Object value = idLeaf.get().getValue();
// remove() returns false for an unexpected or repeated key, which fails the assertion.
599 assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
600 listEntryKeys.remove(value));
// Any key still in the set was expected but not found in the recovered list.
603 if(!listEntryKeys.isEmpty()) {
604 fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
608 assertEquals("Last log index", nListEntries,
609 shard.underlyingActor().getShardMBean().getLastLogIndex());
610 assertEquals("Commit index", nListEntries,
611 shard.underlyingActor().getShardMBean().getCommitIndex());
612 assertEquals("Last applied", nListEntries,
613 shard.underlyingActor().getShardMBean().getLastApplied());
615 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
/**
 * Collects the given modifications into a MutableCompositeModification and wraps its
 * serializable form in a CompositeModificationPayload.
 *
 * @param mods modifications to include, in the order given
 */
618 private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
619 MutableCompositeModification compMod = new MutableCompositeModification();
620 for(Modification mod: mods) {
621 compMod.addModification(mod);
624 return new CompositeModificationPayload(compMod.toSerializable());
/**
 * Collects the given modifications into a MutableCompositeModification and wraps its
 * serializable form in a CompositeModificationByteStringPayload.
 *
 * @param mods modifications to include, in the order given
 */
627 private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
628 MutableCompositeModification compMod = new MutableCompositeModification();
629 for(Modification mod: mods) {
630 compMod.addModification(mod);
633 return new CompositeModificationByteStringPayload(compMod.toSerializable());
/**
 * Collects the given modifications into a MutableCompositeModification and wraps the composite
 * itself (not its serializable form) in a ModificationPayload.
 *
 * @param mods modifications to include, in the order given
 * @throws IOException declared by this method; presumably raised by the ModificationPayload
 *         constructor - TODO confirm
 */
636 private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
637 MutableCompositeModification compMod = new MutableCompositeModification();
638 for(Modification mod: mods) {
639 compMod.addModification(mod);
642 return new ModificationPayload(compMod);
/**
 * Convenience overload that delegates to the six-argument setupMockWriteTransaction with a
 * {@code null} preCommit function.
 */
645 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
646 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
647 final MutableCompositeModification modification) {
648 return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
/**
 * Prepares a write-only transaction on {@code dataStore} that writes {@code data} at
 * {@code path}, readies it to obtain a real cohort, and stubs a Mockito mock cohort (named
 * {@code cohortName}) whose canCommit, preCommit, commit and abort calls forward to the real
 * cohort. The same write is also recorded in {@code modification}.
 *
 * @param preCommit when non-null, invoked with the real cohort in place of the real cohort's
 *        own preCommit()
 */
651 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
652 final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
653 final MutableCompositeModification modification,
654 final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
656 DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
657 tx.write(path, data);
658 final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
659 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
// canCommit forwards to the real cohort.
661 doAnswer(new Answer<ListenableFuture<Boolean>>() {
663 public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
664 return realCohort.canCommit();
666 }).when(cohort).canCommit();
// preCommit uses the caller-supplied function when one was given, otherwise the real cohort.
668 doAnswer(new Answer<ListenableFuture<Void>>() {
670 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
671 if(preCommit != null) {
672 return preCommit.apply(realCohort);
674 return realCohort.preCommit();
677 }).when(cohort).preCommit();
// commit forwards to the real cohort.
679 doAnswer(new Answer<ListenableFuture<Void>>() {
681 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
682 return realCohort.commit();
684 }).when(cohort).commit();
// abort forwards to the real cohort.
686 doAnswer(new Answer<ListenableFuture<Void>>() {
688 public ListenableFuture<Void> answer(final InvocationOnMock invocation) throws Throwable {
689 return realCohort.abort();
691 }).when(cohort).abort();
// Record the same write in the caller's composite modification.
693 modification.addModification(new WriteModification(path, data));
698 @SuppressWarnings({ "unchecked" })
700 public void testConcurrentThreePhaseCommits() throws Throwable {
701 new ShardTestKit(getSystem()) {{
702 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
703 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
704 "testConcurrentThreePhaseCommits");
706 waitUntilLeader(shard);
708 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
710 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
712 String transactionID1 = "tx1";
713 MutableCompositeModification modification1 = new MutableCompositeModification();
714 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
715 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
717 String transactionID2 = "tx2";
718 MutableCompositeModification modification2 = new MutableCompositeModification();
719 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
720 TestModel.OUTER_LIST_PATH,
721 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
724 String transactionID3 = "tx3";
725 MutableCompositeModification modification3 = new MutableCompositeModification();
726 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
727 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
728 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
729 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
733 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
734 final Timeout timeout = new Timeout(duration);
736 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
737 // by the ShardTransaction.
739 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
740 cohort1, modification1, true), getRef());
741 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
742 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
743 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
745 // Send the CanCommitTransaction message for the first Tx.
747 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
748 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
749 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
750 assertEquals("Can commit", true, canCommitReply.getCanCommit());
752 // Send the ForwardedReadyTransaction for the next 2 Tx's.
754 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
755 cohort2, modification2, true), getRef());
756 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
758 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
759 cohort3, modification3, true), getRef());
760 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
762 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
763 // processed after the first Tx completes.
765 Future<Object> canCommitFuture1 = Patterns.ask(shard,
766 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
768 Future<Object> canCommitFuture2 = Patterns.ask(shard,
769 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
771 // Send the CommitTransaction message for the first Tx. After it completes, it should
772 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
774 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
775 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
777 // Wait for the next 2 Tx's to complete.
779 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
780 final CountDownLatch commitLatch = new CountDownLatch(2);
782 class OnFutureComplete extends OnComplete<Object> {
783 private final Class<?> expRespType;
785 OnFutureComplete(final Class<?> expRespType) {
786 this.expRespType = expRespType;
790 public void onComplete(final Throwable error, final Object resp) {
792 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
795 assertEquals("Commit response type", expRespType, resp.getClass());
797 } catch (Exception e) {
803 void onSuccess(final Object resp) throws Exception {
807 class OnCommitFutureComplete extends OnFutureComplete {
808 OnCommitFutureComplete() {
809 super(CommitTransactionReply.SERIALIZABLE_CLASS);
813 public void onComplete(final Throwable error, final Object resp) {
814 super.onComplete(error, resp);
815 commitLatch.countDown();
819 class OnCanCommitFutureComplete extends OnFutureComplete {
820 private final String transactionID;
822 OnCanCommitFutureComplete(final String transactionID) {
823 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
824 this.transactionID = transactionID;
828 void onSuccess(final Object resp) throws Exception {
829 CanCommitTransactionReply canCommitReply =
830 CanCommitTransactionReply.fromSerializable(resp);
831 assertEquals("Can commit", true, canCommitReply.getCanCommit());
833 Future<Object> commitFuture = Patterns.ask(shard,
834 new CommitTransaction(transactionID).toSerializable(), timeout);
835 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
839 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
840 getSystem().dispatcher());
842 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
843 getSystem().dispatcher());
845 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
847 if(caughtEx.get() != null) {
848 throw caughtEx.get();
851 assertEquals("Commits complete", true, done);
853 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
854 inOrder.verify(cohort1).canCommit();
855 inOrder.verify(cohort1).preCommit();
856 inOrder.verify(cohort1).commit();
857 inOrder.verify(cohort2).canCommit();
858 inOrder.verify(cohort2).preCommit();
859 inOrder.verify(cohort2).commit();
860 inOrder.verify(cohort3).canCommit();
861 inOrder.verify(cohort3).preCommit();
862 inOrder.verify(cohort3).commit();
864 // Verify data in the data store.
866 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
867 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
868 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
869 outerList.getValue() instanceof Iterable);
870 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
871 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
872 entry instanceof MapEntryNode);
873 MapEntryNode mapEntry = (MapEntryNode)entry;
874 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
875 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
876 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
877 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
879 verifyLastLogIndex(shard, 2);
881 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Polls the shard's MBean until its last log index equals expectedValue. The loop checks up to
// 20 * 5 = 100 times, sleeping 50 ms (uninterruptibly) between checks, and the final assertEquals
// re-reads the index so a mismatch fails with the "Last log index" message.
885 private void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
886 for(int i = 0; i < 20 * 5; i++) {
887 long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
888 if(lastLogIndex == expectedValue) {
// NOTE(review): the statement inside this branch is elided from this listing; presumably it
// stops polling once the expected index is observed - confirm.
891 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
894 assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
// Verifies that a single transaction runs through canCommit, preCommit and commit (in that order)
// and that its data is readable from the shard's in-memory store when the datastore context is
// built with persistent(false).
//
// The shard actor is named after this test. It was previously created as "testCommitPhaseFailure",
// the name used by the test of that name, whereas every other test here names its actor after
// itself; Akka requires sibling actor names to be unique.
898 public void testCommitWithPersistenceDisabled() throws Throwable {
899 dataStoreContextBuilder.persistent(false);
900 new ShardTestKit(getSystem()) {{
901 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
902 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
903 "testCommitWithPersistenceDisabled");
905 waitUntilLeader(shard);
907 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
909 // Set up a simulated transaction with a mock cohort.
911 String transactionID = "tx";
912 MutableCompositeModification modification = new MutableCompositeModification();
913 NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
914 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
915 TestModel.TEST_PATH, containerNode, modification);
917 FiniteDuration duration = duration("5 seconds");
919 // Simulate the ForwardedReadyTransaction messages that would be sent
920 // by the ShardTransaction.
922 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
923 cohort, modification, true), getRef());
924 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
926 // Send the CanCommitTransaction message.
928 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
929 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
930 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
931 assertEquals("Can commit", true, canCommitReply.getCanCommit());
933 // Send the CommitTransaction message.
935 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
936 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The cohort must have been driven through all three phases in order.
938 InOrder inOrder = inOrder(cohort);
939 inOrder.verify(cohort).canCommit();
940 inOrder.verify(cohort).preCommit();
941 inOrder.verify(cohort).commit();
// The committed container must be readable from the shard's data store.
943 NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
944 assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
946 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that when the first transaction's commit() fails, the shard replies with a
// Status.Failure and the queued canCommit of a second transaction is then processed.
951 public void testCommitPhaseFailure() throws Throwable {
952 new ShardTestKit(getSystem()) {{
953 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
954 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
955 "testCommitPhaseFailure");
957 waitUntilLeader(shard);
959 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
962 String transactionID1 = "tx1";
963 MutableCompositeModification modification1 = new MutableCompositeModification();
// cohort1: canCommit and preCommit succeed, commit fails with IllegalStateException("mock").
964 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
965 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
966 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
967 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
969 String transactionID2 = "tx2";
970 MutableCompositeModification modification2 = new MutableCompositeModification();
// cohort2: only canCommit is stubbed, since the test never drives it past that phase.
971 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
972 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
974 FiniteDuration duration = duration("5 seconds");
975 final Timeout timeout = new Timeout(duration);
977 // Simulate the ForwardedReadyTransaction messages that would be sent
978 // by the ShardTransaction.
980 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
981 cohort1, modification1, true), getRef());
982 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
984 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
985 cohort2, modification2, true), getRef());
986 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
988 // Send the CanCommitTransaction message for the first Tx.
990 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
991 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
992 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
993 assertEquals("Can commit", true, canCommitReply.getCanCommit());
995 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
996 // processed after the first Tx completes.
// Patterns.ask is used (rather than tell/expect) so the reply is captured in a future and does
// not interleave with the test kit's own expected messages.
998 Future<Object> canCommitFuture = Patterns.ask(shard,
999 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1001 // Send the CommitTransaction message for the first Tx. This should send back an error
1002 // and trigger the 2nd Tx to proceed.
1004 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
1005 expectMsgClass(duration, akka.actor.Status.Failure.class);
1007 // Wait for the 2nd Tx to complete the canCommit phase.
1009 final CountDownLatch latch = new CountDownLatch(1);
1010 canCommitFuture.onComplete(new OnComplete<Object>() {
1012 public void onComplete(final Throwable t, final Object resp) {
// NOTE(review): the callback body is elided from this listing; presumably it counts down
// the latch awaited below - confirm.
1015 }, getSystem().dispatcher());
1017 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// cohort2.canCommit must only be invoked after cohort1 has gone through all three phases.
1019 InOrder inOrder = inOrder(cohort1, cohort2);
1020 inOrder.verify(cohort1).canCommit();
1021 inOrder.verify(cohort1).preCommit();
1022 inOrder.verify(cohort1).commit();
1023 inOrder.verify(cohort2).canCommit();
1025 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a failure in the cohort's preCommit() is reported to the sender of the
// CommitTransaction message as a Status.Failure.
1030 public void testPreCommitPhaseFailure() throws Throwable {
1031 new ShardTestKit(getSystem()) {{
1032 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1033 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1034 "testPreCommitPhaseFailure");
1036 waitUntilLeader(shard);
1038 String transactionID = "tx1";
1039 MutableCompositeModification modification = new MutableCompositeModification();
// The mock cohort succeeds in canCommit and fails in preCommit with IllegalStateException("mock").
1040 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1041 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
1042 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
1044 FiniteDuration duration = duration("5 seconds");
1046 // Simulate the ForwardedReadyTransaction messages that would be sent
1047 // by the ShardTransaction.
1049 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1050 cohort, modification, true), getRef());
1051 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1053 // Send the CanCommitTransaction message.
1055 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1056 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1057 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1058 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1060 // Send the CommitTransaction message. This should send back an error
1061 // for preCommit failure.
1063 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1064 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Only canCommit followed by preCommit is verified; commit() is not part of the expected sequence.
1066 InOrder inOrder = inOrder(cohort);
1067 inOrder.verify(cohort).canCommit();
1068 inOrder.verify(cohort).preCommit();
1070 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a failure in the cohort's canCommit() is reported to the sender of the
// CanCommitTransaction message as a Status.Failure.
1075 public void testCanCommitPhaseFailure() throws Throwable {
1076 new ShardTestKit(getSystem()) {{
1077 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1078 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1079 "testCanCommitPhaseFailure");
1081 waitUntilLeader(shard);
1083 final FiniteDuration duration = duration("5 seconds");
1085 String transactionID = "tx1";
1086 MutableCompositeModification modification = new MutableCompositeModification();
// The mock cohort fails immediately in canCommit with IllegalStateException("mock").
1087 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1088 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
1090 // Simulate the ForwardedReadyTransaction messages that would be sent
1091 // by the ShardTransaction.
1093 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1094 cohort, modification, true), getRef());
1095 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1097 // Send the CanCommitTransaction message.
1099 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1100 expectMsgClass(duration, akka.actor.Status.Failure.class);
1102 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that an abort issued while a commit is in flight (after preCommit, before the commit
// finishes) does not prevent the commit: a CommitTransactionReply is still received and the
// written node is present in the shard's store.
1107 public void testAbortBeforeFinishCommit() throws Throwable {
1108 new ShardTestKit(getSystem()) {{
1109 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1110 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1111 "testAbortBeforeFinishCommit");
1113 waitUntilLeader(shard);
1115 final FiniteDuration duration = duration("5 seconds");
1116 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
1118 final String transactionID = "tx1";
// preCommit hook handed to setupMockWriteTransaction: it invokes the real cohort's preCommit and
// then calls doAbortTransaction on the shard directly before returning the preCommit future.
1119 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
1120 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
1122 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
1123 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
1125 // Simulate an AbortTransaction message occurring during replication, after
1126 // persisting and before finishing the commit to the in-memory store.
1127 // We have no followers so due to optimizations in the RaftActor, it does not
1128 // attempt replication and thus we can't send an AbortTransaction message b/c
1129 // it would be processed too late after CommitTransaction completes. So we'll
1130 // simulate an AbortTransaction message occurring during replication by calling
1131 // the shard directly.
1133 shard.underlyingActor().doAbortTransaction(transactionID, null);
1135 return preCommitFuture;
1139 MutableCompositeModification modification = new MutableCompositeModification();
1140 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
1141 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
1142 modification, preCommit);
// Drive the transaction through ready, canCommit and commit.
1144 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
1145 cohort, modification, true), getRef());
1146 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1148 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
1149 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1150 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1151 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1153 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
1154 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
1156 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
1158 // Since we're simulating an abort occurring during replication and before finish commit,
1159 // the data should still get written to the in-memory store since we've gotten past
1160 // canCommit and preCommit and persisted the data.
1161 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
1163 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a transaction which passes canCommit but is never committed does not block the
// next one indefinitely: with the commit timeout set to 1 second, the second transaction's
// canCommit and commit replies arrive within the 5 second expectation window and its list entry
// is readable afterwards.
1168 public void testTransactionCommitTimeout() throws Throwable {
1169 dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1);
1171 new ShardTestKit(getSystem()) {{
1172 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1173 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1174 "testTransactionCommitTimeout");
1176 waitUntilLeader(shard);
1178 final FiniteDuration duration = duration("5 seconds");
1180 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Pre-populate the store with the test container and an empty outer list before the list entries
// are written by the transactions below.
1182 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1183 writeToStore(shard, TestModel.OUTER_LIST_PATH,
1184 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1186 // Create 1st Tx - will timeout
1188 String transactionID1 = "tx1";
1189 MutableCompositeModification modification1 = new MutableCompositeModification();
1190 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1191 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1192 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
1193 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// NOTE(review): the second transaction's variables are numbered 2 but its id and cohort name
// are "tx3" / "cohort3"; the strings are only labels in this test.
1198 String transactionID2 = "tx3";
1199 MutableCompositeModification modification2 = new MutableCompositeModification();
1200 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
1201 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
1202 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
1204 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
1209 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1210 cohort1, modification1, true), getRef());
1211 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1213 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1214 cohort2, modification2, true), getRef());
1215 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1217 // canCommit 1st Tx. We don't send the commit so it should timeout.
1219 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1220 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1222 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
1224 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1225 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1227 // Commit the 2nd Tx.
1229 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
1230 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The second transaction's list entry must be present in the store.
1232 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
1233 assertNotNull(listNodePath + " not found", node);
1235 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that, with the commit queue capacity set to 1, a third CanCommitTransaction sent
// while one transaction is in progress and one is queued is rejected with a Status.Failure.
1240 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
1241 dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
1243 new ShardTestKit(getSystem()) {{
1244 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1245 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1246 "testTransactionCommitQueueCapacityExceeded");
1248 waitUntilLeader(shard);
1250 final FiniteDuration duration = duration("5 seconds");
1252 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Three write transactions, each backed by a mock cohort.
1254 String transactionID1 = "tx1";
1255 MutableCompositeModification modification1 = new MutableCompositeModification();
1256 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
1257 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
1259 String transactionID2 = "tx2";
1260 MutableCompositeModification modification2 = new MutableCompositeModification();
1261 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
1262 TestModel.OUTER_LIST_PATH,
1263 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
1266 String transactionID3 = "tx3";
1267 MutableCompositeModification modification3 = new MutableCompositeModification();
1268 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
1269 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
// Ready all three transactions; each must be acknowledged with a ReadyTransactionReply.
1273 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1274 cohort1, modification1, true), getRef());
1275 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1277 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1278 cohort2, modification2, true), getRef());
1279 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1281 shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
1282 cohort3, modification3, true), getRef());
1283 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1285 // canCommit 1st Tx.
1287 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1288 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
1290 // canCommit the 2nd Tx - it should get queued.
// No reply is awaited here, so the next message the test kit sees must be the failure below.
1292 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
1294 // canCommit the 3rd Tx - should exceed queue capacity and fail.
1296 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
1297 expectMsgClass(duration, akka.actor.Status.Failure.class);
1299 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that a CanCommitTransaction for a transaction id that was never readied ("tx") is
// answered with a Status.Failure. Unlike the neighbouring tests, this one does not wait for the
// shard to become leader before sending the message.
1304 public void testCanCommitBeforeReadyFailure() throws Throwable {
1305 new ShardTestKit(getSystem()) {{
1306 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1307 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1308 "testCanCommitBeforeReadyFailure");
1310 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
1311 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
1313 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Verifies that aborting the transaction currently in the canCommit phase yields an
// AbortTransactionReply and lets the queued canCommit of the next transaction complete.
1318 public void testAbortTransaction() throws Throwable {
1319 new ShardTestKit(getSystem()) {{
1320 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1321 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
1322 "testAbortTransaction");
1324 waitUntilLeader(shard);
1326 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
1328 String transactionID1 = "tx1";
1329 MutableCompositeModification modification1 = new MutableCompositeModification();
// cohort1 is stubbed for canCommit and abort only; those are the calls this test drives on it.
1330 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
1331 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
1332 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
1334 String transactionID2 = "tx2";
1335 MutableCompositeModification modification2 = new MutableCompositeModification();
1336 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
1337 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
1339 FiniteDuration duration = duration("5 seconds");
1340 final Timeout timeout = new Timeout(duration);
1342 // Simulate the ForwardedReadyTransaction messages that would be sent
1343 // by the ShardTransaction.
1345 shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
1346 cohort1, modification1, true), getRef());
1347 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1349 shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
1350 cohort2, modification2, true), getRef());
1351 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
1353 // Send the CanCommitTransaction message for the first Tx.
1355 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
1356 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
1357 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
1358 assertEquals("Can commit", true, canCommitReply.getCanCommit());
1360 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
1361 // processed after the first Tx completes.
1363 Future<Object> canCommitFuture = Patterns.ask(shard,
1364 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
1366 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
1369 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1370 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1372 // Wait for the 2nd Tx to complete the canCommit phase.
1374 Await.ready(canCommitFuture, duration);
// cohort2.canCommit must have been invoked after cohort1.canCommit.
1376 InOrder inOrder = inOrder(cohort1, cohort2);
1377 inOrder.verify(cohort1).canCommit();
1378 inOrder.verify(cohort2).canCommit();
1380 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Runs the shared snapshot-capture scenario with persistence enabled.
1385 public void testCreateSnapshot() throws Exception {
1386 testCreateSnapshot(true, "testCreateSnapshot");
// Runs the shared snapshot-capture scenario with persistence disabled.
1390 public void testCreateSnapshotWithNonPersistentData() throws Exception {
1391 testCreateSnapshot(false, "testCreateSnapshotWithNonPersistentData");
// Shared scenario for the snapshot tests: writes a container to a shard, sends CaptureSnapshot
// twice, and after each capture checks that the object handed to saveSnapshot is a Snapshot whose
// state deserializes to the store's root node.
//
// persistent     - value passed to dataStoreContextBuilder.persistent(...)
// shardActorName - name given to the shard actor created for this run
1394 @SuppressWarnings("serial")
1395 public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
// Holds the most recent argument passed to saveSnapshot.
1397 final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
// Persistence provider that forwards every call to the wrapped delegate and additionally records
// the snapshot object passed to saveSnapshot.
1398 class DelegatingPersistentDataProvider implements DataPersistenceProvider {
1399 DataPersistenceProvider delegate;
1401 DelegatingPersistentDataProvider(DataPersistenceProvider delegate) {
1402 this.delegate = delegate;
1406 public boolean isRecoveryApplicable() {
1407 return delegate.isRecoveryApplicable();
1411 public <T> void persist(T o, Procedure<T> procedure) {
1412 delegate.persist(o, procedure);
1416 public void saveSnapshot(Object o) {
1417 savedSnapshot.set(o);
1418 delegate.saveSnapshot(o);
1422 public void deleteSnapshots(SnapshotSelectionCriteria criteria) {
1423 delegate.deleteSnapshots(criteria);
1427 public void deleteMessages(long sequenceNumber) {
1428 delegate.deleteMessages(sequenceNumber);
1432 dataStoreContextBuilder.persistent(persistent);
1434 new ShardTestKit(getSystem()) {{
// The latch lives in an AtomicReference so it can be replaced for the second capture.
1435 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
1436 Creator<Shard> creator = new Creator<Shard>() {
1438 public Shard create() throws Exception {
1439 return new Shard(shardID, Collections.<ShardIdentifier,String>emptyMap(),
1440 newDatastoreContext(), SCHEMA_CONTEXT) {
1442 DelegatingPersistentDataProvider delegating;
// Lazily wraps the shard's own persistence provider in the recording delegate.
// NOTE(review): the return statement is elided from this listing; presumably it returns
// the cached wrapper - confirm.
1445 protected DataPersistenceProvider persistence() {
1446 if(delegating == null) {
1447 delegating = new DelegatingPersistentDataProvider(super.persistence());
// Signals the test thread once the shard has committed the snapshot.
1454 protected void commitSnapshot(final long sequenceNumber) {
1455 super.commitSnapshot(sequenceNumber);
1456 latch.get().countDown();
1462 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1463 Props.create(new DelegatingShardCreator(creator)), shardActorName);
1465 waitUntilLeader(shard);
1467 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Read the root (empty path) after the write; this is what each snapshot is compared against.
1469 NormalizedNode<?,?> expectedRoot = readStore(shard, YangInstanceIdentifier.builder().build());
1471 CaptureSnapshot capture = new CaptureSnapshot(-1, -1, -1, -1, -1, -1);
1472 shard.tell(capture, getRef());
1474 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1476 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1477 savedSnapshot.get() instanceof Snapshot);
1479 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
// Reset the latch and the recorded snapshot, then capture a second time with the same message.
1481 latch.set(new CountDownLatch(1));
1482 savedSnapshot.set(null);
1484 shard.tell(capture, getRef());
1486 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1488 assertTrue("Invalid saved snapshot " + savedSnapshot.get(),
1489 savedSnapshot.get() instanceof Snapshot);
1491 verifySnapshot((Snapshot)savedSnapshot.get(), expectedRoot);
1493 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Deserializes the snapshot's state into a NormalizedNode and asserts it equals expectedRoot.
1496 private void verifySnapshot(Snapshot snapshot, NormalizedNode<?,?> expectedRoot) {
1498 NormalizedNode<?, ?> actual = SerializationUtils.deserializeNormalizedNode(snapshot.getState());
1499 assertEquals("Root node", expectedRoot, actual);
1505 * This test simply verifies that the applySnapShot logic will work
1506 * @throws ReadFailedException
// Exercises a delete-then-write of the whole tree on a stand-alone InMemoryDOMDataStore and checks
// that the root read back afterwards equals the root read before.
1509 public void testInMemoryDataStoreRestore() throws ReadFailedException {
1510 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
1512 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
// Seed the store with the test container.
1514 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1515 putTransaction.write(TestModel.TEST_PATH,
1516 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1517 commitTransaction(putTransaction);
// Capture the current root as the expected state.
1520 NormalizedNode<?, ?> expected = readStore(store);
1522 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
// Delete the root (empty path) and write the captured root back in the same transaction.
1524 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1525 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1527 commitTransaction(writeTransaction);
1529 NormalizedNode<?, ?> actual = readStore(store);
1531 assertEquals(expected, actual);
// Verifies that a shard's data persistence provider reports recovery as applicable when the
// datastore context is built with persistent(true), and as not applicable with persistent(false).
1535 public void testRecoveryApplicable(){
// The two contexts differ only in the persistent flag.
1537 final DatastoreContext persistentContext = DatastoreContext.newBuilder().
1538 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
1540 final Props persistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1541 persistentContext, SCHEMA_CONTEXT);
1543 final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder().
1544 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
1546 final Props nonPersistentProps = Shard.props(shardID, Collections.<ShardIdentifier, String>emptyMap(),
1547 nonPersistentContext, SCHEMA_CONTEXT);
1549 new ShardTestKit(getSystem()) {{
1550 TestActorRef<Shard> shard1 = TestActorRef.create(getSystem(),
1551 persistentProps, "testPersistence1");
1553 assertTrue("Recovery Applicable", shard1.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1555 shard1.tell(PoisonPill.getInstance(), ActorRef.noSender());
1557 TestActorRef<Shard> shard2 = TestActorRef.create(getSystem(),
1558 nonPersistentProps, "testPersistence2");
1560 assertFalse("Recovery Not Applicable", shard2.underlyingActor().getDataPersistenceProvider().isRecoveryApplicable());
1562 shard2.tell(PoisonPill.getInstance(), ActorRef.noSender());
// Reads and returns the root node (empty path) of the given store through a read-only
// transaction, which is closed before returning. Throws ReadFailedException if the read fails.
1569 private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
1570 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1571 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1572 transaction.read(YangInstanceIdentifier.builder().build());
1574 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
// get() is called without a presence check, so callers are expected to have written data first.
1576 NormalizedNode<?, ?> normalizedNode = optional.get();
1578 transaction.close();
1580 return normalizedNode;
// Readies the given write transaction and drives its cohort through preCommit and then commit.
// NOTE(review): the statements that wait on the two futures and the body of the catch clause are
// elided from this listing; if the catch body is empty, commit failures are silently ignored -
// confirm.
1583 private void commitTransaction(final DOMStoreWriteTransaction transaction) {
1584 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1585 ListenableFuture<Void> future =
1586 commitCohort.preCommit();
1589 future = commitCohort.commit();
1591 } catch (InterruptedException | ExecutionException e) {
// Returns a new anonymous AsyncDataChangeListener for use where a listener instance is required.
// NOTE(review): the body of onDataChanged is elided from this listing; the method name suggests
// it does nothing - confirm.
1595 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1596 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1598 public void onDataChanged(
1599 final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
// Convenience overload: reads the node at id from the data store of the shard's underlying actor.
1605 static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
1606 throws ExecutionException, InterruptedException {
1607 return readStore(shard.underlyingActor().getDataStore(), id);
// Reads the node at id from the store through a read-only transaction. The local result is the
// node when present and null when absent; the transaction is closed after the read.
// NOTE(review): the return statement is elided from this listing; presumably it returns node -
// confirm.
1610 public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
1611 throws ExecutionException, InterruptedException {
1612 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1614 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1615 transaction.read(id);
// Blocks until the read completes.
1617 Optional<NormalizedNode<?, ?>> optional = future.get();
1618 NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1620 transaction.close();
// Convenience overload: writes node at id into the data store of the shard's underlying actor.
1625 static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
1626 final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1627 writeToStore(shard.underlyingActor().getDataStore(), id, node);
// Writes node at id through a write-only transaction and blocks until the resulting cohort has
// completed preCommit and commit. canCommit is not invoked on the cohort here.
1630 public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
1631 final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
1632 DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
1634 transaction.write(id, node);
1636 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1637 commitCohort.preCommit().get();
1638 commitCohort.commit().get();
// Named Creator<Shard> that forwards create() to a wrapped Creator. testCreateSnapshot uses it to
// hand an anonymous Creator to Props.create.
1641 @SuppressWarnings("serial")
1642 private static final class DelegatingShardCreator implements Creator<Shard> {
1643 private final Creator<Shard> delegate;
1645 DelegatingShardCreator(final Creator<Shard> delegate) {
1646 this.delegate = delegate;
1650 public Shard create() throws Exception {
1651 return delegate.create();