2 * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.verify;
18 import akka.actor.ActorRef;
19 import akka.pattern.Patterns;
20 import akka.util.Timeout;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.JdkFutureAdapters;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.MoreExecutors;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Test;
34 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
35 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Abort;
36 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
37 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Commit;
38 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CommitProtocolCommand;
39 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.PreCommit;
40 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
41 import org.opendaylight.controller.cluster.raft.TestActorFactory;
42 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
43 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
44 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
45 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
46 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
49 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
50 import scala.concurrent.Await;
53 * Unit tests for DataTreeCohortActor.
55 * @author Thomas Pantelis
57 public class DataTreeCohortActorTest extends AbstractActorTest {
// Candidate collection passed with every CanCommit message; created empty here.
58 private static final Collection<DOMDataTreeCandidate> CANDIDATES = new ArrayList<>();
// Mocked schema context passed with every CanCommit message.
59 private static final SchemaContext MOCK_SCHEMA = mock(SchemaContext.class);
// Factory used by newCohortActor() to create the DataTreeCohortActor under test.
60 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// Mocked user cohort handed to DataTreeCohortActor.props(); its canCommit() is stubbed per test.
61 private final DOMDataTreeCommitCohort mockCohort = mock(DOMDataTreeCommitCohort.class);
// Step produced by the stubbed canCommit(); tests verify preCommit()/abort() on it.
62 private final PostCanCommitStep mockPostCanCommit = mock(PostCanCommitStep.class);
// Step produced by the stubbed preCommit(); tests verify commit()/abort() on it.
63 private final PostPreCommitStep mockPostPreCommit = mock(PostPreCommitStep.class);
// Per-test cleanup method.
// NOTE(review): presumably the JUnit @After hook that releases actors created through
// actorFactory - confirm against the method body.
71 public void tearDown() {
// Drives one transaction through CanCommit -> PreCommit -> Commit against a single cohort
// actor, verifying after each phase that the matching mock step was invoked, and then
// issues CanCommit again for the same transaction id.
76 public void testSuccessfulThreePhaseCommit() throws Exception {
77 ActorRef cohortActor = newCohortActor("testSuccessfulThreePhaseCommit");
79 TransactionIdentifier txId = nextTransactionId();
// Phase 1: the actor must delegate to the user cohort with the exact message arguments.
80 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
81 verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
// Phase 2: preCommit is expected on the step returned from canCommit.
83 askAndAwait(cohortActor, new PreCommit(txId));
84 verify(mockPostCanCommit).preCommit();
// Phase 3: commit is expected on the step returned from preCommit.
86 askAndAwait(cohortActor, new Commit(txId));
87 verify(mockPostPreCommit).commit();
// Same transaction id is submitted again after the commit completed; the cohort must be
// asked once more.
90 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
91 verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
// Interleaves two transactions on the same cohort actor: both are taken through CanCommit,
// then both through PreCommit, then both through Commit. The only assertions are those made
// inside askAndAwait (a Success reply carrying the matching transaction id).
95 public void testMultipleThreePhaseCommits() throws Exception {
96 ActorRef cohortActor = newCohortActor("testMultipleThreePhaseCommits");
98 TransactionIdentifier txId1 = nextTransactionId();
99 TransactionIdentifier txId2 = nextTransactionId();
// Both transactions are in the CanCommit phase before either advances.
101 askAndAwait(cohortActor, new CanCommit(txId1, CANDIDATES, MOCK_SCHEMA, cohortActor));
102 askAndAwait(cohortActor, new CanCommit(txId2, CANDIDATES, MOCK_SCHEMA, cohortActor));
104 askAndAwait(cohortActor, new PreCommit(txId1));
105 askAndAwait(cohortActor, new PreCommit(txId2));
107 askAndAwait(cohortActor, new Commit(txId1));
108 askAndAwait(cohortActor, new Commit(txId2));
// Runs the three-phase commit with cohort futures that are completed from a separate
// single-thread executor instead of being immediately available.
// The unchecked suppression covers the raw any(Collection.class) matcher below.
111 @SuppressWarnings("unchecked")
113 public void testAsyncCohort() throws Exception {
114 ExecutorService executor = Executors.newSingleThreadExecutor();
// canCommit: future produced by executeWithDelay (500 ms sleep on the executor); a failure
// of that future would be mapped to a DataValidationFailedException.
116 doReturn(Futures.makeChecked(executeWithDelay(executor, mockPostCanCommit),
117 ex -> new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock")))
118 .when(mockCohort).canCommit(any(Object.class), any(Collection.class), any(SchemaContext.class));
// preCommit: future backed by a task submitted to the executor that yields mockPostPreCommit.
120 doReturn(JdkFutureAdapters.listenInPoolThread(executor.submit(() ->
121 mockPostPreCommit), MoreExecutors.directExecutor())).when(mockPostCanCommit).preCommit();
// commit: future backed by a task submitted to the executor that yields null.
123 doReturn(JdkFutureAdapters.listenInPoolThread(executor.submit(() ->
124 null), MoreExecutors.directExecutor())).when(mockPostPreCommit).commit();
126 ActorRef cohortActor = newCohortActor("testAsyncCohort");
128 TransactionIdentifier txId = nextTransactionId();
129 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
130 verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
132 askAndAwait(cohortActor, new PreCommit(txId));
133 verify(mockPostCanCommit).preCommit();
135 askAndAwait(cohortActor, new Commit(txId));
136 verify(mockPostPreCommit).commit();
// NOTE(review): this shutdown does not appear to be inside a finally block, so the executor
// thread would be left running if any ask/verify above throws - confirm and consider
// try/finally.
138 executor.shutdownNow();
// Stubs the cohort's canCommit to return an immediately failed future and checks that the
// failure surfaces from the ask; afterwards issues CanCommit again for the same transaction.
// The unchecked suppression covers the raw any(Collection.class) matcher below.
141 @SuppressWarnings("unchecked")
143 public void testFailureOnCanCommit() throws Exception {
144 DataValidationFailedException failure = new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock");
145 doReturn(Futures.immediateFailedCheckedFuture(failure)).when(mockCohort).canCommit(any(Object.class),
146 any(Collection.class), any(SchemaContext.class));
148 ActorRef cohortActor = newCohortActor("testFailureOnCanCommit");
150 TransactionIdentifier txId = nextTransactionId();
// NOTE(review): no fail(...) is visible after this ask; if no exception were thrown the
// catch block would be skipped and the failure path would go unchecked - confirm.
152 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
153 } catch (DataValidationFailedException e) {
// The exception received by the caller must be equal to the one the cohort failed with.
154 assertEquals("DataValidationFailedException", failure, e);
// Same transaction id is submitted again after the failure; the cohort must be asked again.
158 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
159 verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
// Aborts a transaction right after CanCommit and verifies the abort reaches the
// PostCanCommitStep; then issues CanCommit again for the same transaction id.
163 public void testAbortAfterCanCommit() throws Exception {
164 ActorRef cohortActor = newCohortActor("testAbortAfterCanCommit");
166 TransactionIdentifier txId = nextTransactionId();
167 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
168 verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
// Abort in this phase is expected on the step returned from canCommit.
170 askAndAwait(cohortActor, new Abort(txId));
171 verify(mockPostCanCommit).abort();
// Same transaction id is submitted again after the abort; the cohort must be asked again.
174 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
175 verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
// Aborts a transaction after it has gone through CanCommit and PreCommit, and verifies the
// abort reaches the PostPreCommitStep.
179 public void testAbortAfterPreCommit() throws Exception {
180 ActorRef cohortActor = newCohortActor("testAbortAfterPreCommit");
182 TransactionIdentifier txId = nextTransactionId();
183 askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
184 verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
186 askAndAwait(cohortActor, new PreCommit(txId));
187 verify(mockPostCanCommit).preCommit();
// Abort in this phase is expected on the step returned from preCommit, not from canCommit.
189 askAndAwait(cohortActor, new Abort(txId));
190 verify(mockPostPreCommit).abort();
// Submits a task to the given executor that first sleeps for 500 ms, and exposes the
// resulting JDK Future as a ListenableFuture whose listeners run on a direct executor.
// NOTE(review): the task presumably returns `result` after the sleep - the return statement
// is not visible here; confirm.
193 private <T> ListenableFuture<T> executeWithDelay(ExecutorService executor, T result) {
194 return JdkFutureAdapters.listenInPoolThread(executor.submit(() -> {
195 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
197 }), MoreExecutors.directExecutor());
// Creates a DataTreeCohortActor with the given actor name through actorFactory, wired to
// mockCohort and registered for YangInstanceIdentifier.EMPTY.
200 private ActorRef newCohortActor(String name) {
201 return actorFactory.createActor(DataTreeCohortActor.props(mockCohort, YangInstanceIdentifier.EMPTY), name);
// Installs the default "everything succeeds immediately" stubbing on the three mocks:
// canCommit -> mockPostCanCommit, preCommit -> mockPostPreCommit, commit -> null, and both
// abort() calls -> ThreePhaseCommitStep.NOOP_ABORT_FUTURE.
// NOTE(review): presumably the mocks are reset before re-stubbing (first body line is not
// visible here) - confirm.
// The unchecked suppression covers the raw any(Collection.class) matcher below.
204 @SuppressWarnings("unchecked")
205 private void resetMockCohort() {
207 doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(mockPostCanCommit).abort();
208 doReturn(Futures.immediateFuture(mockPostPreCommit)).when(mockPostCanCommit).preCommit();
// Matches any transaction id, candidate collection and schema context.
209 doReturn(Futures.immediateCheckedFuture(mockPostCanCommit)).when(mockCohort).canCommit(any(Object.class),
210 any(Collection.class), any(SchemaContext.class));
212 doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(mockPostPreCommit).abort();
213 doReturn(Futures.immediateFuture(null)).when(mockPostPreCommit).commit();
// Sends the given commit-protocol message to the actor with the ask pattern, blocks for the
// reply for up to 5 seconds, and asserts that the reply is a Success carrying the same
// transaction id as the message. Anything thrown by Await.result propagates to the caller.
216 private static void askAndAwait(ActorRef actor, CommitProtocolCommand<?> message) throws Exception {
// The same 5 second value bounds both the ask itself and the blocking wait.
217 Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
218 Object result = Await.result(Patterns.ask(actor, message, timeout), timeout.duration());
219 assertTrue("Expected Success but was " + result, result instanceof Success);
220 assertEquals("Success", message.getTxId(), ((Success)result).getTxId());