3 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 package org.opendaylight.controller.cluster.datastore;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.testkit.TestActorRef;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import org.junit.Test;
20 import org.mockito.Mockito;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
24 import org.opendaylight.controller.cluster.datastore.modification.Modification;
25 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
26 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
27 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
28 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
29 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
30 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
31 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34 import scala.concurrent.Await;
35 import scala.concurrent.Future;
36 import scala.concurrent.duration.Duration;
37 import scala.concurrent.duration.FiniteDuration;
39 import java.util.Collections;
40 import java.util.concurrent.TimeUnit;
42 import static org.junit.Assert.assertTrue;
43 import static org.mockito.Mockito.when;
46 public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
48 private static ListeningExecutorService storeExecutor =
49 MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
51 private static final InMemoryDOMDataStore store =
52 new InMemoryDOMDataStore("OPER", storeExecutor,
53 MoreExecutors.sameThreadExecutor());
55 private static final SchemaContext testSchemaContext =
56 TestModel.createTestContext();
58 private static final ShardIdentifier SHARD_IDENTIFIER =
59 ShardIdentifier.builder().memberName("member-1")
60 .shardName("inventory").type("config").build();
63 store.onGlobalContextUpdated(testSchemaContext);
66 private FiniteDuration ASK_RESULT_DURATION = Duration.create(3000, TimeUnit.MILLISECONDS);
69 @Test(expected = TestException.class)
70 public void testNegativeAbortResultsInException() throws Exception {
72 final ActorRef shard =
74 .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
75 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
76 .mock(DOMStoreThreePhaseCommitCohort.class);
77 final CompositeModification mockComposite =
78 Mockito.mock(CompositeModification.class);
80 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
82 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
83 .create(getSystem(), props,
84 "testNegativeAbortResultsInException");
86 when(mockCohort.abort()).thenReturn(
87 Futures.<Void>immediateFailedFuture(new TestException()));
89 Future<Object> future =
90 akka.pattern.Patterns.ask(subject,
91 ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
93 assertTrue(future.isCompleted());
95 Await.result(future, ASK_RESULT_DURATION);
102 @Test(expected = OptimisticLockFailedException.class)
103 public void testNegativeCanCommitResultsInException() throws Exception {
105 final ActorRef shard =
107 .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
108 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
109 .mock(DOMStoreThreePhaseCommitCohort.class);
110 final CompositeModification mockComposite =
111 Mockito.mock(CompositeModification.class);
113 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
115 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
116 .create(getSystem(), props,
117 "testNegativeCanCommitResultsInException");
119 when(mockCohort.canCommit()).thenReturn(
121 .<Boolean>immediateFailedFuture(
122 new OptimisticLockFailedException("some exception")));
124 Future<Object> future =
125 akka.pattern.Patterns.ask(subject,
126 ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
130 Await.result(future, ASK_RESULT_DURATION);
135 @Test(expected = TestException.class)
136 public void testNegativePreCommitResultsInException() throws Exception {
138 final ActorRef shard =
140 .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
141 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
142 .mock(DOMStoreThreePhaseCommitCohort.class);
143 final CompositeModification mockComposite =
144 Mockito.mock(CompositeModification.class);
146 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
148 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
149 .create(getSystem(), props,
150 "testNegativePreCommitResultsInException");
152 when(mockCohort.preCommit()).thenReturn(
154 .<Void>immediateFailedFuture(
155 new TestException()));
157 Future<Object> future =
158 akka.pattern.Patterns.ask(subject,
159 ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
162 Await.result(future, ASK_RESULT_DURATION);
166 @Test(expected = TestException.class)
167 public void testNegativeCommitResultsInException() throws Exception {
169 final TestActorRef<Shard> subject = TestActorRef
171 Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null),
172 "testNegativeCommitResultsInException");
174 final ActorRef shardTransaction =
176 ShardTransaction.props(store.newReadWriteTransaction(), subject,
177 TestModel.createTestContext()));
179 ShardTransactionMessages.WriteData writeData =
180 ShardTransactionMessages.WriteData.newBuilder()
181 .setInstanceIdentifierPathArguments(
182 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
183 .build()).setNormalizedNode(
184 NormalizedNodeMessages.Node.newBuilder().build()
188 //This is done so that Modification list is updated which is used during commit
190 akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
192 //ready transaction creates the cohort so that we get into the
193 //block where in commmit is done
194 ShardTransactionMessages.ReadyTransaction readyTransaction =
195 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
198 akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
200 //but when the message is sent it will have the MockCommit object
201 //so that we can simulate throwing of exception
202 ForwardedCommitTransaction mockForwardCommitTransaction =
203 Mockito.mock(ForwardedCommitTransaction.class);
204 DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
205 Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
206 when(mockForwardCommitTransaction.getCohort())
207 .thenReturn(mockThreePhaseCommitTransaction);
208 when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
209 .<Void>immediateFailedFuture(
210 new TestException()));
211 Modification mockModification = Mockito.mock(
213 when(mockForwardCommitTransaction.getModification())
214 .thenReturn(mockModification);
216 when(mockModification.toSerializable()).thenReturn(
217 PersistentMessages.CompositeModification.newBuilder().build());
220 akka.pattern.Patterns.ask(subject,
221 mockForwardCommitTransaction
223 Await.result(future, ASK_RESULT_DURATION);
228 private class TestException extends Exception {