Implement scatter/gather on module shards
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHandleTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.lenient;
15 import static org.mockito.Mockito.mock;
16 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
17 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
18 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
19
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.testkit.TestProbe;
24 import akka.testkit.javadsl.TestKit;
25 import java.util.List;
26 import java.util.Map;
27 import org.junit.After;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.junit.runner.RunWith;
31 import org.mockito.Mock;
32 import org.mockito.junit.MockitoJUnitRunner;
33 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
34 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
35 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
36 import org.opendaylight.controller.cluster.access.client.InternalCommand;
37 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
38 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
39 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
40 import org.opendaylight.controller.cluster.access.concepts.Envelope;
41 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
42 import org.opendaylight.controller.cluster.access.concepts.Request;
43 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
44 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
45 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
46 import org.opendaylight.controller.cluster.access.concepts.Response;
47 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
48 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
49 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
50 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
53 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
54 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
55 import scala.concurrent.Promise;
56
57 @RunWith(MockitoJUnitRunner.StrictStubs.class)
58 public abstract class AbstractClientHandleTest<T extends AbstractClientHandle<AbstractProxyTransaction>> {
59     private static final String PERSISTENCE_ID = "per-1";
60     private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
61
62     @Mock
63     private DataTree dataTree;
64     @Mock
65     private DataTreeSnapshot dataTreeSnapshot;
66     private ActorSystem system;
67     private TestProbe backendProbe;
68     private AbstractClientHistory parent;
69     private AbstractDataStoreClientBehavior client;
70     private T handle;
71
72     @Before
73     public void setUp() throws Exception {
74         system = ActorSystem.apply();
75         final TestProbe contextProbe = new TestProbe(system, "context");
76         final TestProbe clientContextProbe = new TestProbe(system, "client-context");
77         backendProbe = new TestProbe(system, "backend");
78         //create handle dependencies
79         final ActorUtils actorUtils = createActorContextMock(system, contextProbe.ref());
80         final ClientActorContext clientContext =
81                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
82         client = new SimpleDataStoreClientBehavior(clientContext, actorUtils, "shard");
83         client.createLocalHistory();
84         parent = new SingleClientHistory(client, HISTORY_ID);
85         //connect client
86         client.getConnection(0L);
87         contextProbe.expectMsgClass(ConnectClientRequest.class);
88         final long sequence = 0L;
89         contextProbe.reply(new ConnectClientSuccess(CLIENT_ID, sequence, backendProbe.ref(), List.of(), dataTree, 3));
90         final InternalCommand<ShardBackendInfo> command = clientContextProbe.expectMsgClass(InternalCommand.class);
91         command.execute(client);
92         //data tree mock
93         doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
94
95         handle = createHandle(parent);
96     }
97
98     @SuppressWarnings("checkstyle:hiddenField")
99     protected abstract T createHandle(AbstractClientHistory parent);
100
101     /**
102      * Do a operation with handle.
103      * Used for testing, whether closed handle throws exception when the operation is performed.
104      *
105      * @param handle handle
106      */
107     @SuppressWarnings("checkstyle:hiddenField")
108     protected abstract void doHandleOperation(T handle);
109
110     @After
111     public void tearDown() {
112         TestKit.shutdownActorSystem(system);
113     }
114
115     @Test
116     public void testGetIdentifier() {
117         assertEquals(TRANSACTION_ID, handle.getIdentifier());
118     }
119
120     @Test
121     public void testAbort() throws Exception {
122         doHandleOperation(handle);
123         handle.abort();
124         final Envelope<?> envelope = backendProbe.expectMsgClass(Envelope.class);
125         final AbortLocalTransactionRequest request = (AbortLocalTransactionRequest) envelope.getMessage();
126         assertEquals(TRANSACTION_ID, request.getTarget());
127         checkClosed();
128     }
129
130     @Test
131     public void testLocalAbort() throws Exception {
132         doHandleOperation(handle);
133         handle.localAbort(new RuntimeException("fail"));
134         final Envelope<?> envelope = backendProbe.expectMsgClass(Envelope.class);
135         final AbortLocalTransactionRequest request = (AbortLocalTransactionRequest) envelope.getMessage();
136         assertEquals(TRANSACTION_ID, request.getTarget());
137         checkClosed();
138     }
139
140     @Test
141     public void testEnsureClosed() {
142         doHandleOperation(handle);
143         final Map<Long, AbstractProxyTransaction> transactions = handle.ensureClosed();
144         assertNotNull(transactions);
145         assertEquals(1, transactions.size());
146     }
147
148     @Test
149     public void testEnsureProxy() {
150         final AbstractProxyTransaction expected = mock(AbstractProxyTransaction.class);
151         final AbstractProxyTransaction proxy = handle.ensureProxy(PATH);
152         assertEquals(0, proxy.getIdentifier().getTransactionId());
153     }
154
155     @Test
156     public void testParent() {
157         assertEquals(parent, handle.parent());
158     }
159
160     protected void checkClosed() throws Exception {
161         TestUtils.assertOperationThrowsException(() -> doHandleOperation(handle), IllegalStateException.class);
162     }
163
164     /**
165      * Checks, whether backend actor has received request of expected class wrapped in RequestEnvelope.
166      * Then given response wrapped in ResponseEnvelope is sent.
167      *
168      * @param expectedRequestClass expected request class
169      * @param response             response
170      * @param <R>                  expected request type
171      * @return request message
172      */
173     protected <R extends Request<?, R>> R backendRespondToRequest(final Class<R> expectedRequestClass,
174                                                             final Response<?, ?> response) {
175         final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
176         assertEquals(expectedRequestClass, envelope.getMessage().getClass());
177         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(0L);
178         final long sessionId = envelope.getSessionId();
179         final long txSequence = envelope.getTxSequence();
180         final long executionTime = 0L;
181         if (response instanceof RequestSuccess) {
182             final RequestSuccess<?, ?> success = (RequestSuccess<?, ?>) response;
183             final SuccessEnvelope responseEnvelope = new SuccessEnvelope(success, sessionId, txSequence, executionTime);
184             AccessClientUtil.completeRequest(connection, responseEnvelope);
185         } else if (response instanceof RequestFailure) {
186             final RequestFailure<?, ?> fail = (RequestFailure<?, ?>) response;
187             final FailureEnvelope responseEnvelope = new FailureEnvelope(fail, sessionId, txSequence, executionTime);
188             AccessClientUtil.completeRequest(connection, responseEnvelope);
189         }
190         return expectedRequestClass.cast(envelope.getMessage());
191     }
192
193     protected T getHandle() {
194         return handle;
195     }
196
197     protected DataTreeSnapshot getDataTreeSnapshot() {
198         return dataTreeSnapshot;
199     }
200
201     private static ActorUtils createActorContextMock(final ActorSystem system, final ActorRef actor) {
202         final ActorUtils mock = mock(ActorUtils.class);
203         final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
204         final ActorSelection selection = system.actorSelection(actor.path());
205         final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
206         promise.success(shardInfo);
207         doReturn(promise.future()).when(mock).findPrimaryShardAsync(any());
208
209         final EffectiveModelContext context = mock(EffectiveModelContext.class);
210         lenient().doCallRealMethod().when(context).getQName();
211         lenient().doReturn(context).when(mock).getSchemaContext();
212         lenient().doReturn(DatastoreContext.newBuilder().build()).when(mock).getDatastoreContext();
213
214         return mock;
215     }
216 }