Add UnsignedLongBitmap
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionProxyTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.ArgumentMatchers.argThat;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.ArgumentMatchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.verify;
21
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSelection;
24 import akka.actor.ActorSystem;
25 import akka.actor.Props;
26 import akka.dispatch.Futures;
27 import akka.testkit.javadsl.TestKit;
28 import akka.util.Timeout;
29 import com.codahale.metrics.MetricRegistry;
30 import com.codahale.metrics.Timer;
31 import com.google.common.base.Throwables;
32 import com.google.common.collect.ImmutableMap;
33 import com.google.common.util.concurrent.FluentFuture;
34 import com.typesafe.config.Config;
35 import com.typesafe.config.ConfigFactory;
36 import java.util.ArrayList;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Objects;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.mockito.ArgumentCaptor;
47 import org.mockito.ArgumentMatcher;
48 import org.mockito.Mock;
49 import org.mockito.Mockito;
50 import org.mockito.MockitoAnnotations;
51 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
52 import org.opendaylight.controller.cluster.access.concepts.MemberName;
53 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
54 import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
55 import org.opendaylight.controller.cluster.datastore.config.Configuration;
56 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
57 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
59 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
60 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
61 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
62 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
63 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
67 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
68 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
69 import org.opendaylight.controller.cluster.datastore.modification.Modification;
70 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
71 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
72 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
73 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
74 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
75 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
76 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
77 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
78 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
79 import org.opendaylight.mdsal.common.api.ReadFailedException;
80 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
81 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
82 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85 import scala.concurrent.Await;
86 import scala.concurrent.Future;
87 import scala.concurrent.duration.FiniteDuration;
88
89 /**
90  * Abstract base class for TransactionProxy unit tests.
91  *
92  * @author Thomas Pantelis
93  */
94 public abstract class AbstractTransactionProxyTest extends AbstractTest {
95     protected final Logger log = LoggerFactory.getLogger(getClass());
96
97     private static ActorSystem system;
98     private static SchemaContext SCHEMA_CONTEXT;
99
100     private final Configuration configuration = new MockConfiguration() {
101         Map<String, ShardStrategy> strategyMap = ImmutableMap.<String, ShardStrategy>builder().put(
102                 TestModel.JUNK_QNAME.getLocalName(), new ShardStrategy() {
103                     @Override
104                     public String findShard(final YangInstanceIdentifier path) {
105                         return TestModel.JUNK_QNAME.getLocalName();
106                     }
107                 }).put(
108                 CarsModel.BASE_QNAME.getLocalName(), new ShardStrategy() {
109                     @Override
110                     public String findShard(final YangInstanceIdentifier path) {
111                         return CarsModel.BASE_QNAME.getLocalName();
112                     }
113                 }).build();
114
115         @Override
116         public ShardStrategy getStrategyForModule(final String moduleName) {
117             return strategyMap.get(moduleName);
118         }
119
120         @Override
121         public String getModuleNameFromNameSpace(final String nameSpace) {
122             if (TestModel.JUNK_QNAME.getNamespace().toString().equals(nameSpace)) {
123                 return TestModel.JUNK_QNAME.getLocalName();
124             } else if (CarsModel.BASE_QNAME.getNamespace().toString().equals(nameSpace)) {
125                 return CarsModel.BASE_QNAME.getLocalName();
126             }
127             return null;
128         }
129     };
130
131     @Mock
132     protected ActorUtils mockActorContext;
133
134     protected TransactionContextFactory mockComponentFactory;
135
136     @Mock
137     private ClusterWrapper mockClusterWrapper;
138
139     protected final String memberName = "mock-member";
140
141     private final int operationTimeoutInSeconds = 2;
142     protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder()
143             .operationTimeoutInSeconds(operationTimeoutInSeconds);
144
145     @BeforeClass
146     public static void setUpClass() {
147
148         Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
149                 .put("akka.actor.default-dispatcher.type",
150                         "akka.testkit.CallingThreadDispatcherConfigurator").build())
151                 .withFallback(ConfigFactory.load());
152         system = ActorSystem.create("test", config);
153         SCHEMA_CONTEXT = TestModel.createTestContext();
154     }
155
156     @AfterClass
157     public static void tearDownClass() {
158         TestKit.shutdownActorSystem(system);
159         system = null;
160         SCHEMA_CONTEXT = null;
161     }
162
163     @Before
164     public void setUp() {
165         MockitoAnnotations.initMocks(this);
166
167         doReturn(getSystem()).when(mockActorContext).getActorSystem();
168         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
169         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
170         doReturn(new ShardStrategyFactory(configuration)).when(mockActorContext).getShardStrategyFactory();
171         doReturn(SCHEMA_CONTEXT).when(mockActorContext).getSchemaContext();
172         doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout();
173         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
174         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
175         doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
176         doReturn(new Timeout(5, TimeUnit.SECONDS)).when(mockActorContext).getTransactionCommitOperationTimeout();
177
178         final ClientIdentifier mockClientId = MockIdentifiers.clientIdentifier(getClass(), memberName);
179         mockComponentFactory = new TransactionContextFactory(mockActorContext, mockClientId);
180
181         Timer timer = new MetricRegistry().timer("test");
182         doReturn(timer).when(mockActorContext).getOperationTimer(any(String.class));
183     }
184
185     protected ActorSystem getSystem() {
186         return system;
187     }
188
189     protected CreateTransaction eqCreateTransaction(final String expMemberName,
190             final TransactionType type) {
191         class CreateTransactionArgumentMatcher implements ArgumentMatcher<CreateTransaction> {
192             @Override
193             public boolean matches(final CreateTransaction argument) {
194                 return argument.getTransactionId().getHistoryId().getClientId().getFrontendId().getMemberName()
195                         .getName().equals(expMemberName) && argument.getTransactionType() == type.ordinal();
196             }
197         }
198
199         return argThat(new CreateTransactionArgumentMatcher());
200     }
201
202     protected DataExists eqDataExists() {
203         class DataExistsArgumentMatcher implements ArgumentMatcher<DataExists> {
204             @Override
205             public boolean matches(final DataExists argument) {
206                 return argument.getPath().equals(TestModel.TEST_PATH);
207             }
208         }
209
210         return argThat(new DataExistsArgumentMatcher());
211     }
212
213     protected ReadData eqReadData() {
214         return eqReadData(TestModel.TEST_PATH);
215     }
216
217     protected ReadData eqReadData(final YangInstanceIdentifier path) {
218         class ReadDataArgumentMatcher implements ArgumentMatcher<ReadData> {
219             @Override
220             public boolean matches(final ReadData argument) {
221                 return argument.getPath().equals(path);
222             }
223         }
224
225         return argThat(new ReadDataArgumentMatcher());
226     }
227
228     protected Future<Object> readyTxReply(final String path) {
229         return Futures.successful((Object)new ReadyTransactionReply(path));
230     }
231
232
233     protected Future<ReadDataReply> readDataReply(final NormalizedNode data) {
234         return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
235     }
236
237     protected Future<DataExistsReply> dataExistsReply(final boolean exists) {
238         return Futures.successful(new DataExistsReply(exists, DataStoreVersions.CURRENT_VERSION));
239     }
240
241     protected Future<BatchedModificationsReply> batchedModificationsReply(final int count) {
242         return Futures.successful(new BatchedModificationsReply(count));
243     }
244
245     @SuppressWarnings("unchecked")
246     protected Future<Object> incompleteFuture() {
247         return mock(Future.class);
248     }
249
250     protected ActorSelection actorSelection(final ActorRef actorRef) {
251         return getSystem().actorSelection(actorRef.path());
252     }
253
254     protected void expectBatchedModifications(final ActorRef actorRef, final int count) {
255         doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
256                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
257     }
258
259     protected void expectBatchedModifications(final int count) {
260         doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
261                 any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
262     }
263
264     protected void expectBatchedModificationsReady(final ActorRef actorRef) {
265         expectBatchedModificationsReady(actorRef, false);
266     }
267
268     protected void expectBatchedModificationsReady(final ActorRef actorRef, final boolean doCommitOnReady) {
269         doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
270             readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
271                     eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
272     }
273
274     protected void expectIncompleteBatchedModifications() {
275         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
276                 any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
277     }
278
279     protected void expectFailedBatchedModifications(final ActorRef actorRef) {
280         doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
281                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
282     }
283
284     protected void expectReadyLocalTransaction(final ActorRef actorRef, final boolean doCommitOnReady) {
285         doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
286             readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
287                     eq(actorSelection(actorRef)), isA(ReadyLocalTransaction.class), any(Timeout.class));
288     }
289
290     protected CreateTransactionReply createTransactionReply(final ActorRef actorRef, final short transactionVersion) {
291         return new CreateTransactionReply(actorRef.path().toString(), nextTransactionId(), transactionVersion);
292     }
293
294     protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem) {
295         return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD);
296     }
297
298     protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem,
299             final String shardName) {
300         return setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
301                 DataStoreVersions.CURRENT_VERSION);
302     }
303
304     protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem,
305             final String shardName, final short transactionVersion) {
306         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
307         log.info("Created mock shard actor {}", actorRef);
308
309         doReturn(actorSystem.actorSelection(actorRef.path()))
310                 .when(mockActorContext).actorSelection(actorRef.path().toString());
311
312         doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion))
313                 .when(mockActorContext).findPrimaryShardAsync(eq(shardName));
314
315         return actorRef;
316     }
317
318     protected Future<PrimaryShardInfo> primaryShardInfoReply(final ActorSystem actorSystem, final ActorRef actorRef) {
319         return primaryShardInfoReply(actorSystem, actorRef, DataStoreVersions.CURRENT_VERSION);
320     }
321
322     protected Future<PrimaryShardInfo> primaryShardInfoReply(final ActorSystem actorSystem, final ActorRef actorRef,
323             final short transactionVersion) {
324         return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()),
325                 transactionVersion));
326     }
327
328     protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
329             final TransactionType type, final short transactionVersion, final String shardName) {
330         ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
331                 transactionVersion);
332
333         return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
334                 memberName, shardActorRef);
335     }
336
337     protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
338             final TransactionType type, final short transactionVersion, final String prefix,
339             final ActorRef shardActorRef) {
340
341         ActorRef txActorRef;
342         if (type == TransactionType.WRITE_ONLY
343                 && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) {
344             txActorRef = shardActorRef;
345         } else {
346             txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
347             log.info("Created mock shard Tx actor {}", txActorRef);
348
349             doReturn(actorSystem.actorSelection(txActorRef.path()))
350                 .when(mockActorContext).actorSelection(txActorRef.path().toString());
351
352             doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext)
353                 .executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
354                         eqCreateTransaction(prefix, type), any(Timeout.class));
355         }
356
357         return txActorRef;
358     }
359
360     protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
361             final TransactionType type) {
362         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
363                 DefaultShardStrategy.DEFAULT_SHARD);
364     }
365
366     protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
367             final TransactionType type,
368             final String shardName) {
369         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
370                 shardName);
371     }
372
373     @SuppressWarnings({"checkstyle:avoidHidingCauseException", "checkstyle:IllegalThrows"})
374     protected void propagateReadFailedExceptionCause(final FluentFuture<?> future) throws Throwable {
375         try {
376             future.get(5, TimeUnit.SECONDS);
377             fail("Expected ReadFailedException");
378         } catch (ExecutionException e) {
379             final Throwable cause = e.getCause();
380             assertTrue("Unexpected cause: " + cause.getClass(), cause instanceof ReadFailedException);
381             throw Throwables.getRootCause(cause);
382         }
383     }
384
385     protected List<BatchedModifications> captureBatchedModifications(final ActorRef actorRef) {
386         ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
387                 ArgumentCaptor.forClass(BatchedModifications.class);
388         verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
389                 eq(actorSelection(actorRef)), batchedModificationsCaptor.capture(), any(Timeout.class));
390
391         List<BatchedModifications> batchedModifications = filterCaptured(
392                 batchedModificationsCaptor, BatchedModifications.class);
393         return batchedModifications;
394     }
395
396     protected <T> List<T> filterCaptured(final ArgumentCaptor<T> captor, final Class<T> type) {
397         List<T> captured = new ArrayList<>();
398         for (T c: captor.getAllValues()) {
399             if (type.isInstance(c)) {
400                 captured.add(c);
401             }
402         }
403
404         return captured;
405     }
406
407     protected void verifyOneBatchedModification(final ActorRef actorRef, final Modification expected,
408             final boolean expIsReady) {
409         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
410         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
411
412         verifyBatchedModifications(batchedModifications.get(0), expIsReady, expIsReady, expected);
413     }
414
415     protected void verifyBatchedModifications(final Object message, final boolean expIsReady,
416             final Modification... expected) {
417         verifyBatchedModifications(message, expIsReady, false, expected);
418     }
419
420     protected void verifyBatchedModifications(final Object message, final boolean expIsReady,
421             final boolean expIsDoCommitOnReady, final Modification... expected) {
422         assertEquals("Message type", BatchedModifications.class, message.getClass());
423         BatchedModifications batchedModifications = (BatchedModifications)message;
424         assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
425         assertEquals("isReady", expIsReady, batchedModifications.isReady());
426         assertEquals("isDoCommitOnReady", expIsDoCommitOnReady, batchedModifications.isDoCommitOnReady());
427         for (int i = 0; i < batchedModifications.getModifications().size(); i++) {
428             Modification actual = batchedModifications.getModifications().get(i);
429             assertEquals("Modification type", expected[i].getClass(), actual.getClass());
430             assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
431                     ((AbstractModification)actual).getPath());
432             if (actual instanceof WriteModification) {
433                 assertEquals("getData", ((WriteModification)expected[i]).getData(),
434                         ((WriteModification)actual).getData());
435             }
436         }
437     }
438
439     @SuppressWarnings("checkstyle:IllegalCatch")
440     protected void verifyCohortFutures(final AbstractThreePhaseCommitCohort<?> proxy,
441             final Object... expReplies) {
442         assertEquals("getReadyOperationFutures size", expReplies.length,
443                 proxy.getCohortFutures().size());
444
445         List<Object> futureResults = new ArrayList<>();
446         for (Future<?> future : proxy.getCohortFutures()) {
447             assertNotNull("Ready operation Future is null", future);
448             try {
449                 futureResults.add(Await.result(future, FiniteDuration.create(5, TimeUnit.SECONDS)));
450             } catch (Exception e) {
451                 futureResults.add(e);
452             }
453         }
454
455         for (Object expReply : expReplies) {
456             boolean found = false;
457             Iterator<?> iter = futureResults.iterator();
458             while (iter.hasNext()) {
459                 Object actual = iter.next();
460                 if (CommitTransactionReply.isSerializedType(expReply)
461                         && CommitTransactionReply.isSerializedType(actual)
462                         || expReply instanceof ActorSelection && Objects.equals(expReply, actual)) {
463                     found = true;
464                 } else if (expReply instanceof Class && ((Class<?>) expReply).isInstance(actual)) {
465                     found = true;
466                 }
467
468                 if (found) {
469                     iter.remove();
470                     break;
471                 }
472             }
473
474             if (!found) {
475                 fail(String.format("No cohort Future response found for %s. Actual: %s", expReply, futureResults));
476             }
477         }
478     }
479 }