b13bbe7f6c978d6cbc7d6c2a1bd6374ddddf791c
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransactionTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static org.hamcrest.CoreMatchers.allOf;
11 import static org.hamcrest.CoreMatchers.hasItem;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.hamcrest.core.Is.isA;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertNotNull;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.when;
20
21 import akka.actor.ActorSystem;
22 import akka.testkit.TestProbe;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Ticker;
25 import com.google.common.primitives.UnsignedLong;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.function.BiFunction;
30 import java.util.function.Consumer;
31 import org.hamcrest.BaseMatcher;
32 import org.hamcrest.Description;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.mockito.Mock;
37 import org.mockito.MockitoAnnotations;
38 import org.opendaylight.controller.cluster.access.ABIVersion;
39 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
40 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
41 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
42 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
43 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
44 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
45 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
46 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
47 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
48 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
49 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
50 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
51 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
52 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
53 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
54 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
55 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
56 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
57 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
58 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
59 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
60 import org.opendaylight.controller.cluster.access.concepts.Response;
61 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
62 import org.opendaylight.yangtools.yang.common.QName;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
64 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
65 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
66 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
67 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
68
69 public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransaction> {
70     protected static final TransactionIdentifier TRANSACTION_ID = TestUtils.TRANSACTION_ID;
71     private static final ClientIdentifier CLIENT_ID = TestUtils.CLIENT_ID;
72     private static final LocalHistoryIdentifier HISTORY_ID = TestUtils.HISTORY_ID;
73
74     protected static final YangInstanceIdentifier PATH_1 = YangInstanceIdentifier.builder()
75             .node(QName.create("ns-1", "node-1"))
76             .build();
77     protected static final YangInstanceIdentifier PATH_2 = YangInstanceIdentifier.builder()
78             .node(QName.create("ns-1", "node-2"))
79             .build();
80     protected static final YangInstanceIdentifier PATH_3 = YangInstanceIdentifier.builder()
81             .node(QName.create("ns-1", "node-3"))
82             .build();
83     protected static final ContainerNode DATA_1 = Builders.containerBuilder()
84             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_1.getLastPathArgument().getNodeType()))
85             .build();
86     protected static final ContainerNode DATA_2 = Builders.containerBuilder()
87             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_2.getLastPathArgument().getNodeType()))
88             .build();
89     protected static final String PERSISTENCE_ID = "per-1";
90
91     @Mock
92     private DataTreeSnapshot snapshot;
93     @Mock
94     private AbstractClientHistory history;
95     private ActorSystem system;
96     private TestProbe backendProbe;
97     private TestProbe clientContextProbe;
98     private TransactionTester<T> tester;
99     protected ClientActorContext context;
100     protected T transaction;
101
102     @Before
103     public void setUp() {
104         MockitoAnnotations.initMocks(this);
105         system = ActorSystem.apply();
106         clientContextProbe = new TestProbe(system, "clientContext");
107         backendProbe = new TestProbe(system, "backend");
108         context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID,
109                 PERSISTENCE_ID);
110         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
111                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
112         final AbstractClientConnection<ShardBackendInfo> connection =
113                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
114         final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
115         transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
116         tester = new TransactionTester<>(transaction, connection, backendProbe);
117     }
118
119     @SuppressWarnings("checkstyle:hiddenField")
120     protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot);
121
122     @After
123     public void tearDown() {
124         TestKit.shutdownActorSystem(system);
125     }
126
127     @Test
128     public abstract void testExists() throws Exception;
129
130     @Test
131     public abstract void testRead() throws Exception;
132
133     @Test
134     public abstract void testWrite();
135
136     @Test
137     public abstract void testMerge();
138
139     @Test
140     public abstract void testDelete();
141
142     @Test
143     public abstract void testDirectCommit() throws Exception;
144
145     @Test
146     public abstract void testCanCommit();
147
148     @Test
149     public abstract void testPreCommit();
150
151     @Test
152     public abstract void testDoCommit();
153
154     @Test
155     public abstract void testForwardToRemoteAbort();
156
157     @Test
158     public abstract void testForwardToRemoteCommit();
159
160     @Test
161     public void testAbortVotingFuture() {
162         testRequestResponse(f -> transaction.abort(f), TransactionAbortRequest.class, TransactionAbortSuccess::new);
163     }
164
165     @Test
166     public void testForwardToRemotePurge() {
167         final TestProbe probe = new TestProbe(system);
168         final TransactionPurgeRequest request = new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
169         testForwardToRemote(request, TransactionPurgeRequest.class);
170     }
171
172     @Test
173     public void testReplayMessages() {
174         final TestProbe probe = new TestProbe(system);
175         final List<ConnectionEntry> entries = new ArrayList<>();
176         final Consumer<Response<?, ?>> callback = createCallbackMock();
177         final ReadTransactionRequest request1 =
178                 new ReadTransactionRequest(TRANSACTION_ID, 2L, probe.ref(), PATH_2, true);
179         final ExistsTransactionRequest request2 =
180                 new ExistsTransactionRequest(TRANSACTION_ID, 3L, probe.ref(), PATH_3, true);
181         entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
182         entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
183         final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
184         final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
185         transaction.recordSuccessfulRequest(successful1);
186         final ReadTransactionRequest successful2 =
187                 new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
188         transaction.recordSuccessfulRequest(successful2);
189         transaction.startReconnect();
190
191         final ProxyHistory mockSuccessor = mock(ProxyHistory.class);
192         when(mockSuccessor.createTransactionProxy(TRANSACTION_ID, transaction.isSnapshotOnly(), false))
193             .thenReturn(successor.getTransaction());
194
195         transaction.replayMessages(mockSuccessor, entries);
196
197         final ModifyTransactionRequest transformed = successor.expectTransactionRequest(ModifyTransactionRequest.class);
198         assertNotNull(transformed);
199         assertEquals(successful1.getSequence(), transformed.getSequence());
200         assertTrue(transformed.getPersistenceProtocol().isPresent());
201         assertEquals(PersistenceProtocol.ABORT, transformed.getPersistenceProtocol().get());
202
203         ReadTransactionRequest tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
204         assertNotNull(tmpRead);
205         assertEquals(successful2.getTarget(), tmpRead.getTarget());
206         assertEquals(successful2.getSequence(), tmpRead.getSequence());
207         assertEquals(successful2.getPath(), tmpRead.getPath());
208         assertEquals(successor.localActor(), tmpRead.getReplyTo());
209
210         tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
211         assertNotNull(tmpRead);
212         assertEquals(request1.getTarget(), tmpRead.getTarget());
213         assertEquals(request1.getSequence(), tmpRead.getSequence());
214         assertEquals(request1.getPath(), tmpRead.getPath());
215         assertEquals(successor.localActor(), tmpRead.getReplyTo());
216
217         final ExistsTransactionRequest tmpExist = successor.expectTransactionRequest(ExistsTransactionRequest.class);
218         assertNotNull(tmpExist);
219         assertEquals(request2.getTarget(), tmpExist.getTarget());
220         assertEquals(request2.getSequence(), tmpExist.getSequence());
221         assertEquals(request2.getPath(), tmpExist.getPath());
222         assertEquals(successor.localActor(), tmpExist.getReplyTo());
223     }
224
225     protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
226         final List<TransactionModification> modifications = modifyRequest.getModifications();
227         assertEquals(3, modifications.size());
228         assertThat(modifications, hasItem(allOf(isA(TransactionWrite.class), hasPath(PATH_1))));
229         assertThat(modifications, hasItem(allOf(isA(TransactionMerge.class), hasPath(PATH_2))));
230         assertThat(modifications, hasItem(allOf(isA(TransactionDelete.class), hasPath(PATH_3))));
231     }
232
233     @SuppressWarnings("checkstyle:hiddenField")
234     protected <R extends TransactionRequest<R>> void testRequestResponse(final Consumer<VotingFuture<Void>> consumer,
235             final Class<R> expectedRequest,
236             final BiFunction<TransactionIdentifier, Long, TransactionSuccess<?>> replySupplier) {
237         final TransactionTester<T> tester = getTester();
238         final VotingFuture<Void> future = mock(VotingFuture.class);
239         transaction.seal();
240         consumer.accept(future);
241         final TransactionRequest<?> req = tester.expectTransactionRequest(expectedRequest);
242         tester.replySuccess(replySupplier.apply(TRANSACTION_ID, req.getSequence()));
243         verify(future).voteYes();
244     }
245
246     protected <R extends TransactionRequest<R>> R testHandleForwardedRemoteRequest(final R request) {
247         transaction.handleReplayedRemoteRequest(request, createCallbackMock(), Ticker.systemTicker().read());
248         final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
249         final R received = (R) envelope.getMessage();
250         assertTrue(received.getClass().equals(request.getClass()));
251         assertEquals(TRANSACTION_ID, received.getTarget());
252         assertEquals(clientContextProbe.ref(), received.getReplyTo());
253         return received;
254     }
255
256     protected <R extends TransactionRequest<R>> R testForwardToRemote(final TransactionRequest<?> toForward,
257             final Class<R> expectedMessageClass) {
258         final Consumer<Response<?, ?>> callback = createCallbackMock();
259         final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
260         final RemoteProxyTransaction successor = transactionTester.getTransaction();
261         transaction.forwardToRemote(successor, toForward, callback);
262         return transactionTester.expectTransactionRequest(expectedMessageClass);
263     }
264
265     protected TransactionTester<T> getTester() {
266         return tester;
267     }
268
269     @SuppressWarnings("unchecked")
270     protected static <T> Consumer<T> createCallbackMock() {
271         return mock(Consumer.class);
272     }
273
274     protected static BaseMatcher<TransactionModification> hasPath(final YangInstanceIdentifier path) {
275         return new BaseMatcher<>() {
276
277             @Override
278             public boolean matches(final Object item) {
279                 return path.equals(((TransactionModification) item).getPath());
280             }
281
282             @Override
283             public void describeTo(final Description description) {
284                 description.appendValue(path);
285             }
286
287             @Override
288             public void describeMismatch(final Object item, final Description description) {
289                 final TransactionModification modification = (TransactionModification) item;
290                 description.appendText("was ").appendValue(modification.getPath());
291             }
292         };
293     }
294
295     protected TestProbe createProbe() {
296         return new TestProbe(system);
297     }
298
299     @SuppressWarnings("checkstyle:hiddenField")
300     protected TransactionTester<LocalReadWriteProxyTransaction> createLocalProxy() {
301         final TestProbe backendProbe = new TestProbe(system, "backend2");
302         final TestProbe clientContextProbe = new TestProbe(system, "clientContext2");
303         final ClientActorContext context =
304                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
305         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
306                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
307         final AbstractClientConnection<ShardBackendInfo> connection =
308                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
309         final AbstractClientHistory history = mock(AbstractClientHistory.class);
310         final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
311         final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class);
312         when(snapshot.newModification()).thenReturn(mock(CursorAwareDataTreeModification.class));
313         final LocalReadWriteProxyTransaction tx =
314                 new LocalReadWriteProxyTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
315         return new TransactionTester<>(tx, connection, backendProbe);
316     }
317
318     @SuppressWarnings("checkstyle:hiddenField")
319     protected TransactionTester<RemoteProxyTransaction> createRemoteProxyTransactionTester() {
320         final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext");
321         final TestProbe backendProbe = new TestProbe(system, "remoteBackend");
322         final AbstractClientHistory history = mock(AbstractClientHistory.class);
323         final ClientActorContext context =
324                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
325         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
326                 "default", UnsignedLong.ZERO, Optional.empty(), 5);
327         final AbstractClientConnection<ShardBackendInfo> connection =
328                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
329         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
330         final RemoteProxyTransaction transaction =
331                 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
332         return new TransactionTester<>(transaction, connection, backendProbe);
333     }
334 }