3 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 package org.opendaylight.controller.cluster.datastore;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.testkit.TestActorRef;
16 import akka.util.Timeout;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import com.google.common.util.concurrent.MoreExecutors;
22 import org.junit.BeforeClass;
23 import org.junit.Test;
24 import org.mockito.Mockito;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
27 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
29 import org.opendaylight.controller.cluster.datastore.modification.Modification;
30 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
31 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
32 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
33 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
34 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
35 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
36 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import scala.concurrent.Await;
41 import scala.concurrent.Future;
42 import scala.concurrent.duration.Duration;
43 import scala.concurrent.duration.FiniteDuration;
45 import java.util.Collections;
46 import java.util.concurrent.TimeUnit;
48 import static org.junit.Assert.assertTrue;
49 import static org.mockito.Mockito.when;
52 public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
54 private static ListeningExecutorService storeExecutor =
55 MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
57 private static final InMemoryDOMDataStore store =
58 new InMemoryDOMDataStore("OPER", storeExecutor,
59 MoreExecutors.sameThreadExecutor());
61 private static final SchemaContext testSchemaContext =
62 TestModel.createTestContext();
64 private static final ShardIdentifier SHARD_IDENTIFIER =
65 ShardIdentifier.builder().memberName("member-1")
66 .shardName("inventory").type("config").build();
68 private final DatastoreContext datastoreContext = new DatastoreContext();
70 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
73 public static void staticSetup() {
74 store.onGlobalContextUpdated(testSchemaContext);
77 private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
79 private ActorRef createShard(){
80 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
83 @Test(expected = TestException.class)
84 public void testNegativeAbortResultsInException() throws Exception {
86 final ActorRef shard = createShard();
87 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
88 .mock(DOMStoreThreePhaseCommitCohort.class);
89 final CompositeModification mockComposite =
90 Mockito.mock(CompositeModification.class);
92 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
94 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
95 .create(getSystem(), props,
96 "testNegativeAbortResultsInException");
98 when(mockCohort.abort()).thenReturn(
99 Futures.<Void>immediateFailedFuture(new TestException()));
101 Future<Object> future =
102 akka.pattern.Patterns.ask(subject,
103 ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
105 assertTrue(future.isCompleted());
107 Await.result(future, ASK_RESULT_DURATION);
111 @Test(expected = OptimisticLockFailedException.class)
112 public void testNegativeCanCommitResultsInException() throws Exception {
114 final ActorRef shard = createShard();
115 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
116 .mock(DOMStoreThreePhaseCommitCohort.class);
117 final CompositeModification mockComposite =
118 Mockito.mock(CompositeModification.class);
120 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
122 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
123 .create(getSystem(), props,
124 "testNegativeCanCommitResultsInException");
126 when(mockCohort.canCommit()).thenReturn(
128 .<Boolean>immediateFailedFuture(
129 new OptimisticLockFailedException("some exception")));
131 Future<Object> future =
132 akka.pattern.Patterns.ask(subject,
133 ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
137 Await.result(future, ASK_RESULT_DURATION);
142 @Test(expected = TestException.class)
143 public void testNegativePreCommitResultsInException() throws Exception {
145 final ActorRef shard = createShard();
146 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
147 .mock(DOMStoreThreePhaseCommitCohort.class);
148 final CompositeModification mockComposite =
149 Mockito.mock(CompositeModification.class);
151 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
153 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
154 .create(getSystem(), props,
155 "testNegativePreCommitResultsInException");
157 when(mockCohort.preCommit()).thenReturn(
159 .<Void>immediateFailedFuture(
160 new TestException()));
162 Future<Object> future =
163 akka.pattern.Patterns.ask(subject,
164 ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
167 Await.result(future, ASK_RESULT_DURATION);
171 @Test(expected = TestException.class)
172 public void testNegativeCommitResultsInException() throws Exception {
174 final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
175 Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()),
176 "testNegativeCommitResultsInException");
178 final ActorRef shardTransaction =
179 getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
180 testSchemaContext, datastoreContext, shardStats));
182 ShardTransactionMessages.WriteData writeData =
183 ShardTransactionMessages.WriteData.newBuilder()
184 .setInstanceIdentifierPathArguments(
185 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
186 .build()).setNormalizedNode(
187 NormalizedNodeMessages.Node.newBuilder().build()
191 Timeout askTimeout = new Timeout(ASK_RESULT_DURATION);
193 //This is done so that Modification list is updated which is used during commit
194 Future<Object> future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout);
196 //ready transaction creates the cohort so that we get into the
197 //block where in commmit is done
198 ShardTransactionMessages.ReadyTransaction readyTransaction =
199 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
201 future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout);
203 //but when the message is sent it will have the MockCommit object
204 //so that we can simulate throwing of exception
205 ForwardedCommitTransaction mockForwardCommitTransaction =
206 Mockito.mock(ForwardedCommitTransaction.class);
207 DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
208 Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
209 when(mockForwardCommitTransaction.getCohort())
210 .thenReturn(mockThreePhaseCommitTransaction);
211 when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
212 .<Void>immediateFailedFuture(
213 new TestException()));
214 Modification mockModification = Mockito.mock(
216 when(mockForwardCommitTransaction.getModification())
217 .thenReturn(mockModification);
219 when(mockModification.toSerializable()).thenReturn(
220 PersistentMessages.CompositeModification.newBuilder().build());
222 future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout);
223 Await.result(future, ASK_RESULT_DURATION);
226 private class TestException extends Exception {