/*
 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static org.hamcrest.CoreMatchers.allOf;
11 import static org.hamcrest.CoreMatchers.hasItem;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.hamcrest.core.Is.isA;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertNotNull;
16 import static org.junit.Assert.assertTrue;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.when;
21 import akka.actor.ActorSystem;
22 import akka.testkit.TestProbe;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Ticker;
25 import com.google.common.primitives.UnsignedLong;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.function.BiFunction;
30 import java.util.function.Consumer;
31 import org.hamcrest.BaseMatcher;
32 import org.hamcrest.Description;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.junit.runner.RunWith;
37 import org.mockito.Mock;
38 import org.mockito.junit.MockitoJUnitRunner;
39 import org.opendaylight.controller.cluster.access.ABIVersion;
40 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
41 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
42 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
43 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
44 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
45 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
46 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
47 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
48 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
49 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
50 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
51 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
52 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
53 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
54 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
55 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
56 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
57 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
58 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
59 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
60 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
61 import org.opendaylight.controller.cluster.access.concepts.Response;
62 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
63 import org.opendaylight.yangtools.yang.common.QName;
64 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
65 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
66 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
67 import org.opendaylight.yangtools.yang.data.tree.api.CursorAwareDataTreeModification;
68 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
70 @RunWith(MockitoJUnitRunner.StrictStubs.class)
71 public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransaction> {
72 protected static final TransactionIdentifier TRANSACTION_ID = TestUtils.TRANSACTION_ID;
73 private static final ClientIdentifier CLIENT_ID = TestUtils.CLIENT_ID;
74 private static final LocalHistoryIdentifier HISTORY_ID = TestUtils.HISTORY_ID;
76 protected static final YangInstanceIdentifier PATH_1 = YangInstanceIdentifier.builder()
77 .node(QName.create("ns-1", "node-1"))
79 protected static final YangInstanceIdentifier PATH_2 = YangInstanceIdentifier.builder()
80 .node(QName.create("ns-1", "node-2"))
82 protected static final YangInstanceIdentifier PATH_3 = YangInstanceIdentifier.builder()
83 .node(QName.create("ns-1", "node-3"))
85 protected static final ContainerNode DATA_1 = Builders.containerBuilder()
86 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_1.getLastPathArgument().getNodeType()))
88 protected static final ContainerNode DATA_2 = Builders.containerBuilder()
89 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_2.getLastPathArgument().getNodeType()))
91 protected static final String PERSISTENCE_ID = "per-1";
94 private DataTreeSnapshot snapshot;
96 private AbstractClientHistory history;
97 private ActorSystem system;
98 private TestProbe backendProbe;
99 private TestProbe clientContextProbe;
100 private TransactionTester<T> tester;
101 protected ClientActorContext context;
102 protected T transaction;
105 public void setUp() {
106 system = ActorSystem.apply();
107 clientContextProbe = new TestProbe(system, "clientContext");
108 backendProbe = new TestProbe(system, "backend");
109 context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID,
111 final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
112 "default", UnsignedLong.ZERO, Optional.empty(), 3);
113 final AbstractClientConnection<ShardBackendInfo> connection =
114 AccessClientUtil.createConnectedConnection(context, 0L, backend);
115 final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
116 transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
117 tester = new TransactionTester<>(transaction, connection, backendProbe);
120 @SuppressWarnings("checkstyle:hiddenField")
121 protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot);
124 public void tearDown() {
125 TestKit.shutdownActorSystem(system);
129 public abstract void testExists() throws Exception;
132 public abstract void testRead() throws Exception;
135 public abstract void testWrite();
138 public abstract void testMerge();
141 public abstract void testDelete();
144 public abstract void testDirectCommit() throws Exception;
147 public abstract void testCanCommit();
150 public abstract void testPreCommit();
153 public abstract void testDoCommit();
156 public abstract void testForwardToRemoteAbort();
159 public abstract void testForwardToRemoteCommit();
162 public void testAbortVotingFuture() {
163 testRequestResponse(f -> transaction.abort(f), TransactionAbortRequest.class, TransactionAbortSuccess::new);
167 public void testForwardToRemotePurge() {
168 final TestProbe probe = new TestProbe(system);
169 final TransactionPurgeRequest request = new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
170 testForwardToRemote(request, TransactionPurgeRequest.class);
174 public void testReplayMessages() {
175 final TestProbe probe = new TestProbe(system);
176 final List<ConnectionEntry> entries = new ArrayList<>();
177 final Consumer<Response<?, ?>> callback = createCallbackMock();
178 final ReadTransactionRequest request1 =
179 new ReadTransactionRequest(TRANSACTION_ID, 2L, probe.ref(), PATH_2, true);
180 final ExistsTransactionRequest request2 =
181 new ExistsTransactionRequest(TRANSACTION_ID, 3L, probe.ref(), PATH_3, true);
182 entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
183 entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
184 final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
185 final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
186 transaction.recordSuccessfulRequest(successful1);
187 final ReadTransactionRequest successful2 =
188 new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
189 transaction.recordSuccessfulRequest(successful2);
190 transaction.startReconnect();
192 final ProxyHistory mockSuccessor = mock(ProxyHistory.class);
193 when(mockSuccessor.createTransactionProxy(TRANSACTION_ID, transaction.isSnapshotOnly(), false))
194 .thenReturn(successor.getTransaction());
196 transaction.replayMessages(mockSuccessor, entries);
198 final ModifyTransactionRequest transformed = successor.expectTransactionRequest(ModifyTransactionRequest.class);
199 assertNotNull(transformed);
200 assertEquals(successful1.getSequence(), transformed.getSequence());
201 assertTrue(transformed.getPersistenceProtocol().isPresent());
202 assertEquals(PersistenceProtocol.ABORT, transformed.getPersistenceProtocol().get());
204 ReadTransactionRequest tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
205 assertNotNull(tmpRead);
206 assertEquals(successful2.getTarget(), tmpRead.getTarget());
207 assertEquals(successful2.getSequence(), tmpRead.getSequence());
208 assertEquals(successful2.getPath(), tmpRead.getPath());
209 assertEquals(successor.localActor(), tmpRead.getReplyTo());
211 tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
212 assertNotNull(tmpRead);
213 assertEquals(request1.getTarget(), tmpRead.getTarget());
214 assertEquals(request1.getSequence(), tmpRead.getSequence());
215 assertEquals(request1.getPath(), tmpRead.getPath());
216 assertEquals(successor.localActor(), tmpRead.getReplyTo());
218 final ExistsTransactionRequest tmpExist = successor.expectTransactionRequest(ExistsTransactionRequest.class);
219 assertNotNull(tmpExist);
220 assertEquals(request2.getTarget(), tmpExist.getTarget());
221 assertEquals(request2.getSequence(), tmpExist.getSequence());
222 assertEquals(request2.getPath(), tmpExist.getPath());
223 assertEquals(successor.localActor(), tmpExist.getReplyTo());
226 protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
227 final List<TransactionModification> modifications = modifyRequest.getModifications();
228 assertEquals(3, modifications.size());
229 assertThat(modifications, hasItem(allOf(isA(TransactionWrite.class), hasPath(PATH_1))));
230 assertThat(modifications, hasItem(allOf(isA(TransactionMerge.class), hasPath(PATH_2))));
231 assertThat(modifications, hasItem(allOf(isA(TransactionDelete.class), hasPath(PATH_3))));
234 @SuppressWarnings("checkstyle:hiddenField")
235 protected <R extends TransactionRequest<R>> void testRequestResponse(final Consumer<VotingFuture<Void>> consumer,
236 final Class<R> expectedRequest,
237 final BiFunction<TransactionIdentifier, Long, TransactionSuccess<?>> replySupplier) {
238 final TransactionTester<T> tester = getTester();
239 final VotingFuture<Void> future = mock(VotingFuture.class);
241 consumer.accept(future);
242 final TransactionRequest<?> req = tester.expectTransactionRequest(expectedRequest);
243 tester.replySuccess(replySupplier.apply(TRANSACTION_ID, req.getSequence()));
244 verify(future).voteYes();
247 protected <R extends TransactionRequest<R>> R testHandleForwardedRemoteRequest(final R request) {
248 transaction.handleReplayedRemoteRequest(request, createCallbackMock(), Ticker.systemTicker().read());
249 final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
250 final R received = (R) envelope.getMessage();
251 assertTrue(received.getClass().equals(request.getClass()));
252 assertEquals(TRANSACTION_ID, received.getTarget());
253 assertEquals(clientContextProbe.ref(), received.getReplyTo());
257 protected <R extends TransactionRequest<R>> R testForwardToRemote(final TransactionRequest<?> toForward,
258 final Class<R> expectedMessageClass) {
259 final Consumer<Response<?, ?>> callback = createCallbackMock();
260 final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
261 final RemoteProxyTransaction successor = transactionTester.getTransaction();
262 transaction.forwardToRemote(successor, toForward, callback);
263 return transactionTester.expectTransactionRequest(expectedMessageClass);
266 protected TransactionTester<T> getTester() {
270 @SuppressWarnings("unchecked")
271 protected static <T> Consumer<T> createCallbackMock() {
272 return mock(Consumer.class);
275 protected static BaseMatcher<TransactionModification> hasPath(final YangInstanceIdentifier path) {
276 return new BaseMatcher<>() {
279 public boolean matches(final Object item) {
280 return path.equals(((TransactionModification) item).getPath());
284 public void describeTo(final Description description) {
285 description.appendValue(path);
289 public void describeMismatch(final Object item, final Description description) {
290 final TransactionModification modification = (TransactionModification) item;
291 description.appendText("was ").appendValue(modification.getPath());
296 protected TestProbe createProbe() {
297 return new TestProbe(system);
300 @SuppressWarnings("checkstyle:hiddenField")
301 protected TransactionTester<LocalReadWriteProxyTransaction> createLocalProxy() {
302 final TestProbe backendProbe = new TestProbe(system, "backend2");
303 final TestProbe clientContextProbe = new TestProbe(system, "clientContext2");
304 final ClientActorContext context =
305 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
306 final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
307 "default", UnsignedLong.ZERO, Optional.empty(), 3);
308 final AbstractClientConnection<ShardBackendInfo> connection =
309 AccessClientUtil.createConnectedConnection(context, 0L, backend);
310 final AbstractClientHistory history = mock(AbstractClientHistory.class);
311 final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
312 final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class);
313 when(snapshot.newModification()).thenReturn(mock(CursorAwareDataTreeModification.class));
314 final LocalReadWriteProxyTransaction tx =
315 new LocalReadWriteProxyTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
316 return new TransactionTester<>(tx, connection, backendProbe);
319 @SuppressWarnings("checkstyle:hiddenField")
320 protected TransactionTester<RemoteProxyTransaction> createRemoteProxyTransactionTester() {
321 final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext");
322 final TestProbe backendProbe = new TestProbe(system, "remoteBackend");
323 final AbstractClientHistory history = mock(AbstractClientHistory.class);
324 final ClientActorContext context =
325 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
326 final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
327 "default", UnsignedLong.ZERO, Optional.empty(), 5);
328 final AbstractClientConnection<ShardBackendInfo> connection =
329 AccessClientUtil.createConnectedConnection(context, 0L, backend);
330 final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
331 final RemoteProxyTransaction transaction =
332 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
333 return new TransactionTester<>(transaction, connection, backendProbe);