2 * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.anyCollection;
14 import static org.mockito.Mockito.doReturn;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.reset;
17 import static org.mockito.Mockito.verify;
19 import akka.actor.ActorRef;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.google.common.util.concurrent.FluentFuture;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ListeningExecutorService;
25 import com.google.common.util.concurrent.MoreExecutors;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Test;
34 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
35 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Abort;
36 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
37 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Commit;
38 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CommitProtocolCommand;
39 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.PreCommit;
40 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
41 import org.opendaylight.controller.cluster.raft.TestActorFactory;
42 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
43 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
44 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
45 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
46 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
47 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
50 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
51 import scala.concurrent.Await;
54 * Unit tests for DataTreeCohortActor.
56 * @author Thomas Pantelis
58 public class DataTreeCohortActorTest extends AbstractActorTest {
59 private static final Collection<DOMDataTreeCandidate> CANDIDATES = new ArrayList<>();
60 private static final EffectiveModelContext MOCK_SCHEMA = mock(EffectiveModelContext.class);
61 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
62 private final DOMDataTreeCommitCohort mockCohort = mock(DOMDataTreeCommitCohort.class);
63 private final PostCanCommitStep mockPostCanCommit = mock(PostCanCommitStep.class);
64 private final PostPreCommitStep mockPostPreCommit = mock(PostPreCommitStep.class);
72 public void tearDown() {
77 public void testSuccessfulThreePhaseCommit() throws Exception {
78 ActorRef cohortActor = newCohortActor("testSuccessfulThreePhaseCommit");
80 TransactionIdentifier txId = nextTransactionId();
81 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
82 verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
84 askAndAwait(cohortActor, new PreCommit(txId));
85 verify(mockPostCanCommit).preCommit();
87 askAndAwait(cohortActor, new Commit(txId));
88 verify(mockPostPreCommit).commit();
91 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
92 verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
96 public void testMultipleThreePhaseCommits() throws Exception {
97 ActorRef cohortActor = newCohortActor("testMultipleThreePhaseCommits");
99 TransactionIdentifier txId1 = nextTransactionId();
100 TransactionIdentifier txId2 = nextTransactionId();
102 askAndAwait(cohortActor, new CanCommit(txId1, CANDIDATES, MOCK_SCHEMA, cohortActor));
103 askAndAwait(cohortActor, new CanCommit(txId2, CANDIDATES, MOCK_SCHEMA, cohortActor));
105 askAndAwait(cohortActor, new PreCommit(txId1));
106 askAndAwait(cohortActor, new PreCommit(txId2));
108 askAndAwait(cohortActor, new Commit(txId1));
109 askAndAwait(cohortActor, new Commit(txId2));
113 public void testAsyncCohort() throws Exception {
114 ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
116 doReturn(executeWithDelay(executor, mockPostCanCommit))
117 .when(mockCohort).canCommit(any(Object.class), any(EffectiveModelContext.class), anyCollection());
119 doReturn(executor.submit(() -> mockPostPreCommit)).when(mockPostCanCommit).preCommit();
121 doReturn(executor.submit(() -> null)).when(mockPostPreCommit).commit();
123 ActorRef cohortActor = newCohortActor("testAsyncCohort");
125 TransactionIdentifier txId = nextTransactionId();
126 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
127 verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
129 askAndAwait(cohortActor, new PreCommit(txId));
130 verify(mockPostCanCommit).preCommit();
132 askAndAwait(cohortActor, new Commit(txId));
133 verify(mockPostPreCommit).commit();
135 executor.shutdownNow();
139 public void testFailureOnCanCommit() throws Exception {
140 DataValidationFailedException failure = new DataValidationFailedException(YangInstanceIdentifier.of(),
142 doReturn(FluentFutures.immediateFailedFluentFuture(failure)).when(mockCohort).canCommit(any(Object.class),
143 any(EffectiveModelContext.class), anyCollection());
145 ActorRef cohortActor = newCohortActor("testFailureOnCanCommit");
147 TransactionIdentifier txId = nextTransactionId();
149 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
150 } catch (DataValidationFailedException e) {
151 assertEquals("DataValidationFailedException", failure, e);
155 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
156 verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
160 public void testAbortAfterCanCommit() throws Exception {
161 ActorRef cohortActor = newCohortActor("testAbortAfterCanCommit");
163 TransactionIdentifier txId = nextTransactionId();
164 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
165 verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
167 askAndAwait(cohortActor, new Abort(txId));
168 verify(mockPostCanCommit).abort();
171 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
172 verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
176 public void testAbortAfterPreCommit() throws Exception {
177 ActorRef cohortActor = newCohortActor("testAbortAfterPreCommit");
179 TransactionIdentifier txId = nextTransactionId();
180 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
181 verify(mockCohort).canCommit(txId, MOCK_SCHEMA, CANDIDATES);
183 askAndAwait(cohortActor, new PreCommit(txId));
184 verify(mockPostCanCommit).preCommit();
186 askAndAwait(cohortActor, new Abort(txId));
187 verify(mockPostPreCommit).abort();
190 private static <T> FluentFuture<T> executeWithDelay(final ListeningExecutorService executor, final T result) {
191 return FluentFuture.from(executor.submit(() -> {
192 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
197 private ActorRef newCohortActor(final String name) {
198 return actorFactory.createActor(DataTreeCohortActor.props(mockCohort, YangInstanceIdentifier.of()), name);
201 private void resetMockCohort() {
203 doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(mockPostCanCommit).abort();
204 doReturn(Futures.immediateFuture(mockPostPreCommit)).when(mockPostCanCommit).preCommit();
205 doReturn(FluentFutures.immediateFluentFuture(mockPostCanCommit)).when(mockCohort).canCommit(any(Object.class),
206 any(EffectiveModelContext.class), anyCollection());
208 doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(mockPostPreCommit).abort();
209 doReturn(Futures.immediateFuture(null)).when(mockPostPreCommit).commit();
212 private static void askAndAwait(final ActorRef actor, final CommitProtocolCommand<?> message) throws Exception {
213 Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
214 Object result = Await.result(Patterns.ask(actor, message, timeout), timeout.duration());
215 assertTrue("Expected Success but was " + result, result instanceof Success);
216 assertEquals("Success", message.getTxId(), ((Success)result).getTxId());