ac24304e34dcd58df1e7d24784f0baac9d720548
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransactionTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static org.hamcrest.CoreMatchers.both;
11 import static org.hamcrest.CoreMatchers.hasItem;
12 import static org.hamcrest.core.Is.isA;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.verify;
15 import static org.mockito.Mockito.when;
16
17 import akka.actor.ActorSystem;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestProbe;
20 import com.google.common.primitives.UnsignedLong;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Optional;
24 import java.util.function.BiFunction;
25 import java.util.function.Consumer;
26 import org.hamcrest.BaseMatcher;
27 import org.hamcrest.Description;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.mockito.Mock;
33 import org.mockito.MockitoAnnotations;
34 import org.opendaylight.controller.cluster.access.ABIVersion;
35 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
36 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
37 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
38 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
39 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
40 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
41 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
42 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
43 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
44 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
45 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
46 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
47 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
48 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
49 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
50 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
51 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
52 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
53 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
54 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
55 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
56 import org.opendaylight.controller.cluster.access.concepts.Response;
57 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
58 import org.opendaylight.yangtools.yang.common.QName;
59 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
60 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
62 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
63 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
64
65 public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransaction> {
66     protected static final TransactionIdentifier TRANSACTION_ID = TestUtils.TRANSACTION_ID;
67     private static final ClientIdentifier CLIENT_ID = TestUtils.CLIENT_ID;
68     private static final LocalHistoryIdentifier HISTORY_ID = TestUtils.HISTORY_ID;
69
70     protected static final YangInstanceIdentifier PATH_1 = YangInstanceIdentifier.builder()
71             .node(QName.create("ns-1", "node-1"))
72             .build();
73     protected static final YangInstanceIdentifier PATH_2 = YangInstanceIdentifier.builder()
74             .node(QName.create("ns-1", "node-2"))
75             .build();
76     protected static final YangInstanceIdentifier PATH_3 = YangInstanceIdentifier.builder()
77             .node(QName.create("ns-1", "node-3"))
78             .build();
79     protected static final ContainerNode DATA_1 = Builders.containerBuilder()
80             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_1.getLastPathArgument().getNodeType()))
81             .build();
82     protected static final ContainerNode DATA_2 = Builders.containerBuilder()
83             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_2.getLastPathArgument().getNodeType()))
84             .build();
85     protected static final String PERSISTENCE_ID = "per-1";
86
87     @Mock
88     private DataTreeSnapshot snapshot;
89     @Mock
90     private AbstractClientHistory history;
91     private ActorSystem system;
92     private TestProbe backendProbe;
93     private TestProbe clientContextProbe;
94     private TransactionTester<T> tester;
95     protected T transaction;
96
97     @Before
98     public void setUp() throws Exception {
99         MockitoAnnotations.initMocks(this);
100         system = ActorSystem.apply();
101         clientContextProbe = new TestProbe(system, "clientContext");
102         backendProbe = new TestProbe(system, "backend");
103         final ClientActorContext context =
104                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
105         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
106                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
107         final AbstractClientConnection<ShardBackendInfo> connection =
108                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
109         final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
110         transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
111         tester = new TransactionTester<>(transaction, connection, backendProbe);
112     }
113
114     protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot);
115
116     @After
117     public void tearDown() throws Exception {
118         JavaTestKit.shutdownActorSystem(system);
119     }
120
121     @Test
122     public abstract void testExists() throws Exception;
123
124     @Test
125     public abstract void testRead() throws Exception;
126
127     @Test
128     public abstract void testWrite() throws Exception;
129
130     @Test
131     public abstract void testMerge() throws Exception;
132
133     @Test
134     public abstract void testDelete() throws Exception;
135
136     @Test
137     public abstract void testDirectCommit() throws Exception;
138
139     @Test
140     public abstract void testCanCommit() throws Exception;
141
142     @Test
143     public abstract void testPreCommit() throws Exception;
144
145     @Test
146     public abstract void testDoCommit() throws Exception;
147
148     @Test
149     public abstract void testForwardToRemoteAbort() throws Exception;
150
151     @Test
152     public abstract void testForwardToRemoteCommit() throws Exception;
153
154     @Test
155     public void testAbortVotingFuture() throws Exception {
156         testRequestResponse(f -> transaction.abort(f), TransactionAbortRequest.class, TransactionAbortSuccess::new);
157     }
158
159     @Test
160     public void testForwardToRemotePurge() throws Exception {
161         final TestProbe probe = new TestProbe(system);
162         final TransactionPurgeRequest request = new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
163         testForwardToRemote(request, TransactionPurgeRequest.class);
164     }
165
166     @Test
167     public void testReplayMessages() throws Exception {
168         final TestProbe probe = new TestProbe(system);
169         final List<ConnectionEntry> entries = new ArrayList<>();
170         final Consumer<Response<?, ?>> callback = createCallbackMock();
171         final ReadTransactionRequest request1 =
172                 new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_2, true);
173         final ExistsTransactionRequest request2 =
174                 new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_3, true);
175         entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
176         entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
177         final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
178         final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
179         transaction.recordSuccessfulRequest(successful1);
180         final ReadTransactionRequest successful2 =
181                 new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
182         transaction.recordSuccessfulRequest(successful2);
183         transaction.startReconnect();
184         transaction.replayMessages(successor.getTransaction(), entries);
185
186         final ModifyTransactionRequest transformed = successor.expectTransactionRequest(ModifyTransactionRequest.class);
187         Assert.assertNotNull(transformed);
188         Assert.assertEquals(successful1.getSequence(), transformed.getSequence());
189         Assert.assertTrue(transformed.getPersistenceProtocol().isPresent());
190         Assert.assertEquals(PersistenceProtocol.ABORT, transformed.getPersistenceProtocol().get());
191         Assert.assertEquals(successful2, successor.expectTransactionRequest(ReadTransactionRequest.class));
192         Assert.assertEquals(request1, successor.expectTransactionRequest(ReadTransactionRequest.class));
193         Assert.assertEquals(request2, successor.expectTransactionRequest(ExistsTransactionRequest.class));
194     }
195
196     protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
197         final List<TransactionModification> modifications = modifyRequest.getModifications();
198         Assert.assertEquals(3, modifications.size());
199         Assert.assertThat(modifications, hasItem(both(isA(TransactionWrite.class)).and(hasPath(PATH_1))));
200         Assert.assertThat(modifications, hasItem(both(isA(TransactionMerge.class)).and(hasPath(PATH_2))));
201         Assert.assertThat(modifications, hasItem(both(isA(TransactionDelete.class)).and(hasPath(PATH_3))));
202     }
203
204     protected void testRequestResponse(final Consumer<VotingFuture> consumer,
205                                        final Class<? extends TransactionRequest> expectedRequest,
206                                        final BiFunction<TransactionIdentifier, Long, TransactionSuccess> replySupplier)
207             throws Exception {
208         final TransactionTester<T> tester = getTester();
209         final VotingFuture future = mock(VotingFuture.class);
210         transaction.seal();
211         consumer.accept(future);
212         final TransactionRequest req = tester.expectTransactionRequest(expectedRequest);
213         tester.replySuccess(replySupplier.apply(TRANSACTION_ID, req.getSequence()));
214         verify(future).voteYes();
215     }
216
217     protected <T extends TransactionRequest> T testHandleForwardedRemoteRequest(final T request) throws Exception {
218         transaction.handleForwardedRemoteRequest(request, createCallbackMock());
219         final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
220         final T received = (T) envelope.getMessage();
221         Assert.assertTrue(received.getClass().equals(request.getClass()));
222         Assert.assertEquals(TRANSACTION_ID, received.getTarget());
223         Assert.assertEquals(clientContextProbe.ref(), received.getReplyTo());
224         return received;
225     }
226
227     protected <T extends TransactionRequest> T testForwardToRemote(final TransactionRequest toForward,
228                                                                    final Class<T> expectedMessageClass) {
229         final Consumer<Response<?, ?>> callback = createCallbackMock();
230         final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
231         final RemoteProxyTransaction successor = transactionTester.getTransaction();
232         transaction.forwardToRemote(successor, toForward, callback);
233         return transactionTester.expectTransactionRequest(expectedMessageClass);
234     }
235
236     protected TransactionTester<T> getTester() {
237         return tester;
238     }
239
240     @SuppressWarnings("unchecked")
241     protected <T> Consumer<T> createCallbackMock() {
242         return mock(Consumer.class);
243     }
244
245     protected static BaseMatcher<TransactionModification> hasPath(final YangInstanceIdentifier path) {
246         return new BaseMatcher<TransactionModification>() {
247
248             @Override
249             public boolean matches(final Object item) {
250                 return path.equals(((TransactionModification) item).getPath());
251             }
252
253             @Override
254             public void describeTo(final Description description) {
255                 description.appendValue(path);
256             }
257
258             @Override
259             public void describeMismatch(final Object item, final Description description) {
260                 final TransactionModification modification = (TransactionModification) item;
261                 description.appendText("was ").appendValue(modification.getPath());
262             }
263         };
264     }
265
266     protected TestProbe createProbe() {
267         return new TestProbe(system);
268     }
269
270     protected TransactionTester<LocalReadWriteProxyTransaction> createLocalProxy() {
271         final TestProbe backendProbe = new TestProbe(system, "backend2");
272         final TestProbe clientContextProbe = new TestProbe(system, "clientContext2");
273         final ClientActorContext context =
274                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
275         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
276                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
277         final AbstractClientConnection<ShardBackendInfo> connection =
278                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
279         final AbstractClientHistory history = mock(AbstractClientHistory.class);
280         final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
281         final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class);
282         when(snapshot.newModification()).thenReturn(mock(CursorAwareDataTreeModification.class));
283         final LocalReadWriteProxyTransaction tx =
284                 new LocalReadWriteProxyTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
285         return new TransactionTester<>(tx, connection, backendProbe);
286     }
287
288     protected TransactionTester<RemoteProxyTransaction> createRemoteProxyTransactionTester() {
289         final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext");
290         final TestProbe backendProbe = new TestProbe(system, "remoteBackend");
291         final AbstractClientHistory history = mock(AbstractClientHistory.class);
292         final ClientActorContext context =
293                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
294         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
295                 "default", UnsignedLong.ZERO, Optional.empty(), 5);
296         final AbstractClientConnection<ShardBackendInfo> connection =
297                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
298         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
299         final RemoteProxyTransaction transaction =
300                 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
301         return new TransactionTester<>(transaction, connection, backendProbe);
302     }
303 }