BUG-2138: Create DistributedShardFrontend
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionProxyTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.argThat;
15 import static org.mockito.Matchers.eq;
16 import static org.mockito.Matchers.isA;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSelection;
23 import akka.actor.ActorSystem;
24 import akka.actor.Props;
25 import akka.dispatch.Futures;
26 import akka.testkit.JavaTestKit;
27 import akka.util.Timeout;
28 import com.codahale.metrics.MetricRegistry;
29 import com.codahale.metrics.Timer;
30 import com.google.common.base.Throwables;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.util.concurrent.CheckedFuture;
33 import com.typesafe.config.Config;
34 import com.typesafe.config.ConfigFactory;
35 import java.io.IOException;
36 import java.util.ArrayList;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Objects;
41 import java.util.concurrent.TimeUnit;
42 import org.junit.AfterClass;
43 import org.junit.Before;
44 import org.junit.BeforeClass;
45 import org.mockito.ArgumentCaptor;
46 import org.mockito.ArgumentMatcher;
47 import org.mockito.Mock;
48 import org.mockito.Mockito;
49 import org.mockito.MockitoAnnotations;
50 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
51 import org.opendaylight.controller.cluster.access.concepts.MemberName;
52 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
53 import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
54 import org.opendaylight.controller.cluster.datastore.config.Configuration;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
56 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
57 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
61 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
62 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
63 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
67 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
68 import org.opendaylight.controller.cluster.datastore.modification.Modification;
69 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
70 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
71 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
72 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
73 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
74 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
75 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
76 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
77 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
78 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
79 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
80 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
81 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
82 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85 import scala.concurrent.Await;
86 import scala.concurrent.Future;
87 import scala.concurrent.duration.Duration;
88
89 /**
90  * Abstract base class for TransactionProxy unit tests.
91  *
92  * @author Thomas Pantelis
93  */
94 public abstract class AbstractTransactionProxyTest extends AbstractTest {
95     protected final Logger log = LoggerFactory.getLogger(getClass());
96
97     private static ActorSystem system;
98
99     private final Configuration configuration = new MockConfiguration() {
100         Map<String, ShardStrategy> strategyMap = ImmutableMap.<String, ShardStrategy>builder().put(
101                 "junk", new ShardStrategy() {
102                     @Override
103                     public String findShard(YangInstanceIdentifier path) {
104                         return "junk";
105                     }
106
107                     @Override
108                     public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
109                         return YangInstanceIdentifier.EMPTY;
110                     }
111                 }).put(
112                 "cars", new ShardStrategy() {
113                     @Override
114                     public String findShard(YangInstanceIdentifier path) {
115                         return "cars";
116                     }
117
118                     @Override
119                     public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
120                         return YangInstanceIdentifier.EMPTY;
121                     }
122                 }).build();
123
124         @Override
125         public ShardStrategy getStrategyForModule(String moduleName) {
126             return strategyMap.get(moduleName);
127         }
128
129         @Override
130         public String getModuleNameFromNameSpace(String nameSpace) {
131             if (TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
132                 return "junk";
133             } else if (CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
134                 return "cars";
135             }
136             return null;
137         }
138     };
139
140     @Mock
141     protected ActorContext mockActorContext;
142
143     protected TransactionContextFactory mockComponentFactory;
144
145     private SchemaContext schemaContext;
146
147     @Mock
148     private ClusterWrapper mockClusterWrapper;
149
150     protected final String memberName = "mock-member";
151
152     private final int operationTimeoutInSeconds = 2;
153     protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder()
154             .operationTimeoutInSeconds(operationTimeoutInSeconds);
155
156     @BeforeClass
157     public static void setUpClass() throws IOException {
158
159         Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
160                 .put("akka.actor.default-dispatcher.type",
161                         "akka.testkit.CallingThreadDispatcherConfigurator").build())
162                 .withFallback(ConfigFactory.load());
163         system = ActorSystem.create("test", config);
164     }
165
166     @AfterClass
167     public static void tearDownClass() throws IOException {
168         JavaTestKit.shutdownActorSystem(system);
169         system = null;
170     }
171
172     @Before
173     public void setUp() {
174         MockitoAnnotations.initMocks(this);
175
176         schemaContext = TestModel.createTestContext();
177
178         doReturn(getSystem()).when(mockActorContext).getActorSystem();
179         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
180         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
181         doReturn(new ShardStrategyFactory(configuration,
182                 LogicalDatastoreType.CONFIGURATION)).when(mockActorContext).getShardStrategyFactory();
183         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
184         doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout();
185         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
186         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
187         doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
188
189         final ClientIdentifier mockClientId = MockIdentifiers.clientIdentifier(getClass(), memberName);
190         mockComponentFactory = new TransactionContextFactory(mockActorContext, mockClientId);
191
192         Timer timer = new MetricRegistry().timer("test");
193         doReturn(timer).when(mockActorContext).getOperationTimer(any(String.class));
194     }
195
196     protected ActorSystem getSystem() {
197         return system;
198     }
199
200     protected CreateTransaction eqCreateTransaction(final String expMemberName,
201             final TransactionType type) {
202         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
203             @Override
204             public boolean matches(Object argument) {
205                 if (CreateTransaction.class.equals(argument.getClass())) {
206                     CreateTransaction obj = CreateTransaction.fromSerializable(argument);
207                     return obj.getTransactionId().getHistoryId().getClientId().getFrontendId().getMemberName()
208                             .getName().equals(expMemberName) && obj.getTransactionType() == type.ordinal();
209                 }
210
211                 return false;
212             }
213         };
214
215         return argThat(matcher);
216     }
217
218     protected DataExists eqDataExists() {
219         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
220             @Override
221             public boolean matches(Object argument) {
222                 return argument instanceof DataExists && ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
223             }
224         };
225
226         return argThat(matcher);
227     }
228
229     protected ReadData eqReadData() {
230         return eqReadData(TestModel.TEST_PATH);
231     }
232
233     protected ReadData eqReadData(final YangInstanceIdentifier path) {
234         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
235             @Override
236             public boolean matches(Object argument) {
237                 return argument instanceof ReadData && ((ReadData)argument).getPath().equals(path);
238             }
239         };
240
241         return argThat(matcher);
242     }
243
244     protected Future<Object> readyTxReply(String path) {
245         return Futures.successful((Object)new ReadyTransactionReply(path));
246     }
247
248
249     protected Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
250         return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
251     }
252
253     protected Future<DataExistsReply> dataExistsReply(boolean exists) {
254         return Futures.successful(new DataExistsReply(exists, DataStoreVersions.CURRENT_VERSION));
255     }
256
257     protected Future<BatchedModificationsReply> batchedModificationsReply(int count) {
258         return Futures.successful(new BatchedModificationsReply(count));
259     }
260
261     @SuppressWarnings("unchecked")
262     protected Future<Object> incompleteFuture() {
263         return mock(Future.class);
264     }
265
266     protected ActorSelection actorSelection(ActorRef actorRef) {
267         return getSystem().actorSelection(actorRef.path());
268     }
269
270     protected void expectBatchedModifications(ActorRef actorRef, int count) {
271         doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
272                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
273     }
274
275     protected void expectBatchedModifications(int count) {
276         doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
277                 any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
278     }
279
280     protected void expectBatchedModificationsReady(ActorRef actorRef) {
281         expectBatchedModificationsReady(actorRef, false);
282     }
283
284     protected void expectBatchedModificationsReady(ActorRef actorRef, boolean doCommitOnReady) {
285         doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
286             readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
287                     eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
288     }
289
290     protected void expectIncompleteBatchedModifications() {
291         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
292                 any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
293     }
294
295     protected void expectFailedBatchedModifications(ActorRef actorRef) {
296         doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
297                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
298     }
299
300     protected void expectReadyLocalTransaction(ActorRef actorRef, boolean doCommitOnReady) {
301         doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
302             readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
303                     eq(actorSelection(actorRef)), isA(ReadyLocalTransaction.class), any(Timeout.class));
304     }
305
306     protected CreateTransactionReply createTransactionReply(ActorRef actorRef, short transactionVersion) {
307         return new CreateTransactionReply(actorRef.path().toString(), nextTransactionId(), transactionVersion);
308     }
309
310     protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
311         return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD);
312     }
313
314     protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) {
315         return setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
316                 DataStoreVersions.CURRENT_VERSION);
317     }
318
319     protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName,
320             short transactionVersion) {
321         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
322         log.info("Created mock shard actor {}", actorRef);
323
324         doReturn(actorSystem.actorSelection(actorRef.path()))
325                 .when(mockActorContext).actorSelection(actorRef.path().toString());
326
327         doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion))
328                 .when(mockActorContext).findPrimaryShardAsync(eq(shardName));
329
330         return actorRef;
331     }
332
333     protected Future<PrimaryShardInfo> primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) {
334         return primaryShardInfoReply(actorSystem, actorRef, DataStoreVersions.CURRENT_VERSION);
335     }
336
337     protected Future<PrimaryShardInfo> primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef,
338             short transactionVersion) {
339         return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()),
340                 transactionVersion));
341     }
342
343     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
344             TransactionType type, short transactionVersion, String shardName) {
345         ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
346                 transactionVersion);
347
348         return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
349                 memberName, shardActorRef);
350     }
351
352     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
353             TransactionType type, short transactionVersion, String prefix, ActorRef shardActorRef) {
354
355         ActorRef txActorRef;
356         if (type == TransactionType.WRITE_ONLY
357                 && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) {
358             txActorRef = shardActorRef;
359         } else {
360             txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
361             log.info("Created mock shard Tx actor {}", txActorRef);
362
363             doReturn(actorSystem.actorSelection(txActorRef.path()))
364                 .when(mockActorContext).actorSelection(txActorRef.path().toString());
365
366             doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext)
367                 .executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
368                         eqCreateTransaction(prefix, type), any(Timeout.class));
369         }
370
371         return txActorRef;
372     }
373
374     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
375         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
376                 DefaultShardStrategy.DEFAULT_SHARD);
377     }
378
379     protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type,
380             String shardName) {
381         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
382                 shardName);
383     }
384
385     protected void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future) throws Exception {
386         try {
387             future.checkedGet(5, TimeUnit.SECONDS);
388             fail("Expected ReadFailedException");
389         } catch (ReadFailedException e) {
390             assertNotNull("Expected a cause", e.getCause());
391             Throwable cause;
392             if (e.getCause().getCause() != null) {
393                 cause = e.getCause().getCause();
394             } else {
395                 cause = e.getCause();
396             }
397
398             Throwables.propagateIfInstanceOf(cause, Exception.class);
399             Throwables.propagate(cause);
400         }
401     }
402
403     protected List<BatchedModifications> captureBatchedModifications(ActorRef actorRef) {
404         ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
405                 ArgumentCaptor.forClass(BatchedModifications.class);
406         verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
407                 eq(actorSelection(actorRef)), batchedModificationsCaptor.capture(), any(Timeout.class));
408
409         List<BatchedModifications> batchedModifications = filterCaptured(
410                 batchedModificationsCaptor, BatchedModifications.class);
411         return batchedModifications;
412     }
413
414     protected <T> List<T> filterCaptured(ArgumentCaptor<T> captor, Class<T> type) {
415         List<T> captured = new ArrayList<>();
416         for (T c: captor.getAllValues()) {
417             if (type.isInstance(c)) {
418                 captured.add(c);
419             }
420         }
421
422         return captured;
423     }
424
425     protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected, boolean expIsReady) {
426         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
427         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
428
429         verifyBatchedModifications(batchedModifications.get(0), expIsReady, expIsReady, expected);
430     }
431
432     protected void verifyBatchedModifications(Object message, boolean expIsReady, Modification... expected) {
433         verifyBatchedModifications(message, expIsReady, false, expected);
434     }
435
436     protected void verifyBatchedModifications(Object message, boolean expIsReady, boolean expIsDoCommitOnReady,
437             Modification... expected) {
438         assertEquals("Message type", BatchedModifications.class, message.getClass());
439         BatchedModifications batchedModifications = (BatchedModifications)message;
440         assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
441         assertEquals("isReady", expIsReady, batchedModifications.isReady());
442         assertEquals("isDoCommitOnReady", expIsDoCommitOnReady, batchedModifications.isDoCommitOnReady());
443         for (int i = 0; i < batchedModifications.getModifications().size(); i++) {
444             Modification actual = batchedModifications.getModifications().get(i);
445             assertEquals("Modification type", expected[i].getClass(), actual.getClass());
446             assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
447                     ((AbstractModification)actual).getPath());
448             if (actual instanceof WriteModification) {
449                 assertEquals("getData", ((WriteModification)expected[i]).getData(),
450                         ((WriteModification)actual).getData());
451             }
452         }
453     }
454
455     @SuppressWarnings("checkstyle:IllegalCatch")
456     protected void verifyCohortFutures(AbstractThreePhaseCommitCohort<?> proxy,
457             Object... expReplies) throws Exception {
458         assertEquals("getReadyOperationFutures size", expReplies.length,
459                 proxy.getCohortFutures().size());
460
461         List<Object> futureResults = new ArrayList<>();
462         for (Future<?> future : proxy.getCohortFutures()) {
463             assertNotNull("Ready operation Future is null", future);
464             try {
465                 futureResults.add(Await.result(future, Duration.create(5, TimeUnit.SECONDS)));
466             } catch (Exception e) {
467                 futureResults.add(e);
468             }
469         }
470
471         for (Object expReply : expReplies) {
472             boolean found = false;
473             Iterator<?> iter = futureResults.iterator();
474             while (iter.hasNext()) {
475                 Object actual = iter.next();
476                 if (CommitTransactionReply.isSerializedType(expReply)
477                         && CommitTransactionReply.isSerializedType(actual)) {
478                     found = true;
479                 } else if (expReply instanceof ActorSelection && Objects.equals(expReply, actual)) {
480                     found = true;
481                 } else if (expReply instanceof Class && ((Class<?>) expReply).isInstance(actual)) {
482                     found = true;
483                 }
484
485                 if (found) {
486                     iter.remove();
487                     break;
488                 }
489             }
490
491             if (!found) {
492                 fail(String.format("No cohort Future response found for %s. Actual: %s", expReply, futureResults));
493             }
494         }
495     }
496 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.