Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransactionTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static org.hamcrest.CoreMatchers.allOf;
11 import static org.hamcrest.CoreMatchers.hasItem;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.hamcrest.core.Is.isA;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertNotNull;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.when;
21
22 import akka.actor.ActorSystem;
23 import akka.testkit.TestProbe;
24 import akka.testkit.javadsl.TestKit;
25 import com.google.common.base.Ticker;
26 import com.google.common.primitives.UnsignedLong;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.Optional;
30 import java.util.function.BiFunction;
31 import java.util.function.Consumer;
32 import org.hamcrest.BaseMatcher;
33 import org.hamcrest.Description;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.junit.runner.RunWith;
38 import org.mockito.Mock;
39 import org.mockito.junit.MockitoJUnitRunner;
40 import org.opendaylight.controller.cluster.access.ABIVersion;
41 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
42 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
43 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
44 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
45 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
46 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
47 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
48 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
49 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
50 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
51 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
52 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
53 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
54 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
55 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
56 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
57 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
58 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
59 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
60 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
61 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
62 import org.opendaylight.controller.cluster.access.concepts.Response;
63 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
64 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
65 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
66 import org.opendaylight.yangtools.yang.common.Empty;
67 import org.opendaylight.yangtools.yang.common.QName;
68 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
69 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
70 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
71 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
72 import org.opendaylight.yangtools.yang.data.tree.api.CursorAwareDataTreeModification;
73 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
74
75 @RunWith(MockitoJUnitRunner.StrictStubs.class)
76 public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransaction> {
77     protected static final TransactionIdentifier TRANSACTION_ID = TestUtils.TRANSACTION_ID;
78     private static final ClientIdentifier CLIENT_ID = TestUtils.CLIENT_ID;
79     private static final LocalHistoryIdentifier HISTORY_ID = TestUtils.HISTORY_ID;
80
81     protected static final YangInstanceIdentifier PATH_1 = YangInstanceIdentifier.builder()
82             .node(QName.create("ns-1", "node-1"))
83             .build();
84     protected static final YangInstanceIdentifier PATH_2 = YangInstanceIdentifier.builder()
85             .node(QName.create("ns-1", "node-2"))
86             .build();
87     protected static final YangInstanceIdentifier PATH_3 = YangInstanceIdentifier.builder()
88             .node(QName.create("ns-1", "node-3"))
89             .build();
90     protected static final ContainerNode DATA_1 = ImmutableNodes.newContainerBuilder()
91             .withNodeIdentifier(new NodeIdentifier(PATH_1.getLastPathArgument().getNodeType()))
92             .build();
93     protected static final ContainerNode DATA_2 = ImmutableNodes.newContainerBuilder()
94             .withNodeIdentifier(new NodeIdentifier(PATH_2.getLastPathArgument().getNodeType()))
95             .build();
96     protected static final String PERSISTENCE_ID = "per-1";
97
98     @Mock
99     private DataTreeSnapshot snapshot;
100     @Mock
101     private AbstractClientHistory history;
102     @Mock
103     private DatastoreContext datastoreContext;
104     @Mock
105     private ActorUtils actorUtils;
106
107     private ActorSystem system;
108     private TestProbe backendProbe;
109     private TestProbe clientContextProbe;
110     private TransactionTester<T> tester;
111     protected ClientActorContext context;
112     protected T transaction;
113
114     @Before
115     public void setUp() {
116         system = ActorSystem.apply();
117         clientContextProbe = new TestProbe(system, "clientContext");
118         backendProbe = new TestProbe(system, "backend");
119         context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID,
120                 PERSISTENCE_ID);
121         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.current(),
122                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
123         final AbstractClientConnection<ShardBackendInfo> connection =
124                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
125
126         final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
127         transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
128         tester = new TransactionTester<>(transaction, connection, backendProbe);
129     }
130
131     protected final void mockForRemote() {
132         doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
133         doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
134         doReturn(actorUtils).when(history).actorUtils();
135     }
136
137     @SuppressWarnings("checkstyle:hiddenField")
138     protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot);
139
140     @After
141     public void tearDown() {
142         TestKit.shutdownActorSystem(system);
143     }
144
145     @Test
146     public abstract void testExists() throws Exception;
147
148     @Test
149     public abstract void testRead() throws Exception;
150
151     @Test
152     public abstract void testWrite();
153
154     @Test
155     public abstract void testMerge();
156
157     @Test
158     public abstract void testDelete();
159
160     @Test
161     public abstract void testDirectCommit() throws Exception;
162
163     @Test
164     public abstract void testCanCommit();
165
166     @Test
167     public abstract void testPreCommit();
168
169     @Test
170     public abstract void testDoCommit();
171
172     @Test
173     public abstract void testForwardToRemoteAbort();
174
175     @Test
176     public abstract void testForwardToRemoteCommit();
177
178     @Test
179     public void testAbortVotingFuture() {
180         testRequestResponse(f -> transaction.abort(f), TransactionAbortRequest.class, TransactionAbortSuccess::new);
181     }
182
183     @Test
184     public void testForwardToRemotePurge() {
185         final TestProbe probe = new TestProbe(system);
186         final TransactionPurgeRequest request = new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
187         testForwardToRemote(request, TransactionPurgeRequest.class);
188     }
189
190     @Test
191     public void testReplayMessages() {
192         final TestProbe probe = new TestProbe(system);
193         final List<ConnectionEntry> entries = new ArrayList<>();
194         final Consumer<Response<?, ?>> callback = createCallbackMock();
195         final ReadTransactionRequest request1 =
196                 new ReadTransactionRequest(TRANSACTION_ID, 2L, probe.ref(), PATH_2, true);
197         final ExistsTransactionRequest request2 =
198                 new ExistsTransactionRequest(TRANSACTION_ID, 3L, probe.ref(), PATH_3, true);
199         entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
200         entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
201         final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
202         final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
203         transaction.recordSuccessfulRequest(successful1);
204         final ReadTransactionRequest successful2 =
205                 new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
206         transaction.recordSuccessfulRequest(successful2);
207         transaction.startReconnect();
208
209         final ProxyHistory mockSuccessor = mock(ProxyHistory.class);
210         when(mockSuccessor.createTransactionProxy(TRANSACTION_ID, transaction.isSnapshotOnly(), false))
211             .thenReturn(successor.getTransaction());
212
213         transaction.replayMessages(mockSuccessor, entries);
214
215         final ModifyTransactionRequest transformed = successor.expectTransactionRequest(ModifyTransactionRequest.class);
216         assertNotNull(transformed);
217         assertEquals(successful1.getSequence(), transformed.getSequence());
218         assertEquals(Optional.of(PersistenceProtocol.ABORT), transformed.getPersistenceProtocol());
219
220         ReadTransactionRequest tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
221         assertNotNull(tmpRead);
222         assertEquals(successful2.getTarget(), tmpRead.getTarget());
223         assertEquals(successful2.getSequence(), tmpRead.getSequence());
224         assertEquals(successful2.getPath(), tmpRead.getPath());
225         assertEquals(successor.localActor(), tmpRead.getReplyTo());
226
227         tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
228         assertNotNull(tmpRead);
229         assertEquals(request1.getTarget(), tmpRead.getTarget());
230         assertEquals(request1.getSequence(), tmpRead.getSequence());
231         assertEquals(request1.getPath(), tmpRead.getPath());
232         assertEquals(successor.localActor(), tmpRead.getReplyTo());
233
234         final ExistsTransactionRequest tmpExist = successor.expectTransactionRequest(ExistsTransactionRequest.class);
235         assertNotNull(tmpExist);
236         assertEquals(request2.getTarget(), tmpExist.getTarget());
237         assertEquals(request2.getSequence(), tmpExist.getSequence());
238         assertEquals(request2.getPath(), tmpExist.getPath());
239         assertEquals(successor.localActor(), tmpExist.getReplyTo());
240     }
241
242     protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
243         final List<TransactionModification> modifications = modifyRequest.getModifications();
244         assertEquals(3, modifications.size());
245         assertThat(modifications, hasItem(allOf(isA(TransactionWrite.class), hasPath(PATH_1))));
246         assertThat(modifications, hasItem(allOf(isA(TransactionMerge.class), hasPath(PATH_2))));
247         assertThat(modifications, hasItem(allOf(isA(TransactionDelete.class), hasPath(PATH_3))));
248     }
249
250     @SuppressWarnings("checkstyle:hiddenField")
251     protected <R extends TransactionRequest<R>> void testRequestResponse(final Consumer<VotingFuture<Empty>> consumer,
252             final Class<R> expectedRequest,
253             final BiFunction<TransactionIdentifier, Long, TransactionSuccess<?>> replySupplier) {
254         final TransactionTester<T> tester = getTester();
255         final VotingFuture<Empty> future = mock(VotingFuture.class);
256         transaction.seal();
257         consumer.accept(future);
258         final TransactionRequest<?> req = tester.expectTransactionRequest(expectedRequest);
259         tester.replySuccess(replySupplier.apply(TRANSACTION_ID, req.getSequence()));
260         verify(future).voteYes();
261     }
262
263     protected <R extends TransactionRequest<R>> R testHandleForwardedRemoteRequest(final R request) {
264         transaction.handleReplayedRemoteRequest(request, createCallbackMock(), Ticker.systemTicker().read());
265         final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
266         final R received = (R) envelope.getMessage();
267         assertTrue(received.getClass().equals(request.getClass()));
268         assertEquals(TRANSACTION_ID, received.getTarget());
269         assertEquals(clientContextProbe.ref(), received.getReplyTo());
270         return received;
271     }
272
273     protected <R extends TransactionRequest<R>> R testForwardToRemote(final TransactionRequest<?> toForward,
274             final Class<R> expectedMessageClass) {
275         final Consumer<Response<?, ?>> callback = createCallbackMock();
276         final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
277         final RemoteProxyTransaction successor = transactionTester.getTransaction();
278         transaction.forwardToRemote(successor, toForward, callback);
279         return transactionTester.expectTransactionRequest(expectedMessageClass);
280     }
281
282     protected TransactionTester<T> getTester() {
283         return tester;
284     }
285
286     @SuppressWarnings("unchecked")
287     protected static <T> Consumer<T> createCallbackMock() {
288         return mock(Consumer.class);
289     }
290
291     protected static BaseMatcher<TransactionModification> hasPath(final YangInstanceIdentifier path) {
292         return new BaseMatcher<>() {
293
294             @Override
295             public boolean matches(final Object item) {
296                 return path.equals(((TransactionModification) item).getPath());
297             }
298
299             @Override
300             public void describeTo(final Description description) {
301                 description.appendValue(path);
302             }
303
304             @Override
305             public void describeMismatch(final Object item, final Description description) {
306                 final TransactionModification modification = (TransactionModification) item;
307                 description.appendText("was ").appendValue(modification.getPath());
308             }
309         };
310     }
311
312     protected TestProbe createProbe() {
313         return new TestProbe(system);
314     }
315
316     @SuppressWarnings("checkstyle:hiddenField")
317     protected TransactionTester<LocalReadWriteProxyTransaction> createLocalProxy() {
318         final TestProbe backendProbe = new TestProbe(system, "backend2");
319         final TestProbe clientContextProbe = new TestProbe(system, "clientContext2");
320         final ClientActorContext context =
321                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
322         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.current(),
323                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
324         final AbstractClientConnection<ShardBackendInfo> connection =
325                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
326         final AbstractClientHistory history = mock(AbstractClientHistory.class);
327         final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
328         final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class);
329         when(snapshot.newModification()).thenReturn(mock(CursorAwareDataTreeModification.class));
330         final LocalReadWriteProxyTransaction tx =
331                 new LocalReadWriteProxyTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
332         return new TransactionTester<>(tx, connection, backendProbe);
333     }
334
335     @SuppressWarnings("checkstyle:hiddenField")
336     protected TransactionTester<RemoteProxyTransaction> createRemoteProxyTransactionTester() {
337         final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext");
338         final TestProbe backendProbe = new TestProbe(system, "remoteBackend");
339         final AbstractClientHistory history = mock(AbstractClientHistory.class);
340         doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
341         doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
342         doReturn(actorUtils).when(history).actorUtils();
343
344         final ClientActorContext context =
345                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
346         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.current(),
347                 "default", UnsignedLong.ZERO, Optional.empty(), 5);
348         final AbstractClientConnection<ShardBackendInfo> connection =
349                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
350         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
351
352         final RemoteProxyTransaction transaction =
353                 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
354         return new TransactionTester<>(transaction, connection, backendProbe);
355     }
356 }