Add TransmitQueue unit tests
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / TransmittingTransmitQueueTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import static org.hamcrest.CoreMatchers.everyItem;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Matchers.isA;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.verify;
15 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
16
17 import com.google.common.base.Ticker;
18 import com.google.common.collect.Iterables;
19 import com.google.common.testing.FakeTicker;
20 import java.util.Optional;
21 import java.util.concurrent.TimeUnit;
22 import java.util.function.Consumer;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.access.ABIVersion;
26 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
27 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
28 import org.opendaylight.controller.cluster.access.concepts.Request;
29 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
30 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
31 import org.opendaylight.controller.cluster.access.concepts.Response;
32 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34
35 public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
36
37     private BackendInfo backendInfo;
38
39     @Override
40     protected int getMaxInFlightMessages() {
41         return backendInfo.getMaxMessages();
42     }
43
44     @Override
45     protected TransmitQueue.Transmitting createQueue() {
46         backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
47         return new TransmitQueue.Transmitting(0, backendInfo);
48     }
49
50     @Test
51     public void testComplete() throws Exception {
52         final long sequence1 = 0L;
53         final long sequence2 = 1L;
54         final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
55         final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
56         final Request request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
57         final Consumer<Response<?, ?>> callback1 = createConsumerMock();
58         final Consumer<Response<?, ?>> callback2 = createConsumerMock();
59         final long now1 = Ticker.systemTicker().read();
60         final long now2 = Ticker.systemTicker().read();
61         //enqueue 2 entries
62         queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
63         queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
64         final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
65         final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
66         //complete entries in different order
67         final Optional<TransmittedConnectionEntry> completed2 =
68                 queue.complete(new SuccessEnvelope(success2, 0L, sequence2, 1L), now2);
69         final Optional<TransmittedConnectionEntry> completed1 =
70                 queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
71         //check first entry
72         final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
73         Assert.assertEquals(transmittedEntry1.getRequest(), request1);
74         Assert.assertEquals(transmittedEntry1.getTxSequence(), sequence1);
75         Assert.assertEquals(transmittedEntry1.getCallback(), callback1);
76         //check second entry
77         final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
78         Assert.assertEquals(transmittedEntry2.getRequest(), request2);
79         Assert.assertEquals(transmittedEntry2.getTxSequence(), sequence2);
80         Assert.assertEquals(transmittedEntry2.getCallback(), callback2);
81     }
82
83     @Test
84     public void testEnqueueCanTransmit() throws Exception {
85         final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
86         final Consumer<Response<?, ?>> callback = createConsumerMock();
87         final long now = Ticker.systemTicker().read();
88         queue.enqueue(new ConnectionEntry(request, callback, now), now);
89         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
90         Assert.assertEquals(request, requestEnvelope.getMessage());
91     }
92
93     @Test
94     public void testEnqueueBackendFull() throws Exception {
95         final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
96         final Consumer<Response<?, ?>> callback = createConsumerMock();
97         final long now = Ticker.systemTicker().read();
98         final int sentMessages = getMaxInFlightMessages() + 1;
99         for (int i = 0; i < sentMessages; i++) {
100             queue.enqueue(new ConnectionEntry(request, callback, now), now);
101         }
102         for (int i = 0; i < getMaxInFlightMessages(); i++) {
103             probe.expectMsgClass(RequestEnvelope.class);
104         }
105         probe.expectNoMsg();
106         final Iterable<ConnectionEntry> entries = queue.asIterable();
107         Assert.assertEquals(sentMessages, Iterables.size(entries));
108         Assert.assertThat(entries, everyItem(entryWithRequest(request)));
109     }
110
111     @Test
112     @Override
113     public void testCanTransmitCount() throws Exception {
114         Assert.assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
115         Assert.assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
116     }
117
118     @Test
119     @Override
120     public void testTransmit() throws Exception {
121         final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
122         final Consumer<Response<?, ?>> callback = createConsumerMock();
123         final long now = Ticker.systemTicker().read();
124         final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
125         queue.transmit(entry, now);
126         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
127         Assert.assertEquals(request, requestEnvelope.getMessage());
128     }
129
130     @Test
131     public void testSetForwarder() throws Exception {
132         final FakeTicker ticker = new FakeTicker();
133         ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
134         final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
135         final Consumer<Response<?, ?>> callback = createConsumerMock();
136         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
137         queue.enqueue(entry, ticker.read());
138         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
139         final long setForwarderNow = ticker.read();
140         queue.setForwarder(forwarder, setForwarderNow);
141         verify(forwarder).forwardEntry(isA(TransmittedConnectionEntry.class), eq(setForwarderNow));
142         final long secondEnqueueNow = ticker.read();
143         queue.enqueue(entry, secondEnqueueNow);
144         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
145     }
146
147 }