3 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 package org.opendaylight.controller.cluster.datastore;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.testkit.TestActorRef;
16 import akka.util.Timeout;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import com.google.common.util.concurrent.MoreExecutors;
22 import org.junit.BeforeClass;
23 import org.junit.Test;
24 import org.mockito.Mockito;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
27 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
28 import org.opendaylight.controller.cluster.datastore.modification.Modification;
29 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
30 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
31 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
32 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
33 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
34 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
35 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39 import scala.concurrent.Await;
40 import scala.concurrent.Future;
41 import scala.concurrent.duration.Duration;
42 import scala.concurrent.duration.FiniteDuration;
44 import java.util.Collections;
45 import java.util.concurrent.TimeUnit;
47 import static org.junit.Assert.assertTrue;
48 import static org.mockito.Mockito.when;
51 public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
53 private static ListeningExecutorService storeExecutor =
54 MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
56 private static final InMemoryDOMDataStore store =
57 new InMemoryDOMDataStore("OPER", storeExecutor,
58 MoreExecutors.sameThreadExecutor());
60 private static final SchemaContext testSchemaContext =
61 TestModel.createTestContext();
63 private static final ShardIdentifier SHARD_IDENTIFIER =
64 ShardIdentifier.builder().memberName("member-1")
65 .shardName("inventory").type("config").build();
67 private final DatastoreContext datastoreContext = new DatastoreContext();
71 public static void staticSetup() {
72 store.onGlobalContextUpdated(testSchemaContext);
75 private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
78 @Test(expected = TestException.class)
79 public void testNegativeAbortResultsInException() throws Exception {
81 final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
82 Collections.EMPTY_MAP, datastoreContext));
83 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
84 .mock(DOMStoreThreePhaseCommitCohort.class);
85 final CompositeModification mockComposite =
86 Mockito.mock(CompositeModification.class);
88 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
90 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
91 .create(getSystem(), props,
92 "testNegativeAbortResultsInException");
94 when(mockCohort.abort()).thenReturn(
95 Futures.<Void>immediateFailedFuture(new TestException()));
97 Future<Object> future =
98 akka.pattern.Patterns.ask(subject,
99 ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
101 assertTrue(future.isCompleted());
103 Await.result(future, ASK_RESULT_DURATION);
107 @Test(expected = OptimisticLockFailedException.class)
108 public void testNegativeCanCommitResultsInException() throws Exception {
110 final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
111 Collections.EMPTY_MAP, datastoreContext));
112 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
113 .mock(DOMStoreThreePhaseCommitCohort.class);
114 final CompositeModification mockComposite =
115 Mockito.mock(CompositeModification.class);
117 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
119 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
120 .create(getSystem(), props,
121 "testNegativeCanCommitResultsInException");
123 when(mockCohort.canCommit()).thenReturn(
125 .<Boolean>immediateFailedFuture(
126 new OptimisticLockFailedException("some exception")));
128 Future<Object> future =
129 akka.pattern.Patterns.ask(subject,
130 ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
134 Await.result(future, ASK_RESULT_DURATION);
139 @Test(expected = TestException.class)
140 public void testNegativePreCommitResultsInException() throws Exception {
142 final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
143 Collections.EMPTY_MAP, datastoreContext));
144 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
145 .mock(DOMStoreThreePhaseCommitCohort.class);
146 final CompositeModification mockComposite =
147 Mockito.mock(CompositeModification.class);
149 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
151 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
152 .create(getSystem(), props,
153 "testNegativePreCommitResultsInException");
155 when(mockCohort.preCommit()).thenReturn(
157 .<Void>immediateFailedFuture(
158 new TestException()));
160 Future<Object> future =
161 akka.pattern.Patterns.ask(subject,
162 ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
165 Await.result(future, ASK_RESULT_DURATION);
169 @Test(expected = TestException.class)
170 public void testNegativeCommitResultsInException() throws Exception {
172 final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
173 Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext),
174 "testNegativeCommitResultsInException");
176 final ActorRef shardTransaction =
177 getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
178 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()));
180 ShardTransactionMessages.WriteData writeData =
181 ShardTransactionMessages.WriteData.newBuilder()
182 .setInstanceIdentifierPathArguments(
183 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
184 .build()).setNormalizedNode(
185 NormalizedNodeMessages.Node.newBuilder().build()
189 Timeout askTimeout = new Timeout(ASK_RESULT_DURATION);
191 //This is done so that Modification list is updated which is used during commit
192 Future<Object> future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout);
194 //ready transaction creates the cohort so that we get into the
195 //block where in commmit is done
196 ShardTransactionMessages.ReadyTransaction readyTransaction =
197 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
199 future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout);
201 //but when the message is sent it will have the MockCommit object
202 //so that we can simulate throwing of exception
203 ForwardedCommitTransaction mockForwardCommitTransaction =
204 Mockito.mock(ForwardedCommitTransaction.class);
205 DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
206 Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
207 when(mockForwardCommitTransaction.getCohort())
208 .thenReturn(mockThreePhaseCommitTransaction);
209 when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
210 .<Void>immediateFailedFuture(
211 new TestException()));
212 Modification mockModification = Mockito.mock(
214 when(mockForwardCommitTransaction.getModification())
215 .thenReturn(mockModification);
217 when(mockModification.toSerializable()).thenReturn(
218 PersistentMessages.CompositeModification.newBuilder().build());
220 future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout);
221 Await.result(future, ASK_RESULT_DURATION);
224 private class TestException extends Exception {