3 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 package org.opendaylight.controller.cluster.datastore;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.testkit.TestActorRef;
16 import akka.util.Timeout;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import com.google.common.util.concurrent.MoreExecutors;
22 import org.junit.BeforeClass;
23 import org.junit.Test;
24 import org.mockito.Mockito;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
27 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
28 import org.opendaylight.controller.cluster.datastore.modification.Modification;
29 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
30 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
31 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
32 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
33 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
34 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
35 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39 import scala.concurrent.Await;
40 import scala.concurrent.Future;
41 import scala.concurrent.duration.Duration;
42 import scala.concurrent.duration.FiniteDuration;
44 import java.util.Collections;
45 import java.util.concurrent.TimeUnit;
47 import static org.junit.Assert.assertTrue;
48 import static org.mockito.Mockito.when;
51 public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
53 private static ListeningExecutorService storeExecutor =
54 MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
56 private static final InMemoryDOMDataStore store =
57 new InMemoryDOMDataStore("OPER", storeExecutor,
58 MoreExecutors.sameThreadExecutor());
60 private static final SchemaContext testSchemaContext =
61 TestModel.createTestContext();
63 private static final ShardIdentifier SHARD_IDENTIFIER =
64 ShardIdentifier.builder().memberName("member-1")
65 .shardName("inventory").type("config").build();
67 private final DatastoreContext datastoreContext = new DatastoreContext();
71 public static void staticSetup() {
72 store.onGlobalContextUpdated(testSchemaContext);
75 private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
77 private ActorRef createShard(){
78 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
81 @Test(expected = TestException.class)
82 public void testNegativeAbortResultsInException() throws Exception {
84 final ActorRef shard = createShard();
85 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
86 .mock(DOMStoreThreePhaseCommitCohort.class);
87 final CompositeModification mockComposite =
88 Mockito.mock(CompositeModification.class);
90 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
92 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
93 .create(getSystem(), props,
94 "testNegativeAbortResultsInException");
96 when(mockCohort.abort()).thenReturn(
97 Futures.<Void>immediateFailedFuture(new TestException()));
99 Future<Object> future =
100 akka.pattern.Patterns.ask(subject,
101 ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
103 assertTrue(future.isCompleted());
105 Await.result(future, ASK_RESULT_DURATION);
109 @Test(expected = OptimisticLockFailedException.class)
110 public void testNegativeCanCommitResultsInException() throws Exception {
112 final ActorRef shard = createShard();
113 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
114 .mock(DOMStoreThreePhaseCommitCohort.class);
115 final CompositeModification mockComposite =
116 Mockito.mock(CompositeModification.class);
118 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
120 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
121 .create(getSystem(), props,
122 "testNegativeCanCommitResultsInException");
124 when(mockCohort.canCommit()).thenReturn(
126 .<Boolean>immediateFailedFuture(
127 new OptimisticLockFailedException("some exception")));
129 Future<Object> future =
130 akka.pattern.Patterns.ask(subject,
131 ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
135 Await.result(future, ASK_RESULT_DURATION);
140 @Test(expected = TestException.class)
141 public void testNegativePreCommitResultsInException() throws Exception {
143 final ActorRef shard = createShard();
144 final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
145 .mock(DOMStoreThreePhaseCommitCohort.class);
146 final CompositeModification mockComposite =
147 Mockito.mock(CompositeModification.class);
149 ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
151 final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
152 .create(getSystem(), props,
153 "testNegativePreCommitResultsInException");
155 when(mockCohort.preCommit()).thenReturn(
157 .<Void>immediateFailedFuture(
158 new TestException()));
160 Future<Object> future =
161 akka.pattern.Patterns.ask(subject,
162 ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
165 Await.result(future, ASK_RESULT_DURATION);
169 @Test(expected = TestException.class)
170 public void testNegativeCommitResultsInException() throws Exception {
172 final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
173 Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()),
174 "testNegativeCommitResultsInException");
176 final ActorRef shardTransaction =
177 getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
178 testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()));
180 ShardTransactionMessages.WriteData writeData =
181 ShardTransactionMessages.WriteData.newBuilder()
182 .setInstanceIdentifierPathArguments(
183 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
184 .build()).setNormalizedNode(
185 NormalizedNodeMessages.Node.newBuilder().build()
189 Timeout askTimeout = new Timeout(ASK_RESULT_DURATION);
191 //This is done so that Modification list is updated which is used during commit
192 Future<Object> future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout);
194 //ready transaction creates the cohort so that we get into the
195 //block where in commmit is done
196 ShardTransactionMessages.ReadyTransaction readyTransaction =
197 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
199 future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout);
201 //but when the message is sent it will have the MockCommit object
202 //so that we can simulate throwing of exception
203 ForwardedCommitTransaction mockForwardCommitTransaction =
204 Mockito.mock(ForwardedCommitTransaction.class);
205 DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
206 Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
207 when(mockForwardCommitTransaction.getCohort())
208 .thenReturn(mockThreePhaseCommitTransaction);
209 when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
210 .<Void>immediateFailedFuture(
211 new TestException()));
212 Modification mockModification = Mockito.mock(
214 when(mockForwardCommitTransaction.getModification())
215 .thenReturn(mockModification);
217 when(mockModification.toSerializable()).thenReturn(
218 PersistentMessages.CompositeModification.newBuilder().build());
220 future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout);
221 Await.result(future, ASK_RESULT_DURATION);
224 private class TestException extends Exception {