Bug 1637: Change Rpc actor calls to async
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortFailureTest.java
1 /*
2  *
3  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  *  This program and the accompanying materials are made available under the
6  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  *  and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  */
10
11 package org.opendaylight.controller.cluster.datastore;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.testkit.TestActorRef;
16
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListeningExecutorService;
19 import com.google.common.util.concurrent.MoreExecutors;
20
21 import org.junit.BeforeClass;
22 import org.junit.Test;
23 import org.mockito.Mockito;
24 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
25 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
27 import org.opendaylight.controller.cluster.datastore.modification.Modification;
28 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
29 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
30 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
31 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
32 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
33 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
34 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37
38 import scala.concurrent.Await;
39 import scala.concurrent.Future;
40 import scala.concurrent.duration.Duration;
41 import scala.concurrent.duration.FiniteDuration;
42
43 import java.util.Collections;
44 import java.util.concurrent.TimeUnit;
45
46 import static org.junit.Assert.assertTrue;
47 import static org.mockito.Mockito.when;
48
49
50 public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
51
52     private static ListeningExecutorService storeExecutor =
53         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
54
55     private static final InMemoryDOMDataStore store =
56         new InMemoryDOMDataStore("OPER", storeExecutor,
57             MoreExecutors.sameThreadExecutor());
58
59     private static final SchemaContext testSchemaContext =
60         TestModel.createTestContext();
61
62     private static final ShardIdentifier SHARD_IDENTIFIER =
63         ShardIdentifier.builder().memberName("member-1")
64             .shardName("inventory").type("config").build();
65
66     private final DatastoreContext datastoreContext = new DatastoreContext();
67
68
69     @BeforeClass
70     public static void staticSetup() {
71         store.onGlobalContextUpdated(testSchemaContext);
72     }
73
74     private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
75
76
77     @Test(expected = TestException.class)
78     public void testNegativeAbortResultsInException() throws Exception {
79
80         final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
81                 Collections.EMPTY_MAP, datastoreContext));
82         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
83             .mock(DOMStoreThreePhaseCommitCohort.class);
84         final CompositeModification mockComposite =
85             Mockito.mock(CompositeModification.class);
86         final Props props =
87             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
88
89         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
90             .create(getSystem(), props,
91                 "testNegativeAbortResultsInException");
92
93         when(mockCohort.abort()).thenReturn(
94             Futures.<Void>immediateFailedFuture(new TestException()));
95
96         Future<Object> future =
97             akka.pattern.Patterns.ask(subject,
98                 ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
99                     .build(), 3000);
100         assertTrue(future.isCompleted());
101
102         Await.result(future, ASK_RESULT_DURATION);
103     }
104
105
106     @Test(expected = OptimisticLockFailedException.class)
107     public void testNegativeCanCommitResultsInException() throws Exception {
108
109         final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
110                 Collections.EMPTY_MAP, datastoreContext));
111         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
112             .mock(DOMStoreThreePhaseCommitCohort.class);
113         final CompositeModification mockComposite =
114             Mockito.mock(CompositeModification.class);
115         final Props props =
116             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
117
118         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
119             .create(getSystem(), props,
120                 "testNegativeCanCommitResultsInException");
121
122         when(mockCohort.canCommit()).thenReturn(
123             Futures
124                 .<Boolean>immediateFailedFuture(
125                     new OptimisticLockFailedException("some exception")));
126
127         Future<Object> future =
128             akka.pattern.Patterns.ask(subject,
129                 ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
130                     .build(), 3000);
131
132
133         Await.result(future, ASK_RESULT_DURATION);
134
135     }
136
137
138     @Test(expected = TestException.class)
139     public void testNegativePreCommitResultsInException() throws Exception {
140
141         final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
142                 Collections.EMPTY_MAP, datastoreContext));
143         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
144             .mock(DOMStoreThreePhaseCommitCohort.class);
145         final CompositeModification mockComposite =
146             Mockito.mock(CompositeModification.class);
147         final Props props =
148             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite,SHARD_IDENTIFIER.toString());
149
150         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
151             .create(getSystem(), props,
152                 "testNegativePreCommitResultsInException");
153
154         when(mockCohort.preCommit()).thenReturn(
155             Futures
156                 .<Void>immediateFailedFuture(
157                     new TestException()));
158
159         Future<Object> future =
160             akka.pattern.Patterns.ask(subject,
161                 ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
162                     .build(), 3000);
163
164         Await.result(future, ASK_RESULT_DURATION);
165
166     }
167
168     @Test(expected = TestException.class)
169     public void testNegativeCommitResultsInException() throws Exception {
170
171         final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
172                 Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext),
173                 "testNegativeCommitResultsInException");
174
175         final ActorRef shardTransaction =
176             getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
177                     testSchemaContext, datastoreContext,SHARD_IDENTIFIER.toString()));
178
179         ShardTransactionMessages.WriteData writeData =
180             ShardTransactionMessages.WriteData.newBuilder()
181                 .setInstanceIdentifierPathArguments(
182                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
183                         .build()).setNormalizedNode(
184                 NormalizedNodeMessages.Node.newBuilder().build()
185
186             ).build();
187
188         //This is done so that Modification list is updated which is used during commit
189         Future future =
190             akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
191
192         //ready transaction creates the cohort so that we get into the
193         //block where in commmit is done
194         ShardTransactionMessages.ReadyTransaction readyTransaction =
195             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
196
197         future =
198             akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
199
200         //but when the message is sent it will have the MockCommit object
201         //so that we can simulate throwing of exception
202         ForwardedCommitTransaction mockForwardCommitTransaction =
203             Mockito.mock(ForwardedCommitTransaction.class);
204         DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
205             Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
206         when(mockForwardCommitTransaction.getCohort())
207             .thenReturn(mockThreePhaseCommitTransaction);
208         when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
209             .<Void>immediateFailedFuture(
210                 new TestException()));
211         Modification mockModification = Mockito.mock(
212             Modification.class);
213         when(mockForwardCommitTransaction.getModification())
214             .thenReturn(mockModification);
215
216         when(mockModification.toSerializable()).thenReturn(
217             PersistentMessages.CompositeModification.newBuilder().build());
218
219         future =
220             akka.pattern.Patterns.ask(subject,
221                 mockForwardCommitTransaction
222                 , 3000);
223         Await.result(future, ASK_RESULT_DURATION);
224     }
225
226     private class TestException extends Exception {
227     }
228 }