167c5bc348467ce6d09ac347aa349ce2aa5e2ac6
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransactionTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static org.hamcrest.CoreMatchers.allOf;
11 import static org.hamcrest.CoreMatchers.hasItem;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.hamcrest.core.Is.isA;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertNotNull;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.when;
20
21 import akka.actor.ActorSystem;
22 import akka.testkit.TestProbe;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Ticker;
25 import com.google.common.primitives.UnsignedLong;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.function.BiFunction;
30 import java.util.function.Consumer;
31 import org.hamcrest.BaseMatcher;
32 import org.hamcrest.Description;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.junit.runner.RunWith;
37 import org.mockito.Mock;
38 import org.mockito.junit.MockitoJUnitRunner;
39 import org.opendaylight.controller.cluster.access.ABIVersion;
40 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
41 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
42 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
43 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
44 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
45 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
46 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
47 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
48 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
49 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
50 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
51 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
52 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
53 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
54 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
55 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
56 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
57 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
58 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
59 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
60 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
61 import org.opendaylight.controller.cluster.access.concepts.Response;
62 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
63 import org.opendaylight.yangtools.yang.common.QName;
64 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
65 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
66 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
67 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
68 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
69
70 @RunWith(MockitoJUnitRunner.StrictStubs.class)
71 public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransaction> {
72     protected static final TransactionIdentifier TRANSACTION_ID = TestUtils.TRANSACTION_ID;
73     private static final ClientIdentifier CLIENT_ID = TestUtils.CLIENT_ID;
74     private static final LocalHistoryIdentifier HISTORY_ID = TestUtils.HISTORY_ID;
75
76     protected static final YangInstanceIdentifier PATH_1 = YangInstanceIdentifier.builder()
77             .node(QName.create("ns-1", "node-1"))
78             .build();
79     protected static final YangInstanceIdentifier PATH_2 = YangInstanceIdentifier.builder()
80             .node(QName.create("ns-1", "node-2"))
81             .build();
82     protected static final YangInstanceIdentifier PATH_3 = YangInstanceIdentifier.builder()
83             .node(QName.create("ns-1", "node-3"))
84             .build();
85     protected static final ContainerNode DATA_1 = Builders.containerBuilder()
86             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_1.getLastPathArgument().getNodeType()))
87             .build();
88     protected static final ContainerNode DATA_2 = Builders.containerBuilder()
89             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_2.getLastPathArgument().getNodeType()))
90             .build();
91     protected static final String PERSISTENCE_ID = "per-1";
92
93     @Mock
94     private DataTreeSnapshot snapshot;
95     @Mock
96     private AbstractClientHistory history;
97     private ActorSystem system;
98     private TestProbe backendProbe;
99     private TestProbe clientContextProbe;
100     private TransactionTester<T> tester;
101     protected ClientActorContext context;
102     protected T transaction;
103
104     @Before
105     public void setUp() {
106         system = ActorSystem.apply();
107         clientContextProbe = new TestProbe(system, "clientContext");
108         backendProbe = new TestProbe(system, "backend");
109         context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID,
110                 PERSISTENCE_ID);
111         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
112                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
113         final AbstractClientConnection<ShardBackendInfo> connection =
114                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
115         final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
116         transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
117         tester = new TransactionTester<>(transaction, connection, backendProbe);
118     }
119
120     @SuppressWarnings("checkstyle:hiddenField")
121     protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot);
122
123     @After
124     public void tearDown() {
125         TestKit.shutdownActorSystem(system);
126     }
127
128     @Test
129     public abstract void testExists() throws Exception;
130
131     @Test
132     public abstract void testRead() throws Exception;
133
134     @Test
135     public abstract void testWrite();
136
137     @Test
138     public abstract void testMerge();
139
140     @Test
141     public abstract void testDelete();
142
143     @Test
144     public abstract void testDirectCommit() throws Exception;
145
146     @Test
147     public abstract void testCanCommit();
148
149     @Test
150     public abstract void testPreCommit();
151
152     @Test
153     public abstract void testDoCommit();
154
155     @Test
156     public abstract void testForwardToRemoteAbort();
157
158     @Test
159     public abstract void testForwardToRemoteCommit();
160
161     @Test
162     public void testAbortVotingFuture() {
163         testRequestResponse(f -> transaction.abort(f), TransactionAbortRequest.class, TransactionAbortSuccess::new);
164     }
165
166     @Test
167     public void testForwardToRemotePurge() {
168         final TestProbe probe = new TestProbe(system);
169         final TransactionPurgeRequest request = new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
170         testForwardToRemote(request, TransactionPurgeRequest.class);
171     }
172
173     @Test
174     public void testReplayMessages() {
175         final TestProbe probe = new TestProbe(system);
176         final List<ConnectionEntry> entries = new ArrayList<>();
177         final Consumer<Response<?, ?>> callback = createCallbackMock();
178         final ReadTransactionRequest request1 =
179                 new ReadTransactionRequest(TRANSACTION_ID, 2L, probe.ref(), PATH_2, true);
180         final ExistsTransactionRequest request2 =
181                 new ExistsTransactionRequest(TRANSACTION_ID, 3L, probe.ref(), PATH_3, true);
182         entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
183         entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
184         final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
185         final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
186         transaction.recordSuccessfulRequest(successful1);
187         final ReadTransactionRequest successful2 =
188                 new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
189         transaction.recordSuccessfulRequest(successful2);
190         transaction.startReconnect();
191
192         final ProxyHistory mockSuccessor = mock(ProxyHistory.class);
193         when(mockSuccessor.createTransactionProxy(TRANSACTION_ID, transaction.isSnapshotOnly(), false))
194             .thenReturn(successor.getTransaction());
195
196         transaction.replayMessages(mockSuccessor, entries);
197
198         final ModifyTransactionRequest transformed = successor.expectTransactionRequest(ModifyTransactionRequest.class);
199         assertNotNull(transformed);
200         assertEquals(successful1.getSequence(), transformed.getSequence());
201         assertTrue(transformed.getPersistenceProtocol().isPresent());
202         assertEquals(PersistenceProtocol.ABORT, transformed.getPersistenceProtocol().get());
203
204         ReadTransactionRequest tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
205         assertNotNull(tmpRead);
206         assertEquals(successful2.getTarget(), tmpRead.getTarget());
207         assertEquals(successful2.getSequence(), tmpRead.getSequence());
208         assertEquals(successful2.getPath(), tmpRead.getPath());
209         assertEquals(successor.localActor(), tmpRead.getReplyTo());
210
211         tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
212         assertNotNull(tmpRead);
213         assertEquals(request1.getTarget(), tmpRead.getTarget());
214         assertEquals(request1.getSequence(), tmpRead.getSequence());
215         assertEquals(request1.getPath(), tmpRead.getPath());
216         assertEquals(successor.localActor(), tmpRead.getReplyTo());
217
218         final ExistsTransactionRequest tmpExist = successor.expectTransactionRequest(ExistsTransactionRequest.class);
219         assertNotNull(tmpExist);
220         assertEquals(request2.getTarget(), tmpExist.getTarget());
221         assertEquals(request2.getSequence(), tmpExist.getSequence());
222         assertEquals(request2.getPath(), tmpExist.getPath());
223         assertEquals(successor.localActor(), tmpExist.getReplyTo());
224     }
225
226     protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
227         final List<TransactionModification> modifications = modifyRequest.getModifications();
228         assertEquals(3, modifications.size());
229         assertThat(modifications, hasItem(allOf(isA(TransactionWrite.class), hasPath(PATH_1))));
230         assertThat(modifications, hasItem(allOf(isA(TransactionMerge.class), hasPath(PATH_2))));
231         assertThat(modifications, hasItem(allOf(isA(TransactionDelete.class), hasPath(PATH_3))));
232     }
233
234     @SuppressWarnings("checkstyle:hiddenField")
235     protected <R extends TransactionRequest<R>> void testRequestResponse(final Consumer<VotingFuture<Void>> consumer,
236             final Class<R> expectedRequest,
237             final BiFunction<TransactionIdentifier, Long, TransactionSuccess<?>> replySupplier) {
238         final TransactionTester<T> tester = getTester();
239         final VotingFuture<Void> future = mock(VotingFuture.class);
240         transaction.seal();
241         consumer.accept(future);
242         final TransactionRequest<?> req = tester.expectTransactionRequest(expectedRequest);
243         tester.replySuccess(replySupplier.apply(TRANSACTION_ID, req.getSequence()));
244         verify(future).voteYes();
245     }
246
247     protected <R extends TransactionRequest<R>> R testHandleForwardedRemoteRequest(final R request) {
248         transaction.handleReplayedRemoteRequest(request, createCallbackMock(), Ticker.systemTicker().read());
249         final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
250         final R received = (R) envelope.getMessage();
251         assertTrue(received.getClass().equals(request.getClass()));
252         assertEquals(TRANSACTION_ID, received.getTarget());
253         assertEquals(clientContextProbe.ref(), received.getReplyTo());
254         return received;
255     }
256
257     protected <R extends TransactionRequest<R>> R testForwardToRemote(final TransactionRequest<?> toForward,
258             final Class<R> expectedMessageClass) {
259         final Consumer<Response<?, ?>> callback = createCallbackMock();
260         final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
261         final RemoteProxyTransaction successor = transactionTester.getTransaction();
262         transaction.forwardToRemote(successor, toForward, callback);
263         return transactionTester.expectTransactionRequest(expectedMessageClass);
264     }
265
266     protected TransactionTester<T> getTester() {
267         return tester;
268     }
269
270     @SuppressWarnings("unchecked")
271     protected static <T> Consumer<T> createCallbackMock() {
272         return mock(Consumer.class);
273     }
274
275     protected static BaseMatcher<TransactionModification> hasPath(final YangInstanceIdentifier path) {
276         return new BaseMatcher<>() {
277
278             @Override
279             public boolean matches(final Object item) {
280                 return path.equals(((TransactionModification) item).getPath());
281             }
282
283             @Override
284             public void describeTo(final Description description) {
285                 description.appendValue(path);
286             }
287
288             @Override
289             public void describeMismatch(final Object item, final Description description) {
290                 final TransactionModification modification = (TransactionModification) item;
291                 description.appendText("was ").appendValue(modification.getPath());
292             }
293         };
294     }
295
296     protected TestProbe createProbe() {
297         return new TestProbe(system);
298     }
299
300     @SuppressWarnings("checkstyle:hiddenField")
301     protected TransactionTester<LocalReadWriteProxyTransaction> createLocalProxy() {
302         final TestProbe backendProbe = new TestProbe(system, "backend2");
303         final TestProbe clientContextProbe = new TestProbe(system, "clientContext2");
304         final ClientActorContext context =
305                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
306         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
307                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
308         final AbstractClientConnection<ShardBackendInfo> connection =
309                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
310         final AbstractClientHistory history = mock(AbstractClientHistory.class);
311         final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
312         final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class);
313         when(snapshot.newModification()).thenReturn(mock(CursorAwareDataTreeModification.class));
314         final LocalReadWriteProxyTransaction tx =
315                 new LocalReadWriteProxyTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
316         return new TransactionTester<>(tx, connection, backendProbe);
317     }
318
319     @SuppressWarnings("checkstyle:hiddenField")
320     protected TransactionTester<RemoteProxyTransaction> createRemoteProxyTransactionTester() {
321         final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext");
322         final TestProbe backendProbe = new TestProbe(system, "remoteBackend");
323         final AbstractClientHistory history = mock(AbstractClientHistory.class);
324         final ClientActorContext context =
325                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
326         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
327                 "default", UnsignedLong.ZERO, Optional.empty(), 5);
328         final AbstractClientConnection<ShardBackendInfo> connection =
329                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
330         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
331         final RemoteProxyTransaction transaction =
332                 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
333         return new TransactionTester<>(transaction, connection, backendProbe);
334     }
335 }