Add cds-access-client unit tests
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / AbstractClientConnectionTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import static org.hamcrest.CoreMatchers.hasItems;
11 import static org.mockito.Matchers.anyLong;
12 import static org.mockito.Matchers.argThat;
13 import static org.mockito.Matchers.isA;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.timeout;
16 import static org.mockito.Mockito.verify;
17
18 import akka.actor.ActorRef;
19 import akka.actor.ActorSystem;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestProbe;
22 import java.util.Optional;
23 import java.util.function.Consumer;
24 import org.hamcrest.BaseMatcher;
25 import org.hamcrest.Description;
26 import org.junit.After;
27 import org.junit.Assert;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.MockitoAnnotations;
31 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
33 import org.opendaylight.controller.cluster.access.commands.TransactionFailure;
34 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
35 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
36 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
37 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
38 import org.opendaylight.controller.cluster.access.concepts.MemberName;
39 import org.opendaylight.controller.cluster.access.concepts.Request;
40 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
41 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
42 import org.opendaylight.controller.cluster.access.concepts.Response;
43 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
44 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
45 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
46 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
47 import scala.concurrent.duration.FiniteDuration;
48
49 public abstract class AbstractClientConnectionTest<T extends AbstractClientConnection<U>, U extends BackendInfo> {
50
51     protected static final MemberName MEMBER_NAME = MemberName.forName("member-1");
52     protected static final FrontendType FRONTEND_TYPE =
53             FrontendType.forName(ClientActorContextTest.class.getSimpleName());
54     protected static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
55     protected static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
56     protected static final String PERSISTENCE_ID = "per-1";
57
58     protected T connection;
59     protected ClientActorContext context;
60     protected ActorSystem system;
61     protected TestProbe backendProbe;
62     protected TestProbe contextProbe;
63     protected TestProbe replyToProbe;
64
65     @Before
66     public void setUp() throws Exception {
67         MockitoAnnotations.initMocks(this);
68         system = ActorSystem.apply();
69         backendProbe = new TestProbe(system);
70         contextProbe = new TestProbe(system);
71         context = new ClientActorContext(contextProbe.ref(), system.scheduler(), system.dispatcher(),
72                 PERSISTENCE_ID, CLIENT_ID);
73         replyToProbe = new TestProbe(system);
74         connection = createConnection();
75     }
76
77     protected abstract T createConnection();
78
79     @Test
80     public void testLocalActor() throws Exception {
81         Assert.assertEquals(contextProbe.ref(), connection.localActor());
82     }
83
84     @Test
85     public abstract void testReconnectConnection() throws Exception;
86
87     @Test
88     public void testPoison() throws Exception {
89         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
90         final Request<?, ?> request = createRequest(replyToProbe.ref());
91         final ConnectionEntry entry = new ConnectionEntry(request, callback, 0L);
92         connection.enqueueEntry(entry, 0L);
93         connection.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
94         verify(callback, timeout(1000)).accept(isA(TransactionFailure.class));
95     }
96
97     @Test
98     public void testSendRequestReceiveResponse() throws Exception {
99         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
100         final Request<?, ?> request = createRequest(replyToProbe.ref());
101         connection.sendRequest(request, callback);
102         final RequestEnvelope requestEnvelope = backendProbe.expectMsgClass(RequestEnvelope.class);
103         Assert.assertEquals(request, requestEnvelope.getMessage());
104         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(CLIENT_ID, 0L);
105         final RequestSuccess<?, ?> message = new TransactionAbortSuccess(new TransactionIdentifier(historyId, 0L), 0L);
106         final ResponseEnvelope<?> envelope = new SuccessEnvelope(message, 0L, 0L, 0L);
107         connection.receiveResponse(envelope);
108         verify(callback, timeout(1000)).accept(isA(TransactionAbortSuccess.class));
109     }
110
111     @Test
112     public void testRun() throws Exception {
113         final ClientActorBehavior<U> behavior = mock(ClientActorBehavior.class);
114         Assert.assertSame(behavior, connection.runTimer(behavior));
115     }
116
117     @Test
118     public void testCheckTimeoutEmptyQueue() throws Exception {
119         final Optional<FiniteDuration> timeout = connection.checkTimeout(context.ticker().read());
120         Assert.assertFalse(timeout.isPresent());
121     }
122
123     @Test
124     public void testCheckTimeoutConnectionTimeouted() throws Exception {
125         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
126         connection.sendRequest(createRequest(replyToProbe.ref()), callback);
127         final long now = context.ticker().read() + ConnectedClientConnection.REQUEST_TIMEOUT_NANOS;
128         final Optional<FiniteDuration> timeout = connection.checkTimeout(now);
129         Assert.assertNull(timeout);
130     }
131
132     @Test
133     public void testCheckTimeout() throws Exception {
134         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
135         connection.sendRequest(createRequest(replyToProbe.ref()), callback);
136         final long now = context.ticker().read();
137         final Optional<FiniteDuration> timeout = connection.checkTimeout(now);
138         Assert.assertTrue(timeout.isPresent());
139     }
140
141     @Test
142     public void testReplay() throws Exception {
143         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
144         final Request<?, ?> request1 = createRequest(replyToProbe.ref());
145         final Request<?, ?> request2 = createRequest(replyToProbe.ref());
146         connection.sendRequest(request1, callback);
147         connection.sendRequest(request2, callback);
148         final Iterable<ConnectionEntry> entries = connection.startReplay();
149         Assert.assertThat(entries, hasItems(entryWithRequest(request1), entryWithRequest(request2)));
150         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
151         connection.finishReplay(forwarder);
152         verify(forwarder).forwardEntry(argThat(entryWithRequest(request1)), anyLong());
153         verify(forwarder).forwardEntry(argThat(entryWithRequest(request2)), anyLong());
154     }
155
156     @After
157     public void tearDown() throws Exception {
158         JavaTestKit.shutdownActorSystem(system);
159     }
160
161     protected Request<?, ?> createRequest(final ActorRef replyTo) {
162         final TransactionIdentifier identifier =
163                 new TransactionIdentifier(new LocalHistoryIdentifier(CLIENT_ID, 0L), 0L);
164         return new AbortLocalTransactionRequest(identifier, replyTo);
165     }
166
167     private static ConnectionEntryMatcher entryWithRequest(final Request<?, ?> request) {
168         return new ConnectionEntryMatcher(request);
169     }
170
171     private static class ConnectionEntryMatcher extends BaseMatcher<ConnectionEntry> {
172
173         private final Request request;
174
175         private ConnectionEntryMatcher(final Request request) {
176             this.request = request;
177         }
178
179         @Override
180         public boolean matches(final Object item) {
181             if (!(item instanceof ConnectionEntry)) {
182                 return false;
183             }
184             final ConnectionEntry entry = (ConnectionEntry) item;
185             return this.request.equals(entry.getRequest());
186         }
187
188         @Override
189         public void describeMismatch(final Object item, final Description description) {
190             final ConnectionEntry entry = (ConnectionEntry) item;
191             super.describeMismatch(entry.getRequest(), description);
192         }
193
194         @Override
195         public void describeTo(final Description description) {
196             description.appendValue(request);
197         }
198     }
199
200 }