/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
11 package org.opendaylight.controller.cluster.datastore;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.testkit.TestActorRef;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListeningExecutorService;
19 import com.google.common.util.concurrent.MoreExecutors;
21 import org.junit.BeforeClass;
22 import org.junit.Test;
23 import org.mockito.Mockito;
24 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
25 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
27 import org.opendaylight.controller.cluster.datastore.modification.Modification;
28 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
29 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
30 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
31 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
32 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
33 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
34 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 import scala.concurrent.Await;
39 import scala.concurrent.Future;
40 import scala.concurrent.duration.Duration;
41 import scala.concurrent.duration.FiniteDuration;
43 import java.util.Collections;
44 import java.util.concurrent.TimeUnit;
46 import static org.junit.Assert.assertTrue;
47 import static org.mockito.Mockito.when;
50 public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
52 private static ListeningExecutorService storeExecutor =
53 MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
55 private static final InMemoryDOMDataStore store =
56 new InMemoryDOMDataStore("OPER", storeExecutor,
57 MoreExecutors.sameThreadExecutor());
59 private static final SchemaContext testSchemaContext =
60 TestModel.createTestContext();
62 private static final ShardIdentifier SHARD_IDENTIFIER =
63 ShardIdentifier.builder().memberName("member-1")
64 .shardName("inventory").type("config").build();
66 private final ShardContext shardContext = new ShardContext();
69 public static void staticSetup() {
70 store.onGlobalContextUpdated(testSchemaContext);
73 private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
76 @Test(expected = TestException.class)
77 public void testNegativeAbortResultsInException() throws Exception {
79 final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
80 Collections.EMPTY_MAP, shardContext));
81 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
82 .mock(DOMStoreThreePhaseCommitCohort.class);
83 final CompositeModification mockComposite =
84 Mockito.mock(CompositeModification.class);
86 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
88 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
89 .create(getSystem(), props,
90 "testNegativeAbortResultsInException");
92 when(mockCohort.abort()).thenReturn(
93 Futures.<Void>immediateFailedFuture(new TestException()));
95 Future<Object> future =
96 akka.pattern.Patterns.ask(subject,
97 ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
99 assertTrue(future.isCompleted());
101 Await.result(future, ASK_RESULT_DURATION);
105 @Test(expected = OptimisticLockFailedException.class)
106 public void testNegativeCanCommitResultsInException() throws Exception {
108 final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
109 Collections.EMPTY_MAP, shardContext));
110 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
111 .mock(DOMStoreThreePhaseCommitCohort.class);
112 final CompositeModification mockComposite =
113 Mockito.mock(CompositeModification.class);
115 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
117 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
118 .create(getSystem(), props,
119 "testNegativeCanCommitResultsInException");
121 when(mockCohort.canCommit()).thenReturn(
123 .<Boolean>immediateFailedFuture(
124 new OptimisticLockFailedException("some exception")));
126 Future<Object> future =
127 akka.pattern.Patterns.ask(subject,
128 ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
132 Await.result(future, ASK_RESULT_DURATION);
137 @Test(expected = TestException.class)
138 public void testNegativePreCommitResultsInException() throws Exception {
140 final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
141 Collections.EMPTY_MAP, shardContext));
142 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
143 .mock(DOMStoreThreePhaseCommitCohort.class);
144 final CompositeModification mockComposite =
145 Mockito.mock(CompositeModification.class);
147 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
149 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
150 .create(getSystem(), props,
151 "testNegativePreCommitResultsInException");
153 when(mockCohort.preCommit()).thenReturn(
155 .<Void>immediateFailedFuture(
156 new TestException()));
158 Future<Object> future =
159 akka.pattern.Patterns.ask(subject,
160 ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
163 Await.result(future, ASK_RESULT_DURATION);
167 @Test(expected = TestException.class)
168 public void testNegativeCommitResultsInException() throws Exception {
170 final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
171 Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, shardContext),
172 "testNegativeCommitResultsInException");
174 final ActorRef shardTransaction =
175 getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
176 testSchemaContext, shardContext));
178 ShardTransactionMessages.WriteData writeData =
179 ShardTransactionMessages.WriteData.newBuilder()
180 .setInstanceIdentifierPathArguments(
181 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
182 .build()).setNormalizedNode(
183 NormalizedNodeMessages.Node.newBuilder().build()
187 //This is done so that Modification list is updated which is used during commit
189 akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
191 //ready transaction creates the cohort so that we get into the
192 //block where in commmit is done
193 ShardTransactionMessages.ReadyTransaction readyTransaction =
194 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
197 akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
199 //but when the message is sent it will have the MockCommit object
200 //so that we can simulate throwing of exception
201 ForwardedCommitTransaction mockForwardCommitTransaction =
202 Mockito.mock(ForwardedCommitTransaction.class);
203 DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
204 Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
205 when(mockForwardCommitTransaction.getCohort())
206 .thenReturn(mockThreePhaseCommitTransaction);
207 when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
208 .<Void>immediateFailedFuture(
209 new TestException()));
210 Modification mockModification = Mockito.mock(
212 when(mockForwardCommitTransaction.getModification())
213 .thenReturn(mockModification);
215 when(mockModification.toSerializable()).thenReturn(
216 PersistentMessages.CompositeModification.newBuilder().build());
219 akka.pattern.Patterns.ask(subject,
220 mockForwardCommitTransaction
222 Await.result(future, ASK_RESULT_DURATION);
225 private class TestException extends Exception {