BUG-8422: separate retry and request timeouts
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / AbstractClientConnectionTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import static org.hamcrest.CoreMatchers.hasItems;
11 import static org.mockito.Matchers.anyLong;
12 import static org.mockito.Matchers.argThat;
13 import static org.mockito.Matchers.isA;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.timeout;
16 import static org.mockito.Mockito.verify;
17 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
18
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSystem;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestProbe;
23 import java.util.Optional;
24 import java.util.function.Consumer;
25 import org.junit.After;
26 import org.junit.Assert;
27 import org.junit.Before;
28 import org.junit.Test;
29 import org.mockito.MockitoAnnotations;
30 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
31 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
32 import org.opendaylight.controller.cluster.access.commands.TransactionFailure;
33 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
34 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
35 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
36 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
37 import org.opendaylight.controller.cluster.access.concepts.MemberName;
38 import org.opendaylight.controller.cluster.access.concepts.Request;
39 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
40 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
41 import org.opendaylight.controller.cluster.access.concepts.Response;
42 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
43 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
44 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
45 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
46
47 public abstract class AbstractClientConnectionTest<T extends AbstractClientConnection<U>, U extends BackendInfo> {
48
49     protected static final MemberName MEMBER_NAME = MemberName.forName("member-1");
50     protected static final FrontendType FRONTEND_TYPE =
51             FrontendType.forName(ClientActorContextTest.class.getSimpleName());
52     protected static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
53     protected static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
54     protected static final String PERSISTENCE_ID = "per-1";
55
56     protected T connection;
57     protected ClientActorContext context;
58     protected ActorSystem system;
59     protected TestProbe backendProbe;
60     protected TestProbe contextProbe;
61     protected TestProbe replyToProbe;
62
63     @Before
64     public void setUp() throws Exception {
65         MockitoAnnotations.initMocks(this);
66         system = ActorSystem.apply();
67         backendProbe = new TestProbe(system);
68         contextProbe = new TestProbe(system);
69         context = new ClientActorContext(contextProbe.ref(), system.scheduler(), system.dispatcher(),
70                 PERSISTENCE_ID, CLIENT_ID);
71         replyToProbe = new TestProbe(system);
72         connection = createConnection();
73     }
74
75     protected abstract T createConnection();
76
77     @Test
78     public void testLocalActor() throws Exception {
79         Assert.assertEquals(contextProbe.ref(), connection.localActor());
80     }
81
82     @Test
83     public abstract void testReconnectConnection() throws Exception;
84
85     @Test
86     public void testPoison() throws Exception {
87         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
88         final Request<?, ?> request = createRequest(replyToProbe.ref());
89         final ConnectionEntry entry = new ConnectionEntry(request, callback, 0L);
90         connection.enqueueEntry(entry, 0L);
91         connection.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
92         verify(callback, timeout(1000)).accept(isA(TransactionFailure.class));
93     }
94
95     @Test
96     public void testSendRequestReceiveResponse() throws Exception {
97         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
98         final Request<?, ?> request = createRequest(replyToProbe.ref());
99         connection.sendRequest(request, callback);
100         final RequestEnvelope requestEnvelope = backendProbe.expectMsgClass(RequestEnvelope.class);
101         Assert.assertEquals(request, requestEnvelope.getMessage());
102         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(CLIENT_ID, 0L);
103         final RequestSuccess<?, ?> message = new TransactionAbortSuccess(new TransactionIdentifier(historyId, 0L), 0L);
104         final ResponseEnvelope<?> envelope = new SuccessEnvelope(message, 0L, 0L, 0L);
105         connection.receiveResponse(envelope);
106         verify(callback, timeout(1000)).accept(isA(TransactionAbortSuccess.class));
107     }
108
109     @Test
110     public void testRun() throws Exception {
111         final ClientActorBehavior<U> behavior = mock(ClientActorBehavior.class);
112         Assert.assertSame(behavior, connection.runTimer(behavior));
113     }
114
115     @Test
116     public void testCheckTimeoutEmptyQueue() throws Exception {
117         final Optional<Long> timeout = connection.checkTimeout(context.ticker().read());
118         Assert.assertFalse(timeout.isPresent());
119     }
120
121     @Test
122     public void testCheckTimeoutConnectionTimeouted() throws Exception {
123         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
124         connection.sendRequest(createRequest(replyToProbe.ref()), callback);
125         final long now = context.ticker().read() + ConnectedClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS;
126         final Optional<Long> timeout = connection.checkTimeout(now);
127         Assert.assertNull(timeout);
128     }
129
130     @Test
131     public void testCheckTimeout() throws Exception {
132         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
133         connection.sendRequest(createRequest(replyToProbe.ref()), callback);
134         final long now = context.ticker().read();
135         final Optional<Long> timeout = connection.checkTimeout(now);
136         Assert.assertTrue(timeout.isPresent());
137     }
138
139     @Test
140     public void testReplay() throws Exception {
141         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
142         final Request<?, ?> request1 = createRequest(replyToProbe.ref());
143         final Request<?, ?> request2 = createRequest(replyToProbe.ref());
144         connection.sendRequest(request1, callback);
145         connection.sendRequest(request2, callback);
146         final Iterable<ConnectionEntry> entries = connection.startReplay();
147         Assert.assertThat(entries, hasItems(entryWithRequest(request1), entryWithRequest(request2)));
148         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
149         connection.finishReplay(forwarder);
150         verify(forwarder).forwardEntry(argThat(entryWithRequest(request1)), anyLong());
151         verify(forwarder).forwardEntry(argThat(entryWithRequest(request2)), anyLong());
152     }
153
154     @After
155     public void tearDown() throws Exception {
156         JavaTestKit.shutdownActorSystem(system);
157     }
158
159     protected Request<?, ?> createRequest(final ActorRef replyTo) {
160         final TransactionIdentifier identifier =
161                 new TransactionIdentifier(new LocalHistoryIdentifier(CLIENT_ID, 0L), 0L);
162         return new AbortLocalTransactionRequest(identifier, replyTo);
163     }
164
165 }