Refactor LegacyTransactionContextImpl
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionProxyTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.argThat;
15 import static org.mockito.Matchers.eq;
16 import static org.mockito.Matchers.isA;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.verify;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.actor.Props;
24 import akka.dispatch.Futures;
25 import akka.testkit.JavaTestKit;
26 import com.google.common.collect.ImmutableMap;
27 import com.google.common.util.concurrent.CheckedFuture;
28 import com.typesafe.config.Config;
29 import com.typesafe.config.ConfigFactory;
30 import java.io.IOException;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.concurrent.TimeUnit;
34 import org.junit.AfterClass;
35 import org.junit.Before;
36 import org.junit.BeforeClass;
37 import org.mockito.ArgumentCaptor;
38 import org.mockito.ArgumentMatcher;
39 import org.mockito.Mock;
40 import org.mockito.Mockito;
41 import org.mockito.MockitoAnnotations;
42 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
43 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
44 import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
45 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
46 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
47 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
48 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
49 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
50 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
51 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
52 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
53 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
54 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
55 import org.opendaylight.controller.cluster.datastore.modification.Modification;
56 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
57 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
58 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
59 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
60 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
61 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
62 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
63 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
64 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
65 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
68 import scala.concurrent.Await;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.Duration;
71
72 /**
73  * Abstract base class for TransactionProxy unit tests.
74  *
75  * @author Thomas Pantelis
76  */
77 public abstract class AbstractTransactionProxyTest {
78     private static ActorSystem system;
79
80     private final Configuration configuration = new MockConfiguration();
81
82     @Mock
83     protected ActorContext mockActorContext;
84
85     private SchemaContext schemaContext;
86
87     @Mock
88     private ClusterWrapper mockClusterWrapper;
89
90     protected final String memberName = "mock-member";
91
92     protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).
93             shardBatchedModificationCount(1);
94
95     @BeforeClass
96     public static void setUpClass() throws IOException {
97
98         Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder().
99                 put("akka.actor.default-dispatcher.type",
100                         "akka.testkit.CallingThreadDispatcherConfigurator").build()).
101                 withFallback(ConfigFactory.load());
102         system = ActorSystem.create("test", config);
103     }
104
105     @AfterClass
106     public static void tearDownClass() throws IOException {
107         JavaTestKit.shutdownActorSystem(system);
108         system = null;
109     }
110
111     @Before
112     public void setUp(){
113         MockitoAnnotations.initMocks(this);
114
115         schemaContext = TestModel.createTestContext();
116
117         doReturn(getSystem()).when(mockActorContext).getActorSystem();
118         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
119         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
120         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
121         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
122         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
123         doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
124         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
125
126         ShardStrategyFactory.setConfiguration(configuration);
127     }
128
129     protected ActorSystem getSystem() {
130         return system;
131     }
132
133     protected CreateTransaction eqCreateTransaction(final String memberName,
134             final TransactionType type) {
135         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
136             @Override
137             public boolean matches(Object argument) {
138                 if(CreateTransaction.SERIALIZABLE_CLASS.equals(argument.getClass())) {
139                     CreateTransaction obj = CreateTransaction.fromSerializable(argument);
140                     return obj.getTransactionId().startsWith(memberName) &&
141                             obj.getTransactionType() == type.ordinal();
142                 }
143
144                 return false;
145             }
146         };
147
148         return argThat(matcher);
149     }
150
151     protected DataExists eqSerializedDataExists() {
152         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
153             @Override
154             public boolean matches(Object argument) {
155                 return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
156                        DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH);
157             }
158         };
159
160         return argThat(matcher);
161     }
162
163     protected DataExists eqDataExists() {
164         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
165             @Override
166             public boolean matches(Object argument) {
167                 return (argument instanceof DataExists) &&
168                     ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
169             }
170         };
171
172         return argThat(matcher);
173     }
174
175     protected ReadData eqSerializedReadData() {
176         return eqSerializedReadData(TestModel.TEST_PATH);
177     }
178
179     protected ReadData eqSerializedReadData(final YangInstanceIdentifier path) {
180         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
181             @Override
182             public boolean matches(Object argument) {
183                 return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) &&
184                        ReadData.fromSerializable(argument).getPath().equals(path);
185             }
186         };
187
188         return argThat(matcher);
189     }
190
191     protected ReadData eqReadData() {
192         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
193             @Override
194             public boolean matches(Object argument) {
195                 return (argument instanceof ReadData) &&
196                     ((ReadData)argument).getPath().equals(TestModel.TEST_PATH);
197             }
198         };
199
200         return argThat(matcher);
201     }
202
203     protected Future<Object> readySerializedTxReply(String path) {
204         return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
205     }
206
207     protected Future<Object> readyTxReply(String path) {
208         return Futures.successful((Object)new ReadyTransactionReply(path));
209     }
210
211     protected Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data,
212             short transactionVersion) {
213         return Futures.successful(new ReadDataReply(data, transactionVersion).toSerializable());
214     }
215
216     protected Future<Object> readSerializedDataReply(NormalizedNode<?, ?> data) {
217         return readSerializedDataReply(data, DataStoreVersions.CURRENT_VERSION);
218     }
219
220     protected Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
221         return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
222     }
223
224     protected Future<Object> dataExistsSerializedReply(boolean exists) {
225         return Futures.successful(new DataExistsReply(exists).toSerializable());
226     }
227
228     protected Future<DataExistsReply> dataExistsReply(boolean exists) {
229         return Futures.successful(new DataExistsReply(exists));
230     }
231
232     protected Future<BatchedModificationsReply> batchedModificationsReply(int count) {
233         return Futures.successful(new BatchedModificationsReply(count));
234     }
235
236     protected Future<Object> incompleteFuture(){
237         return mock(Future.class);
238     }
239
240     protected ActorSelection actorSelection(ActorRef actorRef) {
241         return getSystem().actorSelection(actorRef.path());
242     }
243
244     protected void expectBatchedModifications(ActorRef actorRef, int count) {
245         doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
246                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
247     }
248
249     protected void expectBatchedModifications(int count) {
250         doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
251                 any(ActorSelection.class), isA(BatchedModifications.class));
252     }
253
254     protected void expectIncompleteBatchedModifications() {
255         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
256                 any(ActorSelection.class), isA(BatchedModifications.class));
257     }
258
259     protected void expectReadyTransaction(ActorRef actorRef) {
260         doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
261                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
262     }
263
264     protected void expectFailedBatchedModifications(ActorRef actorRef) {
265         doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
266                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
267     }
268
269     protected CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
270         return CreateTransactionReply.newBuilder()
271             .setTransactionActorPath(actorRef.path().toString())
272             .setTransactionId("txn-1")
273             .setMessageVersion(transactionVersion)
274             .build();
275     }
276
277     protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
278         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
279         doReturn(actorSystem.actorSelection(actorRef.path())).
280                 when(mockActorContext).actorSelection(actorRef.path().toString());
281
282         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
283                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
284
285         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
286
287         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
288
289         return actorRef;
290     }
291
292     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
293             TransactionType type, int transactionVersion) {
294         ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
295
296         doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
297                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
298                         eqCreateTransaction(memberName, type));
299
300         return actorRef;
301     }
302
303     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
304         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
305     }
306
307
308     protected void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
309             throws Throwable {
310
311         try {
312             future.checkedGet(5, TimeUnit.SECONDS);
313             fail("Expected ReadFailedException");
314         } catch(ReadFailedException e) {
315             throw e.getCause();
316         }
317     }
318
319     protected List<BatchedModifications> captureBatchedModifications(ActorRef actorRef) {
320         ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
321                 ArgumentCaptor.forClass(BatchedModifications.class);
322         verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
323                 eq(actorSelection(actorRef)), batchedModificationsCaptor.capture());
324
325         List<BatchedModifications> batchedModifications = filterCaptured(
326                 batchedModificationsCaptor, BatchedModifications.class);
327         return batchedModifications;
328     }
329
330     protected <T> List<T> filterCaptured(ArgumentCaptor<T> captor, Class<T> type) {
331         List<T> captured = new ArrayList<>();
332         for(T c: captor.getAllValues()) {
333             if(type.isInstance(c)) {
334                 captured.add(c);
335             }
336         }
337
338         return captured;
339     }
340
341     protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected) {
342         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
343         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
344
345         verifyBatchedModifications(batchedModifications.get(0), expected);
346     }
347
348     protected void verifyBatchedModifications(Object message, Modification... expected) {
349         assertEquals("Message type", BatchedModifications.class, message.getClass());
350         BatchedModifications batchedModifications = (BatchedModifications)message;
351         assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
352         for(int i = 0; i < batchedModifications.getModifications().size(); i++) {
353             Modification actual = batchedModifications.getModifications().get(i);
354             assertEquals("Modification type", expected[i].getClass(), actual.getClass());
355             assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
356                     ((AbstractModification)actual).getPath());
357             if(actual instanceof WriteModification) {
358                 assertEquals("getData", ((WriteModification)expected[i]).getData(),
359                         ((WriteModification)actual).getData());
360             }
361         }
362     }
363
364     protected void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
365             Object... expReplies) throws Exception {
366             assertEquals("getReadyOperationFutures size", expReplies.length,
367                     proxy.getCohortFutures().size());
368
369             int i = 0;
370             for( Future<ActorSelection> future: proxy.getCohortFutures()) {
371                 assertNotNull("Ready operation Future is null", future);
372
373                 Object expReply = expReplies[i++];
374                 if(expReply instanceof ActorSelection) {
375                     ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
376                     assertEquals("Cohort actor path", expReply, actual);
377                 } else {
378                     // Expecting exception.
379                     try {
380                         Await.result(future, Duration.create(5, TimeUnit.SECONDS));
381                         fail("Expected exception from ready operation Future");
382                     } catch(Exception e) {
383                         // Expected
384                     }
385                 }
386             }
387         }
388 }