Fix shard deadlock in 3 nodes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionProxyTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.argThat;
15 import static org.mockito.Matchers.eq;
16 import static org.mockito.Matchers.isA;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSelection;
23 import akka.actor.ActorSystem;
24 import akka.actor.Props;
25 import akka.dispatch.Futures;
26 import akka.testkit.javadsl.TestKit;
27 import akka.util.Timeout;
28 import com.codahale.metrics.MetricRegistry;
29 import com.codahale.metrics.Timer;
30 import com.google.common.base.Throwables;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.util.concurrent.CheckedFuture;
33 import com.typesafe.config.Config;
34 import com.typesafe.config.ConfigFactory;
35 import java.io.IOException;
36 import java.util.ArrayList;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Objects;
41 import java.util.concurrent.TimeUnit;
42 import org.junit.AfterClass;
43 import org.junit.Before;
44 import org.junit.BeforeClass;
45 import org.mockito.ArgumentCaptor;
46 import org.mockito.ArgumentMatcher;
47 import org.mockito.Mock;
48 import org.mockito.Mockito;
49 import org.mockito.MockitoAnnotations;
50 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
51 import org.opendaylight.controller.cluster.access.concepts.MemberName;
52 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
53 import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
54 import org.opendaylight.controller.cluster.datastore.config.Configuration;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
56 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
57 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
58 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
59 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
60 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
61 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
62 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
63 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
64 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
65 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
66 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
67 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
68 import org.opendaylight.controller.cluster.datastore.modification.Modification;
69 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
70 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
71 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
72 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
73 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
74 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
75 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
76 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
77 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
78 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
79 import org.opendaylight.mdsal.common.api.ReadFailedException;
80 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
81 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
82 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85 import scala.concurrent.Await;
86 import scala.concurrent.Future;
87 import scala.concurrent.duration.Duration;
88
89 /**
90  * Abstract base class for TransactionProxy unit tests.
91  *
92  * @author Thomas Pantelis
93  */
94 public abstract class AbstractTransactionProxyTest extends AbstractTest {
95     protected final Logger log = LoggerFactory.getLogger(getClass());
96
97     private static ActorSystem system;
98
99     private final Configuration configuration = new MockConfiguration() {
100         Map<String, ShardStrategy> strategyMap = ImmutableMap.<String, ShardStrategy>builder().put(
101                 TestModel.JUNK_QNAME.getLocalName(), new ShardStrategy() {
102                     @Override
103                     public String findShard(final YangInstanceIdentifier path) {
104                         return TestModel.JUNK_QNAME.getLocalName();
105                     }
106
107                     @Override
108                     public YangInstanceIdentifier getPrefixForPath(final YangInstanceIdentifier path) {
109                         return YangInstanceIdentifier.EMPTY;
110                     }
111                 }).put(
112                 CarsModel.BASE_QNAME.getLocalName(), new ShardStrategy() {
113                     @Override
114                     public String findShard(final YangInstanceIdentifier path) {
115                         return CarsModel.BASE_QNAME.getLocalName();
116                     }
117
118                     @Override
119                     public YangInstanceIdentifier getPrefixForPath(final YangInstanceIdentifier path) {
120                         return YangInstanceIdentifier.EMPTY;
121                     }
122                 }).build();
123
124         @Override
125         public ShardStrategy getStrategyForModule(final String moduleName) {
126             return strategyMap.get(moduleName);
127         }
128
129         @Override
130         public String getModuleNameFromNameSpace(final String nameSpace) {
131             if (TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
132                 return TestModel.JUNK_QNAME.getLocalName();
133             } else if (CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
134                 return CarsModel.BASE_QNAME.getLocalName();
135             }
136             return null;
137         }
138     };
139
140     @Mock
141     protected ActorContext mockActorContext;
142
143     protected TransactionContextFactory mockComponentFactory;
144
145     private SchemaContext schemaContext;
146
147     @Mock
148     private ClusterWrapper mockClusterWrapper;
149
150     protected final String memberName = "mock-member";
151
152     private final int operationTimeoutInSeconds = 2;
153     protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder()
154             .operationTimeoutInSeconds(operationTimeoutInSeconds);
155
156     @BeforeClass
157     public static void setUpClass() throws IOException {
158
159         Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
160                 .put("akka.actor.default-dispatcher.type",
161                         "akka.testkit.CallingThreadDispatcherConfigurator").build())
162                 .withFallback(ConfigFactory.load());
163         system = ActorSystem.create("test", config);
164     }
165
166     @AfterClass
167     public static void tearDownClass() throws IOException {
168         TestKit.shutdownActorSystem(system);
169         system = null;
170     }
171
172     @Before
173     public void setUp() {
174         MockitoAnnotations.initMocks(this);
175
176         schemaContext = TestModel.createTestContext();
177
178         doReturn(getSystem()).when(mockActorContext).getActorSystem();
179         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
180         doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
181         doReturn(new ShardStrategyFactory(configuration,
182                 LogicalDatastoreType.CONFIGURATION)).when(mockActorContext).getShardStrategyFactory();
183         doReturn(schemaContext).when(mockActorContext).getSchemaContext();
184         doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout();
185         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
186         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
187         doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
188
189         final ClientIdentifier mockClientId = MockIdentifiers.clientIdentifier(getClass(), memberName);
190         mockComponentFactory = new TransactionContextFactory(mockActorContext, mockClientId);
191
192         Timer timer = new MetricRegistry().timer("test");
193         doReturn(timer).when(mockActorContext).getOperationTimer(any(String.class));
194     }
195
196     protected ActorSystem getSystem() {
197         return system;
198     }
199
200     protected CreateTransaction eqCreateTransaction(final String expMemberName,
201             final TransactionType type) {
202         ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
203             @Override
204             public boolean matches(final Object argument) {
205                 if (CreateTransaction.class.equals(argument.getClass())) {
206                     CreateTransaction obj = CreateTransaction.fromSerializable(argument);
207                     return obj.getTransactionId().getHistoryId().getClientId().getFrontendId().getMemberName()
208                             .getName().equals(expMemberName) && obj.getTransactionType() == type.ordinal();
209                 }
210
211                 return false;
212             }
213         };
214
215         return argThat(matcher);
216     }
217
218     protected DataExists eqDataExists() {
219         ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
220             @Override
221             public boolean matches(final Object argument) {
222                 return argument instanceof DataExists && ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
223             }
224         };
225
226         return argThat(matcher);
227     }
228
229     protected ReadData eqReadData() {
230         return eqReadData(TestModel.TEST_PATH);
231     }
232
233     protected ReadData eqReadData(final YangInstanceIdentifier path) {
234         ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
235             @Override
236             public boolean matches(final Object argument) {
237                 return argument instanceof ReadData && ((ReadData)argument).getPath().equals(path);
238             }
239         };
240
241         return argThat(matcher);
242     }
243
244     protected Future<Object> readyTxReply(final String path) {
245         return Futures.successful((Object)new ReadyTransactionReply(path));
246     }
247
248
249     protected Future<ReadDataReply> readDataReply(final NormalizedNode<?, ?> data) {
250         return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
251     }
252
253     protected Future<DataExistsReply> dataExistsReply(final boolean exists) {
254         return Futures.successful(new DataExistsReply(exists, DataStoreVersions.CURRENT_VERSION));
255     }
256
257     protected Future<BatchedModificationsReply> batchedModificationsReply(final int count) {
258         return Futures.successful(new BatchedModificationsReply(count));
259     }
260
261     @SuppressWarnings("unchecked")
262     protected Future<Object> incompleteFuture() {
263         return mock(Future.class);
264     }
265
266     protected ActorSelection actorSelection(final ActorRef actorRef) {
267         return getSystem().actorSelection(actorRef.path());
268     }
269
270     protected void expectBatchedModifications(final ActorRef actorRef, final int count) {
271         doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
272                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
273     }
274
275     protected void expectBatchedModifications(final int count) {
276         doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
277                 any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
278     }
279
280     protected void expectBatchedModificationsReady(final ActorRef actorRef) {
281         expectBatchedModificationsReady(actorRef, false);
282     }
283
284     protected void expectBatchedModificationsReady(final ActorRef actorRef, final boolean doCommitOnReady) {
285         doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
286             readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
287                     eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
288     }
289
290     protected void expectIncompleteBatchedModifications() {
291         doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
292                 any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
293     }
294
295     protected void expectFailedBatchedModifications(final ActorRef actorRef) {
296         doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
297                 eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
298     }
299
300     protected void expectReadyLocalTransaction(final ActorRef actorRef, final boolean doCommitOnReady) {
301         doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
302             readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
303                     eq(actorSelection(actorRef)), isA(ReadyLocalTransaction.class), any(Timeout.class));
304     }
305
306     protected CreateTransactionReply createTransactionReply(final ActorRef actorRef, final short transactionVersion) {
307         return new CreateTransactionReply(actorRef.path().toString(), nextTransactionId(), transactionVersion);
308     }
309
310     protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem) {
311         return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD);
312     }
313
314     protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem,
315             final String shardName) {
316         return setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
317                 DataStoreVersions.CURRENT_VERSION);
318     }
319
320     protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem,
321             final String shardName, final short transactionVersion) {
322         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
323         log.info("Created mock shard actor {}", actorRef);
324
325         doReturn(actorSystem.actorSelection(actorRef.path()))
326                 .when(mockActorContext).actorSelection(actorRef.path().toString());
327
328         doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion))
329                 .when(mockActorContext).findPrimaryShardAsync(eq(shardName));
330
331         return actorRef;
332     }
333
334     protected Future<PrimaryShardInfo> primaryShardInfoReply(final ActorSystem actorSystem, final ActorRef actorRef) {
335         return primaryShardInfoReply(actorSystem, actorRef, DataStoreVersions.CURRENT_VERSION);
336     }
337
338     protected Future<PrimaryShardInfo> primaryShardInfoReply(final ActorSystem actorSystem, final ActorRef actorRef,
339             final short transactionVersion) {
340         return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()),
341                 transactionVersion));
342     }
343
344     protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
345             final TransactionType type, final short transactionVersion, final String shardName) {
346         ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
347                 transactionVersion);
348
349         return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
350                 memberName, shardActorRef);
351     }
352
353     protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
354             final TransactionType type, final short transactionVersion, final String prefix,
355             final ActorRef shardActorRef) {
356
357         ActorRef txActorRef;
358         if (type == TransactionType.WRITE_ONLY
359                 && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) {
360             txActorRef = shardActorRef;
361         } else {
362             txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
363             log.info("Created mock shard Tx actor {}", txActorRef);
364
365             doReturn(actorSystem.actorSelection(txActorRef.path()))
366                 .when(mockActorContext).actorSelection(txActorRef.path().toString());
367
368             doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext)
369                 .executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
370                         eqCreateTransaction(prefix, type), any(Timeout.class));
371         }
372
373         return txActorRef;
374     }
375
376     protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
377             final TransactionType type) {
378         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
379                 DefaultShardStrategy.DEFAULT_SHARD);
380     }
381
382     protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
383             final TransactionType type,
384             final String shardName) {
385         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
386                 shardName);
387     }
388
389     @SuppressWarnings("checkstyle:avoidHidingCauseException")
390     protected void propagateReadFailedExceptionCause(final CheckedFuture<?, ReadFailedException> future)
391             throws Exception {
392         try {
393             future.checkedGet(5, TimeUnit.SECONDS);
394             fail("Expected ReadFailedException");
395         } catch (ReadFailedException e) {
396             assertNotNull("Expected a cause", e.getCause());
397             Throwable cause;
398             if (e.getCause().getCause() != null) {
399                 cause = e.getCause().getCause();
400             } else {
401                 cause = e.getCause();
402             }
403
404             Throwables.propagateIfPossible(cause, Exception.class);
405             throw new RuntimeException(cause);
406         }
407     }
408
409     protected List<BatchedModifications> captureBatchedModifications(final ActorRef actorRef) {
410         ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
411                 ArgumentCaptor.forClass(BatchedModifications.class);
412         verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
413                 eq(actorSelection(actorRef)), batchedModificationsCaptor.capture(), any(Timeout.class));
414
415         List<BatchedModifications> batchedModifications = filterCaptured(
416                 batchedModificationsCaptor, BatchedModifications.class);
417         return batchedModifications;
418     }
419
420     protected <T> List<T> filterCaptured(final ArgumentCaptor<T> captor, final Class<T> type) {
421         List<T> captured = new ArrayList<>();
422         for (T c: captor.getAllValues()) {
423             if (type.isInstance(c)) {
424                 captured.add(c);
425             }
426         }
427
428         return captured;
429     }
430
431     protected void verifyOneBatchedModification(final ActorRef actorRef, final Modification expected,
432             final boolean expIsReady) {
433         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
434         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
435
436         verifyBatchedModifications(batchedModifications.get(0), expIsReady, expIsReady, expected);
437     }
438
439     protected void verifyBatchedModifications(final Object message, final boolean expIsReady,
440             final Modification... expected) {
441         verifyBatchedModifications(message, expIsReady, false, expected);
442     }
443
444     protected void verifyBatchedModifications(final Object message, final boolean expIsReady,
445             final boolean expIsDoCommitOnReady, final Modification... expected) {
446         assertEquals("Message type", BatchedModifications.class, message.getClass());
447         BatchedModifications batchedModifications = (BatchedModifications)message;
448         assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
449         assertEquals("isReady", expIsReady, batchedModifications.isReady());
450         assertEquals("isDoCommitOnReady", expIsDoCommitOnReady, batchedModifications.isDoCommitOnReady());
451         for (int i = 0; i < batchedModifications.getModifications().size(); i++) {
452             Modification actual = batchedModifications.getModifications().get(i);
453             assertEquals("Modification type", expected[i].getClass(), actual.getClass());
454             assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
455                     ((AbstractModification)actual).getPath());
456             if (actual instanceof WriteModification) {
457                 assertEquals("getData", ((WriteModification)expected[i]).getData(),
458                         ((WriteModification)actual).getData());
459             }
460         }
461     }
462
463     @SuppressWarnings("checkstyle:IllegalCatch")
464     protected void verifyCohortFutures(final AbstractThreePhaseCommitCohort<?> proxy,
465             final Object... expReplies) {
466         assertEquals("getReadyOperationFutures size", expReplies.length,
467                 proxy.getCohortFutures().size());
468
469         List<Object> futureResults = new ArrayList<>();
470         for (Future<?> future : proxy.getCohortFutures()) {
471             assertNotNull("Ready operation Future is null", future);
472             try {
473                 futureResults.add(Await.result(future, Duration.create(5, TimeUnit.SECONDS)));
474             } catch (Exception e) {
475                 futureResults.add(e);
476             }
477         }
478
479         for (Object expReply : expReplies) {
480             boolean found = false;
481             Iterator<?> iter = futureResults.iterator();
482             while (iter.hasNext()) {
483                 Object actual = iter.next();
484                 if (CommitTransactionReply.isSerializedType(expReply)
485                         && CommitTransactionReply.isSerializedType(actual)) {
486                     found = true;
487                 } else if (expReply instanceof ActorSelection && Objects.equals(expReply, actual)) {
488                     found = true;
489                 } else if (expReply instanceof Class && ((Class<?>) expReply).isInstance(actual)) {
490                     found = true;
491                 }
492
493                 if (found) {
494                     iter.remove();
495                     break;
496                 }
497             }
498
499             if (!found) {
500                 fail(String.format("No cohort Future response found for %s. Actual: %s", expReply, futureResults));
501             }
502         }
503     }
504 }