Merge "Bug 1637: Change Rpc actor calls to async"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortFailureTest.java
1 /*
2  *
3  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  *  This program and the accompanying materials are made available under the
6  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  *  and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  */
10
11 package org.opendaylight.controller.cluster.datastore;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.testkit.TestActorRef;
16 import akka.util.Timeout;
17
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import com.google.common.util.concurrent.MoreExecutors;
21
22 import org.junit.BeforeClass;
23 import org.junit.Test;
24 import org.mockito.Mockito;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
27 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
28 import org.opendaylight.controller.cluster.datastore.modification.Modification;
29 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
30 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
31 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
32 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
33 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
34 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
35 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38
39 import scala.concurrent.Await;
40 import scala.concurrent.Future;
41 import scala.concurrent.duration.Duration;
42 import scala.concurrent.duration.FiniteDuration;
43
44 import java.util.Collections;
45 import java.util.concurrent.TimeUnit;
46
47 import static org.junit.Assert.assertTrue;
48 import static org.mockito.Mockito.when;
49
50
51 public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
52
53     private static ListeningExecutorService storeExecutor =
54         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
55
56     private static final InMemoryDOMDataStore store =
57         new InMemoryDOMDataStore("OPER", storeExecutor,
58             MoreExecutors.sameThreadExecutor());
59
60     private static final SchemaContext testSchemaContext =
61         TestModel.createTestContext();
62
63     private static final ShardIdentifier SHARD_IDENTIFIER =
64         ShardIdentifier.builder().memberName("member-1")
65             .shardName("inventory").type("config").build();
66
67     private final DatastoreContext datastoreContext = new DatastoreContext();
68
69
70     @BeforeClass
71     public static void staticSetup() {
72         store.onGlobalContextUpdated(testSchemaContext);
73     }
74
75     private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
76
77
78     @Test(expected = TestException.class)
79     public void testNegativeAbortResultsInException() throws Exception {
80
81         final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
82                 Collections.EMPTY_MAP, datastoreContext));
83         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
84             .mock(DOMStoreThreePhaseCommitCohort.class);
85         final CompositeModification mockComposite =
86             Mockito.mock(CompositeModification.class);
87         final Props props =
88             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
89
90         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
91             .create(getSystem(), props,
92                 "testNegativeAbortResultsInException");
93
94         when(mockCohort.abort()).thenReturn(
95             Futures.<Void>immediateFailedFuture(new TestException()));
96
97         Future<Object> future =
98             akka.pattern.Patterns.ask(subject,
99                 ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
100                     .build(), 3000);
101         assertTrue(future.isCompleted());
102
103         Await.result(future, ASK_RESULT_DURATION);
104     }
105
106
107     @Test(expected = OptimisticLockFailedException.class)
108     public void testNegativeCanCommitResultsInException() throws Exception {
109
110         final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
111                 Collections.EMPTY_MAP, datastoreContext));
112         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
113             .mock(DOMStoreThreePhaseCommitCohort.class);
114         final CompositeModification mockComposite =
115             Mockito.mock(CompositeModification.class);
116         final Props props =
117             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
118
119         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
120             .create(getSystem(), props,
121                 "testNegativeCanCommitResultsInException");
122
123         when(mockCohort.canCommit()).thenReturn(
124             Futures
125                 .<Boolean>immediateFailedFuture(
126                     new OptimisticLockFailedException("some exception")));
127
128         Future<Object> future =
129             akka.pattern.Patterns.ask(subject,
130                 ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
131                     .build(), 3000);
132
133
134         Await.result(future, ASK_RESULT_DURATION);
135
136     }
137
138
139     @Test(expected = TestException.class)
140     public void testNegativePreCommitResultsInException() throws Exception {
141
142         final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
143                 Collections.EMPTY_MAP, datastoreContext));
144         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
145             .mock(DOMStoreThreePhaseCommitCohort.class);
146         final CompositeModification mockComposite =
147             Mockito.mock(CompositeModification.class);
148         final Props props =
149             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
150
151         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
152             .create(getSystem(), props,
153                 "testNegativePreCommitResultsInException");
154
155         when(mockCohort.preCommit()).thenReturn(
156             Futures
157                 .<Void>immediateFailedFuture(
158                     new TestException()));
159
160         Future<Object> future =
161             akka.pattern.Patterns.ask(subject,
162                 ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
163                     .build(), 3000);
164
165         Await.result(future, ASK_RESULT_DURATION);
166
167     }
168
169     @Test(expected = TestException.class)
170     public void testNegativeCommitResultsInException() throws Exception {
171
172         final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
173                 Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext),
174                 "testNegativeCommitResultsInException");
175
176         final ActorRef shardTransaction =
177             getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
178                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()));
179
180         ShardTransactionMessages.WriteData writeData =
181             ShardTransactionMessages.WriteData.newBuilder()
182                 .setInstanceIdentifierPathArguments(
183                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
184                         .build()).setNormalizedNode(
185                 NormalizedNodeMessages.Node.newBuilder().build()
186
187             ).build();
188
189         Timeout askTimeout = new Timeout(ASK_RESULT_DURATION);
190
191         //This is done so that Modification list is updated which is used during commit
192         Future<Object> future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout);
193
194         //ready transaction creates the cohort so that we get into the
195         //block where in commmit is done
196         ShardTransactionMessages.ReadyTransaction readyTransaction =
197             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
198
199         future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout);
200
201         //but when the message is sent it will have the MockCommit object
202         //so that we can simulate throwing of exception
203         ForwardedCommitTransaction mockForwardCommitTransaction =
204             Mockito.mock(ForwardedCommitTransaction.class);
205         DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
206             Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
207         when(mockForwardCommitTransaction.getCohort())
208             .thenReturn(mockThreePhaseCommitTransaction);
209         when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
210             .<Void>immediateFailedFuture(
211                 new TestException()));
212         Modification mockModification = Mockito.mock(
213             Modification.class);
214         when(mockForwardCommitTransaction.getModification())
215             .thenReturn(mockModification);
216
217         when(mockModification.toSerializable()).thenReturn(
218             PersistentMessages.CompositeModification.newBuilder().build());
219
220         future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout);
221         Await.result(future, ASK_RESULT_DURATION);
222     }
223
224     private class TestException extends Exception {
225     }
226 }