Adjust for Binding RPC codegen changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActorTest.java
1 /*
2  * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.verify;
17
18 import akka.actor.ActorRef;
19 import akka.pattern.Patterns;
20 import akka.util.Timeout;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.ListeningExecutorService;
24 import com.google.common.util.concurrent.MoreExecutors;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.After;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Abort;
35 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
36 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Commit;
37 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CommitProtocolCommand;
38 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.PreCommit;
39 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
40 import org.opendaylight.controller.cluster.raft.TestActorFactory;
41 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
42 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
43 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
44 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
45 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
48 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
49 import scala.concurrent.Await;
50
51 /**
52  * Unit tests for DataTreeCohortActor.
53  *
54  * @author Thomas Pantelis
55  */
56 public class DataTreeCohortActorTest extends AbstractActorTest {
57     private static final Collection<DOMDataTreeCandidate> CANDIDATES = new ArrayList<>();
58     private static final SchemaContext MOCK_SCHEMA = mock(SchemaContext.class);
59     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
60     private final DOMDataTreeCommitCohort mockCohort = mock(DOMDataTreeCommitCohort.class);
61     private final PostCanCommitStep mockPostCanCommit = mock(PostCanCommitStep.class);
62     private final PostPreCommitStep mockPostPreCommit = mock(PostPreCommitStep.class);
63
64     @Before
65     public void setup() {
66         resetMockCohort();
67     }
68
69     @After
70     public void tearDown() {
71         actorFactory.close();
72     }
73
74     @Test
75     public void testSuccessfulThreePhaseCommit() throws Exception {
76         ActorRef cohortActor = newCohortActor("testSuccessfulThreePhaseCommit");
77
78         TransactionIdentifier txId = nextTransactionId();
79         askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
80         verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
81
82         askAndAwait(cohortActor, new PreCommit(txId));
83         verify(mockPostCanCommit).preCommit();
84
85         askAndAwait(cohortActor, new Commit(txId));
86         verify(mockPostPreCommit).commit();
87
88         resetMockCohort();
89         askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
90         verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
91     }
92
93     @Test
94     public void testMultipleThreePhaseCommits() throws Exception {
95         ActorRef cohortActor = newCohortActor("testMultipleThreePhaseCommits");
96
97         TransactionIdentifier txId1 = nextTransactionId();
98         TransactionIdentifier txId2 = nextTransactionId();
99
100         askAndAwait(cohortActor, new CanCommit(txId1, CANDIDATES, MOCK_SCHEMA, cohortActor));
101         askAndAwait(cohortActor, new CanCommit(txId2, CANDIDATES, MOCK_SCHEMA, cohortActor));
102
103         askAndAwait(cohortActor, new PreCommit(txId1));
104         askAndAwait(cohortActor, new PreCommit(txId2));
105
106         askAndAwait(cohortActor, new Commit(txId1));
107         askAndAwait(cohortActor, new Commit(txId2));
108     }
109
110     @SuppressWarnings("unchecked")
111     @Test
112     public void testAsyncCohort() throws Exception {
113         ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
114
115         doReturn(Futures.makeChecked(executeWithDelay(executor, mockPostCanCommit),
116             ex -> new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock")))
117                 .when(mockCohort).canCommit(any(Object.class), any(Collection.class), any(SchemaContext.class));
118
119         doReturn(executor.submit(() -> mockPostPreCommit)).when(mockPostCanCommit).preCommit();
120
121         doReturn(executor.submit(() -> null)).when(mockPostPreCommit).commit();
122
123         ActorRef cohortActor = newCohortActor("testAsyncCohort");
124
125         TransactionIdentifier txId = nextTransactionId();
126         askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
127         verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
128
129         askAndAwait(cohortActor, new PreCommit(txId));
130         verify(mockPostCanCommit).preCommit();
131
132         askAndAwait(cohortActor, new Commit(txId));
133         verify(mockPostPreCommit).commit();
134
135         executor.shutdownNow();
136     }
137
138     @SuppressWarnings("unchecked")
139     @Test
140     public void testFailureOnCanCommit() throws Exception {
141         DataValidationFailedException failure = new DataValidationFailedException(YangInstanceIdentifier.EMPTY, "mock");
142         doReturn(Futures.immediateFailedCheckedFuture(failure)).when(mockCohort).canCommit(any(Object.class),
143                 any(Collection.class), any(SchemaContext.class));
144
145         ActorRef cohortActor = newCohortActor("testFailureOnCanCommit");
146
147         TransactionIdentifier txId = nextTransactionId();
148         try {
149             askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
150         } catch (DataValidationFailedException e) {
151             assertEquals("DataValidationFailedException", failure, e);
152         }
153
154         resetMockCohort();
155         askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
156         verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
157     }
158
159     @Test
160     public void testAbortAfterCanCommit() throws Exception {
161         ActorRef cohortActor = newCohortActor("testAbortAfterCanCommit");
162
163         TransactionIdentifier txId = nextTransactionId();
164         askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
165         verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
166
167         askAndAwait(cohortActor, new Abort(txId));
168         verify(mockPostCanCommit).abort();
169
170         resetMockCohort();
171         askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
172         verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
173     }
174
175     @Test
176     public void testAbortAfterPreCommit() throws Exception {
177         ActorRef cohortActor = newCohortActor("testAbortAfterPreCommit");
178
179         TransactionIdentifier txId = nextTransactionId();
180         askAndAwait(cohortActor, new CanCommit(txId, CANDIDATES, MOCK_SCHEMA, cohortActor));
181         verify(mockCohort).canCommit(txId, CANDIDATES, MOCK_SCHEMA);
182
183         askAndAwait(cohortActor, new PreCommit(txId));
184         verify(mockPostCanCommit).preCommit();
185
186         askAndAwait(cohortActor, new Abort(txId));
187         verify(mockPostPreCommit).abort();
188     }
189
190     private static <T> ListenableFuture<T> executeWithDelay(final ListeningExecutorService executor, final T result) {
191         return executor.submit(() -> {
192             Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
193             return result;
194         });
195     }
196
197     private ActorRef newCohortActor(final String name) {
198         return actorFactory.createActor(DataTreeCohortActor.props(mockCohort, YangInstanceIdentifier.EMPTY), name);
199     }
200
201     @SuppressWarnings("unchecked")
202     private void resetMockCohort() {
203         reset(mockCohort);
204         doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(mockPostCanCommit).abort();
205         doReturn(Futures.immediateFuture(mockPostPreCommit)).when(mockPostCanCommit).preCommit();
206         doReturn(Futures.immediateCheckedFuture(mockPostCanCommit)).when(mockCohort).canCommit(any(Object.class),
207                 any(Collection.class), any(SchemaContext.class));
208
209         doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(mockPostPreCommit).abort();
210         doReturn(Futures.immediateFuture(null)).when(mockPostPreCommit).commit();
211     }
212
213     private static void askAndAwait(final ActorRef actor, final CommitProtocolCommand<?> message) throws Exception {
214         Timeout timeout = new Timeout(5, TimeUnit.SECONDS);
215         Object result = Await.result(Patterns.ask(actor, message, timeout), timeout.duration());
216         assertTrue("Expected Success but was " + result, result instanceof Success);
217         assertEquals("Success", message.getTxId(), ((Success)result).getTxId());
218     }
219 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.