3 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 package org.opendaylight.controller.cluster.datastore;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.testkit.TestActorRef;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListeningExecutorService;
19 import com.google.common.util.concurrent.MoreExecutors;
21 import org.junit.BeforeClass;
22 import org.junit.Test;
23 import org.mockito.Mockito;
24 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
25 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
27 import org.opendaylight.controller.cluster.datastore.modification.Modification;
28 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
29 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
30 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
31 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
32 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
33 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
34 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 import scala.concurrent.Await;
39 import scala.concurrent.Future;
40 import scala.concurrent.duration.Duration;
41 import scala.concurrent.duration.FiniteDuration;
43 import java.util.Collections;
44 import java.util.concurrent.TimeUnit;
46 import static org.junit.Assert.assertTrue;
47 import static org.mockito.Mockito.when;
50 public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
52 private static ListeningExecutorService storeExecutor =
53 MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
55 private static final InMemoryDOMDataStore store =
56 new InMemoryDOMDataStore("OPER", storeExecutor,
57 MoreExecutors.sameThreadExecutor());
59 private static final SchemaContext testSchemaContext =
60 TestModel.createTestContext();
62 private static final ShardIdentifier SHARD_IDENTIFIER =
63 ShardIdentifier.builder().memberName("member-1")
64 .shardName("inventory").type("config").build();
66 private final DatastoreContext datastoreContext = new DatastoreContext();
70 public static void staticSetup() {
71 store.onGlobalContextUpdated(testSchemaContext);
74 private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
77 @Test(expected = TestException.class)
78 public void testNegativeAbortResultsInException() throws Exception {
80 final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
81 Collections.EMPTY_MAP, datastoreContext));
82 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
83 .mock(DOMStoreThreePhaseCommitCohort.class);
84 final CompositeModification mockComposite =
85 Mockito.mock(CompositeModification.class);
87 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
89 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
90 .create(getSystem(), props,
91 "testNegativeAbortResultsInException");
93 when(mockCohort.abort()).thenReturn(
94 Futures.<Void>immediateFailedFuture(new TestException()));
96 Future<Object> future =
97 akka.pattern.Patterns.ask(subject,
98 ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
100 assertTrue(future.isCompleted());
102 Await.result(future, ASK_RESULT_DURATION);
106 @Test(expected = OptimisticLockFailedException.class)
107 public void testNegativeCanCommitResultsInException() throws Exception {
109 final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
110 Collections.EMPTY_MAP, datastoreContext));
111 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
112 .mock(DOMStoreThreePhaseCommitCohort.class);
113 final CompositeModification mockComposite =
114 Mockito.mock(CompositeModification.class);
116 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
118 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
119 .create(getSystem(), props,
120 "testNegativeCanCommitResultsInException");
122 when(mockCohort.canCommit()).thenReturn(
124 .<Boolean>immediateFailedFuture(
125 new OptimisticLockFailedException("some exception")));
127 Future<Object> future =
128 akka.pattern.Patterns.ask(subject,
129 ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
133 Await.result(future, ASK_RESULT_DURATION);
138 @Test(expected = TestException.class)
139 public void testNegativePreCommitResultsInException() throws Exception {
141 final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
142 Collections.EMPTY_MAP, datastoreContext));
143 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
144 .mock(DOMStoreThreePhaseCommitCohort.class);
145 final CompositeModification mockComposite =
146 Mockito.mock(CompositeModification.class);
148 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
150 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
151 .create(getSystem(), props,
152 "testNegativePreCommitResultsInException");
154 when(mockCohort.preCommit()).thenReturn(
156 .<Void>immediateFailedFuture(
157 new TestException()));
159 Future<Object> future =
160 akka.pattern.Patterns.ask(subject,
161 ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
164 Await.result(future, ASK_RESULT_DURATION);
168 @Test(expected = TestException.class)
169 public void testNegativeCommitResultsInException() throws Exception {
171 final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
172 Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext),
173 "testNegativeCommitResultsInException");
175 final ActorRef shardTransaction =
176 getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
177 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()));
179 ShardTransactionMessages.WriteData writeData =
180 ShardTransactionMessages.WriteData.newBuilder()
181 .setInstanceIdentifierPathArguments(
182 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
183 .build()).setNormalizedNode(
184 NormalizedNodeMessages.Node.newBuilder().build()
188 //This is done so that Modification list is updated which is used during commit
190 akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
192 //ready transaction creates the cohort so that we get into the
193 //block where in commmit is done
194 ShardTransactionMessages.ReadyTransaction readyTransaction =
195 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
198 akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
200 //but when the message is sent it will have the MockCommit object
201 //so that we can simulate throwing of exception
202 ForwardedCommitTransaction mockForwardCommitTransaction =
203 Mockito.mock(ForwardedCommitTransaction.class);
204 DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
205 Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
206 when(mockForwardCommitTransaction.getCohort())
207 .thenReturn(mockThreePhaseCommitTransaction);
208 when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
209 .<Void>immediateFailedFuture(
210 new TestException()));
211 Modification mockModification = Mockito.mock(
213 when(mockForwardCommitTransaction.getModification())
214 .thenReturn(mockModification);
216 when(mockModification.toSerializable()).thenReturn(
217 PersistentMessages.CompositeModification.newBuilder().build());
220 akka.pattern.Patterns.ask(subject,
221 mockForwardCommitTransaction
223 Await.result(future, ASK_RESULT_DURATION);
226 private class TestException extends Exception {