1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.dispatch.Dispatchers;
6 import akka.dispatch.OnComplete;
7 import akka.japi.Creator;
8 import akka.pattern.Patterns;
9 import akka.testkit.JavaTestKit;
10 import akka.testkit.TestActorRef;
11 import akka.util.Timeout;
12 import com.google.common.base.Function;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import org.junit.After;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.mockito.InOrder;
22 import org.mockito.invocation.InvocationOnMock;
23 import org.mockito.stubbing.Answer;
24 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
33 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
37 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
38 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
39 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
40 import org.opendaylight.controller.cluster.datastore.modification.Modification;
41 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
43 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
44 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
45 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
46 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
47 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
48 import org.opendaylight.controller.cluster.raft.Snapshot;
49 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
50 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
52 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
53 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
54 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
55 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
56 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
57 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
58 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
59 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
60 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
61 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
62 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
63 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
64 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
65 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
66 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
67 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
68 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
69 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
70 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
71 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
72 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
73 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import scala.concurrent.Await;
76 import scala.concurrent.Future;
77 import scala.concurrent.duration.FiniteDuration;
78 import java.io.IOException;
79 import java.util.Collections;
80 import java.util.HashSet;
82 import java.util.concurrent.CountDownLatch;
83 import java.util.concurrent.ExecutionException;
84 import java.util.concurrent.TimeUnit;
85 import java.util.concurrent.atomic.AtomicInteger;
86 import java.util.concurrent.atomic.AtomicReference;
87 import static org.junit.Assert.assertEquals;
88 import static org.junit.Assert.assertNotNull;
89 import static org.junit.Assert.assertTrue;
90 import static org.junit.Assert.fail;
91 import static org.mockito.Mockito.mock;
92 import static org.mockito.Mockito.doReturn;
93 import static org.mockito.Mockito.doAnswer;
94 import static org.mockito.Mockito.inOrder;
96 public class ShardTest extends AbstractActorTest {
// Schema context built once from TestModel and shared by the tests in this class.
98 private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
// Shard identifier (member-1 / inventory / config) passed to Shard.props by newShardProps();
// its string form is also the first argument to InMemoryJournal.addEntry and
// InMemorySnapshotStore.addSnapshot in testRecovery.
100 private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
101 .shardName("inventory").type("config").build();
// Counter read by shardName() so that each call yields a different actor name.
103 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
// Returns an actor name of the form "shard<N>", where N is the current value of
// NEXT_SHARD_NUM; the counter is incremented on every call.
105 private static String shardName() {
106 return "shard" + NEXT_SHARD_NUM.getAndIncrement();
// Datastore configuration passed to each Shard created by these tests: journal recovery
// log batch size 3 and snapshot batch count 5000. Not final, because
// testTransactionCommitTimeout replaces it before creating its shard.
109 private DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
110 shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).build();
// Fixture setup: resets the "shard.persistent" system property to "false" (some tests set
// it to "true") and empties the in-memory snapshot store and journal.
113 public void setUp() {
114 System.setProperty("shard.persistent", "false");
116 InMemorySnapshotStore.clear();
117 InMemoryJournal.clear();
// Fixture teardown: empties the in-memory snapshot store and journal.
121 public void tearDown() {
122 InMemorySnapshotStore.clear();
123 InMemoryJournal.clear();
// Builds the Props for a Shard using IDENTIFIER, an empty ShardIdentifier-to-String map
// (presumably the peer addresses - confirm against Shard.props), the current
// dataStoreContext and SCHEMA_CONTEXT.
126 private Props newShardProps() {
127 return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
128 dataStoreContext, SCHEMA_CONTEXT);
// Sends a RegisterChangeListener (BASE scope, TEST_PATH) to a shard, naming the test kit
// both as the listener path and as the sender, and checks the two messages the test kit
// receives: an EnableNotification with isEnabled == false, then a
// RegisterChangeListenerReply.
132 public void testOnReceiveRegisterListener() throws Exception {
133 new JavaTestKit(getSystem()) {{
134 ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener");
136 subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef());
138 subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
139 getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
141 EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class);
142 assertEquals("isEnabled", false, enable.isEnabled());
144 RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
145 RegisterChangeListenerReply.class);
// The registration path must be a child of the shard actor whose name starts with '$'.
146 assertTrue(reply.getListenerRegistrationPath().toString().matches(
147 "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
// After waitUntilLeader(subject) returns, sends a serialized READ_ONLY CreateTransaction
// for "txn-1" and checks that the transaction actor path in the reply contains
// "akka://test/user/testCreateTransaction/shard-txn-1".
152 public void testCreateTransaction(){
153 new ShardTestKit(getSystem()) {{
154 ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction");
156 waitUntilLeader(subject);
158 subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
160 subject.tell(new CreateTransaction("txn-1",
161 TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
163 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
164 CreateTransactionReply.class);
166 String path = reply.getTransactionActorPath().toString();
167 assertTrue("Unexpected transaction path " + path,
168 path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
// Same check as testCreateTransaction, but the CreateTransaction carries a third
// argument, "foobar" (presumably the transaction chain id - confirm against
// CreateTransaction). The reply path must contain
// "akka://test/user/testCreateTransactionOnChain/shard-txn-1".
174 public void testCreateTransactionOnChain(){
175 new ShardTestKit(getSystem()) {{
176 final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
178 waitUntilLeader(subject);
180 subject.tell(new CreateTransaction("txn-1",
181 TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
184 CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
185 CreateTransactionReply.class);
187 String path = reply.getTransactionActorPath().toString();
188 assertTrue("Unexpected transaction path " + path,
189 path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
// Creates a shard whose ShardIdentifier-to-String map holds one entry with a null value,
// then, inside a 3 second Within block, builds a PeerAddressResolved for that identifier
// with the address "akka://foobar".
// NOTE(review): what the test expects after the message is not evident here - confirm.
195 public void testPeerAddressResolved(){
196 new JavaTestKit(getSystem()) {{
197 final ShardIdentifier identifier =
198 ShardIdentifier.builder().memberName("member-1")
199 .shardName("inventory").type("config").build();
201 Props props = Shard.props(identifier,
202 Collections.<ShardIdentifier, String>singletonMap(identifier, null),
203 dataStoreContext, SCHEMA_CONTEXT);
204 final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
206 new Within(duration("3 seconds")) {
208 protected void run() {
211 new PeerAddressResolved(identifier, "akka://foobar"),
// Round-trips the shard's data through a snapshot: writes the test container with
// writeToStore, reads the tree at the root path, encodes it with
// NormalizedNodeToNodeCodec into a Snapshot, passes an ApplySnapshot straight to
// onReceiveCommand on the underlying actor, and checks that a second root read equals
// the first.
221 public void testApplySnapshot() throws ExecutionException, InterruptedException {
222 TestActorRef<Shard> ref = TestActorRef.create(getSystem(), newShardProps());
224 NormalizedNodeToNodeCodec codec =
225 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
227 writeToStore(ref, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// An identifier built with no path arguments is used as the root of the tree.
229 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
230 NormalizedNode<?,?> expected = readStore(ref, root);
232 NormalizedNodeMessages.Container encode = codec.encode(expected);
234 ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
235 encode.getNormalizedNode().toByteString().toByteArray(),
236 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
// The message is delivered by calling the actor directly rather than through tell().
238 ref.underlyingActor().onReceiveCommand(applySnapshot);
240 NormalizedNode<?,?> actual = readStore(ref, root);
242 assertEquals(expected, actual);
// Builds a composite modification holding a single write of the test container, wraps
// its serialized form in an ApplyState for ReplicatedLogImplEntry(1, 2, payload), passes
// it straight to onReceiveCommand on the underlying actor, and checks that readStore
// returns the written node at TEST_PATH.
246 public void testApplyState() throws Exception {
248 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
250 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
252 MutableCompositeModification compMod = new MutableCompositeModification();
253 compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
254 Payload payload = new CompositeModificationPayload(compMod.toSerializable());
255 ApplyState applyState = new ApplyState(null, "test",
256 new ReplicatedLogImplEntry(1, 2, payload));
258 shard.underlyingActor().onReceiveCommand(applyState);
260 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
261 assertEquals("Applied state", node, actual);
// Seeds InMemorySnapshotStore with a snapshot of a store containing the test container,
// and InMemoryJournal with a write of the outer list (index 0), one merge per list entry
// with ids 1..11 (indexes 1..11) and a trailing ApplyLogEntries(11). It then creates the
// shard, waits up to 5 seconds for onRecoveryComplete, and verifies that every list entry
// is present and that last log index, commit index and last applied all equal 11.
264 @SuppressWarnings("serial")
266 public void testRecovery() throws Exception {
268 // Set up the InMemorySnapshotStore.
270 InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
271 testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
273 DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
274 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
275 DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
276 commitCohort.preCommit().get();
277 commitCohort.commit().get();
279 DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
280 NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
282 InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create(
283 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
285 getNormalizedNode().toByteString().toByteArray(),
286 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
288 // Set up the InMemoryJournal.
290 InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
291 new WriteModification(TestModel.OUTER_LIST_PATH,
292 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
295 int nListEntries = 11;
// Keys expected in the recovered list; each one is removed again as it is found below.
296 Set<Integer> listEntryKeys = new HashSet<>();
297 for(int i = 1; i <= nListEntries; i++) {
298 listEntryKeys.add(Integer.valueOf(i));
299 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
300 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
301 Modification mod = new MergeModification(path,
302 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
304 InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1,
308 InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1,
309 new ApplyLogEntries(nListEntries));
311 // Create the actor and wait for recovery complete.
313 final CountDownLatch recoveryComplete = new CountDownLatch(1);
// The Shard subclass overrides onRecoveryComplete to release the latch after the
// superclass handling has run.
315 Creator<Shard> creator = new Creator<Shard>() {
317 public Shard create() throws Exception {
318 return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
319 dataStoreContext, SCHEMA_CONTEXT) {
321 protected void onRecoveryComplete() {
323 super.onRecoveryComplete();
325 recoveryComplete.countDown();
332 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
333 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
335 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
337 // Verify data in the data store.
339 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
340 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
341 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
342 outerList.getValue() instanceof Iterable);
343 for(Object entry: (Iterable<?>) outerList.getValue()) {
344 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
345 entry instanceof MapEntryNode);
346 MapEntryNode mapEntry = (MapEntryNode)entry;
347 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
348 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
349 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
350 Object value = idLeaf.get().getValue();
// remove() returns false for an unexpected or repeated key, which fails the assertion.
351 assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
352 listEntryKeys.remove(value));
// Any key still in the set was never found in the recovered list.
355 if(!listEntryKeys.isEmpty()) {
356 fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
360 assertEquals("Last log index", nListEntries,
361 shard.underlyingActor().getShardMBean().getLastLogIndex());
362 assertEquals("Commit index", nListEntries,
363 shard.underlyingActor().getShardMBean().getCommitIndex());
364 assertEquals("Last applied", nListEntries,
365 shard.underlyingActor().getShardMBean().getLastApplied());
// Adds the given modifications, in argument order, to a new MutableCompositeModification
// and returns its serialized form wrapped in a CompositeModificationPayload.
368 private CompositeModificationPayload newPayload(Modification... mods) {
369 MutableCompositeModification compMod = new MutableCompositeModification();
370 for(Modification mod: mods) {
371 compMod.addModification(mod);
374 return new CompositeModificationPayload(compMod.toSerializable());
// Convenience overload: delegates to the six-argument setupMockWriteTransaction with a
// null preCommit function.
377 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
378 InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
379 MutableCompositeModification modification) {
380 return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
// Opens a write-only transaction on dataStore, writes data at path and readies it to get
// a real cohort. It then creates a Mockito mock cohort named cohortName whose canCommit,
// preCommit, commit and abort are answered by the real cohort, so the calls can be
// verified on the mock while the real store is still updated. If preCommit is non-null
// it is applied to the real cohort in place of realCohort.preCommit(). The same write is
// also added to `modification`.
// NOTE(review): presumably returns the mock cohort - confirm.
383 private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(String cohortName,
384 InMemoryDOMDataStore dataStore, YangInstanceIdentifier path, NormalizedNode data,
385 MutableCompositeModification modification,
386 final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
388 DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
389 tx.write(path, data);
390 final DOMStoreThreePhaseCommitCohort realCohort = tx.ready();
391 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
393 doAnswer(new Answer<ListenableFuture<Boolean>>() {
395 public ListenableFuture<Boolean> answer(InvocationOnMock invocation) {
396 return realCohort.canCommit();
398 }).when(cohort).canCommit();
400 doAnswer(new Answer<ListenableFuture<Void>>() {
402 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
// The caller-supplied function, when given, replaces the plain preCommit call.
403 if(preCommit != null) {
404 return preCommit.apply(realCohort);
406 return realCohort.preCommit();
409 }).when(cohort).preCommit();
411 doAnswer(new Answer<ListenableFuture<Void>>() {
413 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
414 return realCohort.commit();
416 }).when(cohort).commit();
418 doAnswer(new Answer<ListenableFuture<Void>>() {
420 public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
421 return realCohort.abort();
423 }).when(cohort).abort();
425 modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
// Readies three transactions on one shard with "shard.persistent" set to "true": a write
// of the test container, a write of the outer list, and a write of the list entry with
// id 1. canCommit is completed for the first transaction, canCommit for the other two is
// sent with ask, and then the first is committed. The test verifies that the three
// cohorts ran canCommit, preCommit, commit in transaction order, that the list entry is
// in the store, and that the last log index is 2.
430 @SuppressWarnings({ "unchecked" })
432 public void testConcurrentThreePhaseCommits() throws Throwable {
433 System.setProperty("shard.persistent", "true");
434 new ShardTestKit(getSystem()) {{
435 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
436 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
438 waitUntilLeader(shard);
440 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
442 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
444 String transactionID1 = "tx1";
445 MutableCompositeModification modification1 = new MutableCompositeModification();
446 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
447 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
449 String transactionID2 = "tx2";
450 MutableCompositeModification modification2 = new MutableCompositeModification();
451 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
452 TestModel.OUTER_LIST_PATH,
453 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
456 String transactionID3 = "tx3";
457 MutableCompositeModification modification3 = new MutableCompositeModification();
458 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
459 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
460 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
461 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
465 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
466 final Timeout timeout = new Timeout(duration);
468 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
469 // by the ShardTransaction.
471 shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
472 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
473 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
474 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
476 // Send the CanCommitTransaction message for the first Tx.
478 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
479 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
480 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
481 assertEquals("Can commit", true, canCommitReply.getCanCommit());
483 // Send the ForwardedReadyTransaction for the next 2 Tx's.
485 shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
486 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
488 shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
489 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
491 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
492 // processed after the first Tx completes.
494 Future<Object> canCommitFuture1 = Patterns.ask(shard,
495 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
497 Future<Object> canCommitFuture2 = Patterns.ask(shard,
498 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
500 // Send the CommitTransaction message for the first Tx. After it completes, it should
501 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
503 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
504 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
506 // Wait for the next 2 Tx's to complete.
// caughtEx carries a failure out of the future callbacks (registered below on the actor
// system dispatcher) so that it can be rethrown from the test method; commitLatch
// counts the two remaining commits.
508 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
509 final CountDownLatch commitLatch = new CountDownLatch(2);
// Base callback: a failed future is printed and stored in caughtEx as an AssertionError;
// the response class is compared against expRespType.
511 class OnFutureComplete extends OnComplete<Object> {
512 private final Class<?> expRespType;
514 OnFutureComplete(Class<?> expRespType) {
515 this.expRespType = expRespType;
519 public void onComplete(Throwable error, Object resp) {
521 System.out.println(new java.util.Date()+": "+getClass().getSimpleName() + " failure: "+error);
522 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
525 assertEquals("Commit response type", expRespType, resp.getClass());
527 } catch (Exception e) {
533 void onSuccess(Object resp) throws Exception {
// Commit callback: expects a CommitTransactionReply and always counts down commitLatch
// after the base handling, whether the future succeeded or failed.
537 class OnCommitFutureComplete extends OnFutureComplete {
538 OnCommitFutureComplete() {
539 super(CommitTransactionReply.SERIALIZABLE_CLASS);
543 public void onComplete(Throwable error, Object resp) {
544 super.onComplete(error, resp);
545 commitLatch.countDown();
// CanCommit callback: on a positive CanCommitTransactionReply, asks the shard to commit
// the same transaction and attaches an OnCommitFutureComplete to that ask.
549 class OnCanCommitFutureComplete extends OnFutureComplete {
550 private final String transactionID;
552 OnCanCommitFutureComplete(String transactionID) {
553 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
554 this.transactionID = transactionID;
558 void onSuccess(Object resp) throws Exception {
559 CanCommitTransactionReply canCommitReply =
560 CanCommitTransactionReply.fromSerializable(resp);
561 assertEquals("Can commit", true, canCommitReply.getCanCommit());
563 Future<Object> commitFuture = Patterns.ask(shard,
564 new CommitTransaction(transactionID).toSerializable(), timeout);
565 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
569 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
570 getSystem().dispatcher());
572 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
573 getSystem().dispatcher());
575 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
// A failure recorded by a callback is reported before the plain timeout check.
577 if(caughtEx.get() != null) {
578 throw caughtEx.get();
581 assertEquals("Commits complete", true, done);
// Each cohort must go through canCommit, preCommit and commit before the next one starts.
583 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
584 inOrder.verify(cohort1).canCommit();
585 inOrder.verify(cohort1).preCommit();
586 inOrder.verify(cohort1).commit();
587 inOrder.verify(cohort2).canCommit();
588 inOrder.verify(cohort2).preCommit();
589 inOrder.verify(cohort2).commit();
590 inOrder.verify(cohort3).canCommit();
591 inOrder.verify(cohort3).preCommit();
592 inOrder.verify(cohort3).commit();
594 // Verify data in the data store.
596 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
597 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
598 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
599 outerList.getValue() instanceof Iterable);
600 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
601 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
602 entry instanceof MapEntryNode);
603 MapEntryNode mapEntry = (MapEntryNode)entry;
604 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
605 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
606 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
607 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
609 assertEquals("Last log index", 2, shard.underlyingActor().getShardMBean().getLastLogIndex());
// Uses two pure mock cohorts. cohort1 succeeds in canCommit and preCommit, but its
// commit returns a future failed with IllegalStateException("mock"). The test expects an
// akka Status.Failure for the first CommitTransaction, expects the canCommit ask for
// the second transaction to complete within 5 seconds, and verifies the call order
// cohort1.canCommit, cohort1.preCommit, cohort1.commit, cohort2.canCommit.
614 public void testCommitPhaseFailure() throws Throwable {
615 new ShardTestKit(getSystem()) {{
616 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
617 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
619 waitUntilLeader(shard);
621 // Setup 2 simulated transactions with mock cohorts. The first one fails in the
624 String transactionID1 = "tx1";
625 MutableCompositeModification modification1 = new MutableCompositeModification();
626 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
627 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
628 doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
629 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
631 String transactionID2 = "tx2";
632 MutableCompositeModification modification2 = new MutableCompositeModification();
633 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
634 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
636 FiniteDuration duration = duration("5 seconds");
637 final Timeout timeout = new Timeout(duration);
639 // Simulate the ForwardedReadyTransaction messages that would be sent
640 // by the ShardTransaction.
642 shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
643 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
645 shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
646 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
648 // Send the CanCommitTransaction message for the first Tx.
650 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
651 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
652 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
653 assertEquals("Can commit", true, canCommitReply.getCanCommit());
655 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
656 // processed after the first Tx completes.
658 Future<Object> canCommitFuture = Patterns.ask(shard,
659 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
661 // Send the CommitTransaction message for the first Tx. This should send back an error
662 // and trigger the 2nd Tx to proceed.
664 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
665 expectMsgClass(duration, akka.actor.Status.Failure.class);
667 // Wait for the 2nd Tx to complete the canCommit phase.
669 final CountDownLatch latch = new CountDownLatch(1);
670 canCommitFuture.onComplete(new OnComplete<Object>() {
672 public void onComplete(Throwable t, Object resp) {
675 }, getSystem().dispatcher());
677 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// cohort2.canCommit must only be invoked after cohort1's failed commit.
679 InOrder inOrder = inOrder(cohort1, cohort2);
680 inOrder.verify(cohort1).canCommit();
681 inOrder.verify(cohort1).preCommit();
682 inOrder.verify(cohort1).commit();
683 inOrder.verify(cohort2).canCommit();
// Uses one mock cohort whose canCommit succeeds and whose preCommit returns a future
// failed with IllegalStateException("mock"). The test expects a positive
// CanCommitTransactionReply, then an akka Status.Failure in response to
// CommitTransaction, and verifies that canCommit was called before preCommit.
688 public void testPreCommitPhaseFailure() throws Throwable {
689 new ShardTestKit(getSystem()) {{
690 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
691 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
693 waitUntilLeader(shard);
695 String transactionID = "tx1";
696 MutableCompositeModification modification = new MutableCompositeModification();
697 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
698 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
699 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
701 FiniteDuration duration = duration("5 seconds");
703 // Simulate the ForwardedReadyTransaction messages that would be sent
704 // by the ShardTransaction.
706 shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
707 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
709 // Send the CanCommitTransaction message.
711 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
712 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
713 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
714 assertEquals("Can commit", true, canCommitReply.getCanCommit());
716 // Send the CommitTransaction message. This should send back an error
717 // for preCommit failure.
719 shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
720 expectMsgClass(duration, akka.actor.Status.Failure.class);
722 InOrder inOrder = inOrder(cohort);
723 inOrder.verify(cohort).canCommit();
724 inOrder.verify(cohort).preCommit();
// Uses one mock cohort whose canCommit returns a future failed with
// IllegalStateException("mock"). After the transaction is readied, the test expects an
// akka Status.Failure in response to CanCommitTransaction.
729 public void testCanCommitPhaseFailure() throws Throwable {
730 new ShardTestKit(getSystem()) {{
731 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
732 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
734 waitUntilLeader(shard);
736 final FiniteDuration duration = duration("5 seconds");
738 String transactionID = "tx1";
739 MutableCompositeModification modification = new MutableCompositeModification();
740 DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
741 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
743 // Simulate the ForwardedReadyTransaction messages that would be sent
744 // by the ShardTransaction.
746 shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
747 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
749 // Send the CanCommitTransaction message.
751 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
752 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Sends an AbortTransaction for a transaction from inside its own preCommit step and
// checks that the commit still finishes. With "shard.persistent" set to "true", the test
// readies a write of the test container, gets a positive canCommit, asks for the commit,
// waits up to 5 seconds for the abort ask to complete, awaits the commit future, and
// finally checks that the node is readable at TEST_PATH.
757 public void testAbortBeforeFinishCommit() throws Throwable {
758 System.setProperty("shard.persistent", "true");
759 new ShardTestKit(getSystem()) {{
760 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
761 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
763 waitUntilLeader(shard);
765 final FiniteDuration duration = duration("5 seconds");
766 final Timeout timeout = new Timeout(duration);
768 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
770 final String transactionID = "tx1";
771 final CountDownLatch abortComplete = new CountDownLatch(1);
// preCommit override handed to setupMockWriteTransaction: runs the real preCommit, then
// asks the shard to abort the same transaction. The latch is released when the abort ask
// completes, whether it succeeded or failed.
772 Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
773 new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
775 public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
776 ListenableFuture<Void> preCommitFuture = cohort.preCommit();
778 Future<Object> abortFuture = Patterns.ask(shard,
779 new AbortTransaction(transactionID).toSerializable(), timeout);
780 abortFuture.onComplete(new OnComplete<Object>() {
782 public void onComplete(Throwable e, Object resp) {
783 abortComplete.countDown();
785 }, getSystem().dispatcher());
787 return preCommitFuture;
791 MutableCompositeModification modification = new MutableCompositeModification();
792 DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
793 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
794 modification, preCommit);
796 shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
797 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
799 shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
800 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
801 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
802 assertEquals("Can commit", true, canCommitReply.getCanCommit());
804 Future<Object> commitFuture = Patterns.ask(shard,
805 new CommitTransaction(transactionID).toSerializable(), timeout);
807 assertEquals("Abort complete", true, abortComplete.await(5, TimeUnit.SECONDS));
// Await.result throws if the commit future failed or timed out.
809 Await.result(commitFuture, duration);
811 NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST_PATH);
812 assertNotNull(TestModel.TEST_QNAME.getLocalName() + " not found", node);
// Scenario: the first transaction passes canCommit but is never committed. With the commit
// timeout configured to 1 second, the second transaction's canCommit and commit are still
// expected to be answered within the 5 second wait, and its list entry must end up in the store.
817 public void testTransactionCommitTimeout() throws Throwable {
// Replaces the shared dataStoreContext with one whose transaction commit timeout is 1 second.
818 dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitTimeoutInSeconds(1).build();
820 new ShardTestKit(getSystem()) {{
821 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
822 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
824 waitUntilLeader(shard);
826 final FiniteDuration duration = duration("5 seconds");
828 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Seed the store directly with the test container and an empty outer list (presumably so the
// list entries written by the two transactions below have a parent node).
830 writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
831 writeToStore(shard, TestModel.OUTER_LIST_PATH,
832 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
834 // Create 1st Tx - will timeout
// Writes outer-list entry with id 1.
836 String transactionID1 = "tx1";
837 MutableCompositeModification modification1 = new MutableCompositeModification();
838 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
839 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
840 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
841 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
// Second transaction: writes outer-list entry with id 2. Note that its id string is "tx3" and
// its mock cohort is named "cohort3" even though the local variables carry the suffix 2.
846 String transactionID2 = "tx3";
847 MutableCompositeModification modification2 = new MutableCompositeModification();
848 YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
849 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
850 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
852 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
// Ready both transactions; each must be acknowledged with a ReadyTransactionReply.
857 shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
858 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
860 shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
861 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
863 // canCommit 1st Tx. We don't send the commit so it should timeout.
865 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
866 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
868 // canCommit the 2nd Tx - it should complete after the 1st Tx times out.
// The 5 second wait leaves room for the 1 second commit timeout configured above.
870 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
871 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
873 // Commit the 2nd Tx.
875 shard.tell(new CommitTransaction(transactionID2).toSerializable(), getRef());
876 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
// The list entry written by the second transaction must now be readable from the store.
878 NormalizedNode<?, ?> node = readStore(shard, listNodePath);
879 assertNotNull(listNodePath + " not found", node);
// Scenario: with the commit queue capacity configured to 1, three transactions are readied.
// tx1's canCommit is answered, tx2's canCommit is sent without waiting for a reply, and tx3's
// canCommit is then expected to be rejected with a Status.Failure.
884 public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
// Replaces the shared dataStoreContext with one whose transaction commit queue capacity is 1.
885 dataStoreContext = DatastoreContext.newBuilder().shardTransactionCommitQueueCapacity(1).build();
887 new ShardTestKit(getSystem()) {{
888 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
889 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
891 waitUntilLeader(shard);
893 final FiniteDuration duration = duration("5 seconds");
895 InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
// Three mock write transactions: tx1 and tx3 write the test container, tx2 writes the outer list.
897 String transactionID1 = "tx1";
898 MutableCompositeModification modification1 = new MutableCompositeModification();
899 DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
900 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
902 String transactionID2 = "tx2";
903 MutableCompositeModification modification2 = new MutableCompositeModification();
904 DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
905 TestModel.OUTER_LIST_PATH,
906 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
909 String transactionID3 = "tx3";
910 MutableCompositeModification modification3 = new MutableCompositeModification();
911 DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
912 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
// Ready all three transactions; each must be acknowledged with a ReadyTransactionReply.
916 shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
917 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
919 shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
920 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
922 shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
923 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// canCommit the 1st Tx and wait for its reply; no commit is sent for it in this test.
927 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
928 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
930 // canCommit the 2nd Tx - it should get queued.
// No reply is awaited for this message.
932 shard.tell(new CanCommitTransaction(transactionID2).toSerializable(), getRef());
934 // canCommit the 3rd Tx - should exceed queue capacity and fail.
936 shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
937 expectMsgClass(duration, akka.actor.Status.Failure.class);
// Scenario: a CanCommitTransaction is sent for an id ("tx") for which no
// ForwardedReadyTransaction was ever sent; the shard must answer with a Status.Failure.
942 public void testCanCommitBeforeReadyFailure() throws Throwable {
943 new ShardTestKit(getSystem()) {{
// Unlike the neighbouring tests, the visible code does not call waitUntilLeader(shard) here.
944 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
945 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
947 shard.tell(new CanCommitTransaction("tx").toSerializable(), getRef());
948 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
// Scenario: tx1 completes canCommit and is then aborted while tx2's canCommit is outstanding.
// The test requires an AbortTransactionReply for tx1, completion of tx2's canCommit ask, and
// that cohort1.canCommit() was invoked before cohort2.canCommit().
953 public void testAbortTransaction() throws Throwable {
954 new ShardTestKit(getSystem()) {{
955 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
956 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), shardName());
958 waitUntilLeader(shard);
960 // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
// cohort1 answers canCommit with TRUE and abort with an already-completed future.
962 String transactionID1 = "tx1";
963 MutableCompositeModification modification1 = new MutableCompositeModification();
964 DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
965 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
966 doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
// cohort2 only needs canCommit stubbed; no other cohort2 method is stubbed in this test.
968 String transactionID2 = "tx2";
969 MutableCompositeModification modification2 = new MutableCompositeModification();
970 DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
971 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
973 FiniteDuration duration = duration("5 seconds");
974 final Timeout timeout = new Timeout(duration);
976 // Simulate the ForwardedReadyTransaction messages that would be sent
977 // by the ShardTransaction.
979 shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
980 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
982 shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
983 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
985 // Send the CanCommitTransaction message for the first Tx.
987 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
988 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
989 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
990 assertEquals("Can commit", true, canCommitReply.getCanCommit());
992 // Send the CanCommitTransaction message for the 2nd Tx. This should get queued and
993 // processed after the first Tx completes.
// Sent with ask (not tell) so its reply does not land in the test kit's own message queue.
995 Future<Object> canCommitFuture = Patterns.ask(shard,
996 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
998 // Send the AbortTransaction message for the first Tx. This should trigger the 2nd
// Tx's outstanding canCommit to be processed (checked by the latch and InOrder verification below).
1001 shard.tell(new AbortTransaction(transactionID1).toSerializable(), getRef());
1002 expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
1004 // Wait for the 2nd Tx to complete the canCommit phase.
1006 final CountDownLatch latch = new CountDownLatch(1);
1007 canCommitFuture.onComplete(new OnComplete<Object>() {
// NOTE(review): the callback body is not visible here; presumably it counts down the latch.
1009 public void onComplete(Throwable t, Object resp) {
1012 }, getSystem().dispatcher());
1014 assertEquals("2nd CanCommit complete", true, latch.await(5, TimeUnit.SECONDS));
// canCommit must have been invoked on cohort1 before it was invoked on cohort2.
1016 InOrder inOrder = inOrder(cohort1, cohort2);
1017 inOrder.verify(cohort1).canCommit();
1018 inOrder.verify(cohort2).canCommit();
// Scenario: two CaptureSnapshot messages are sent to a Shard whose saveSnapshot is instrumented;
// after each message saveSnapshot must be invoked within 5 seconds.
1023 public void testCreateSnapshot() throws IOException, InterruptedException {
1024 new ShardTestKit(getSystem()) {{
// Held in an AtomicReference so the latch can be swapped for a fresh one before the second round.
1025 final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
// Creates a Shard subclass that still delegates to the real saveSnapshot and then releases
// whichever latch is current at that moment.
1026 Creator<Shard> creator = new Creator<Shard>() {
1028 public Shard create() throws Exception {
1029 return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
1030 dataStoreContext, SCHEMA_CONTEXT) {
1032 public void saveSnapshot(Object snapshot) {
1033 super.saveSnapshot(snapshot);
1034 latch.get().countDown();
// The creator is wrapped in DelegatingShardCreator before being handed to Props.create.
1040 TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
1041 Props.create(new DelegatingShardCreator(creator)), "testCreateSnapshot");
1043 waitUntilLeader(shard);
// First capture request: wait for saveSnapshot to be called.
1045 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1047 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
// Second capture request, with a fresh latch, to check that a repeat request also saves.
1049 latch.set(new CountDownLatch(1));
1050 shard.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
1052 assertEquals("Snapshot saved", true, latch.get().await(5, TimeUnit.SECONDS));
1057 * This test simply verifies that the applySnapShot logic will work.
1058 * @throws ReadFailedException if reading the root node back from the store fails
// Round-trip check on a stand-alone InMemoryDOMDataStore: the root node is read, the root is
// deleted and rewritten with that same node in one transaction, and the root read afterwards
// must equal the one read before.
1061 public void testInMemoryDataStoreRestore() throws ReadFailedException {
// Both executors handed to the store are same-thread executors.
1062 InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
1063 MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
1065 store.onGlobalContextUpdated(SCHEMA_CONTEXT);
// Populate the store with the test container and commit it.
1067 DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
1068 putTransaction.write(TestModel.TEST_PATH,
1069 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1070 commitTransaction(putTransaction);
// Capture the whole tree (root node) as the expected state.
1073 NormalizedNode expected = readStore(store);
1075 DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
// YangInstanceIdentifier.builder().build() is the empty path, i.e. the root that
// readStore(store) reads: delete it, then write the captured tree back in the same transaction.
1077 writeTransaction.delete(YangInstanceIdentifier.builder().build());
1078 writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
1080 commitTransaction(writeTransaction);
// The tree read after the delete-and-rewrite must equal the tree captured before it.
1082 NormalizedNode actual = readStore(store);
1084 assertEquals(expected, actual);
1088 private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
1089 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
1090 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
1091 transaction.read(YangInstanceIdentifier.builder().build());
1093 Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
1095 NormalizedNode<?, ?> normalizedNode = optional.get();
1097 transaction.close();
1099 return normalizedNode;
1102 private void commitTransaction(DOMStoreWriteTransaction transaction) {
1103 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1104 ListenableFuture<Void> future =
1105 commitCohort.preCommit();
1108 future = commitCohort.commit();
1110 } catch (InterruptedException | ExecutionException e) {
1114 private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
1115 return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
1117 public void onDataChanged(
1118 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
1124 private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
1125 throws ExecutionException, InterruptedException {
1126 DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
1128 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
1129 transaction.read(id);
1131 Optional<NormalizedNode<?, ?>> optional = future.get();
1132 NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
1134 transaction.close();
1139 private void writeToStore(TestActorRef<Shard> shard, YangInstanceIdentifier id, NormalizedNode<?,?> node)
1140 throws ExecutionException, InterruptedException {
1141 DOMStoreWriteTransaction transaction = shard.underlyingActor().getDataStore().newWriteOnlyTransaction();
1143 transaction.write(id, node);
1145 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
1146 commitCohort.preCommit().get();
1147 commitCohort.commit().get();
1150 private static final class DelegatingShardCreator implements Creator<Shard> {
1151 private final Creator<Shard> delegate;
1153 DelegatingShardCreator(Creator<Shard> delegate) {
1154 this.delegate = delegate;
1158 public Shard create() throws Exception {
1159 return delegate.create();