Enable checkstyle enforcement in openflowplugin-parent
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / AbstractConnectionAdapter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalCause;
16 import com.google.common.cache.RemovalListener;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import java.net.InetSocketAddress;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.RejectedExecutionException;
24 import java.util.concurrent.TimeUnit;
25 import javax.annotation.Nonnull;
26 import javax.annotation.Nullable;
27 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
55 import org.opendaylight.yangtools.yang.binding.DataObject;
56 import org.opendaylight.yangtools.yang.common.RpcResult;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 /**
61  * {@link ConnectionAdapter} interface contains couple of OF message handling approaches.
62  * {@link AbstractConnectionAdapter} class contains direct RPC processing from OpenflowProtocolService
63  * {@link org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolService}
64  */
65 abstract class AbstractConnectionAdapter implements ConnectionAdapter {
66
67     private static final Logger LOG = LoggerFactory.getLogger(AbstractConnectionAdapter.class);
68
69     /** after this time, RPC future response objects will be thrown away (in minutes). */
70     private static final int RPC_RESPONSE_EXPIRATION = 1;
71
72     private static final Exception QUEUE_FULL_EXCEPTION = new RejectedExecutionException("Output queue is full");
73
74     /**
75      * Default depth of write queue, e.g. we allow these many messages
76      * to be queued up before blocking producers.
77      */
78     private static final int DEFAULT_QUEUE_DEPTH = 1024;
79
80     protected static final RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>> REMOVAL_LISTENER =
81         notification -> {
82             if (!notification.getCause().equals(RemovalCause.EXPLICIT)) {
83                 notification.getValue().discard();
84             }
85         };
86
87     protected final Channel channel;
88     protected final InetSocketAddress address;
89     protected boolean disconnectOccured = false;
90     protected final ChannelOutboundQueue output;
91
92     /** expiring cache for future rpcResponses. */
93     protected Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> responseCache;
94
95
96     AbstractConnectionAdapter(@Nonnull final Channel channel, @Nullable final InetSocketAddress address) {
97         this.channel = Preconditions.checkNotNull(channel);
98         this.address = address;
99
100         responseCache = CacheBuilder.newBuilder().concurrencyLevel(1)
101                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES).removalListener(REMOVAL_LISTENER).build();
102         this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address);
103         channel.pipeline().addLast(output);
104     }
105
106     @Override
107     public Future<Boolean> disconnect() {
108         final ChannelFuture disconnectResult = channel.disconnect();
109         responseCache.invalidateAll();
110         disconnectOccured = true;
111
112         return handleTransportChannelFuture(disconnectResult);
113     }
114
115     @Override
116     public Future<RpcResult<BarrierOutput>> barrier(final BarrierInput input) {
117         return sendToSwitchExpectRpcResultFuture(input, BarrierOutput.class, "barrier-input sending failed");
118     }
119
120     @Override
121     public Future<RpcResult<EchoOutput>> echo(final EchoInput input) {
122         return sendToSwitchExpectRpcResultFuture(input, EchoOutput.class, "echo-input sending failed");
123     }
124
125     @Override
126     public Future<RpcResult<Void>> echoReply(final EchoReplyInput input) {
127         return sendToSwitchFuture(input, "echo-reply sending failed");
128     }
129
130     @Override
131     public Future<RpcResult<Void>> experimenter(final ExperimenterInput input) {
132         return sendToSwitchFuture(input, "experimenter sending failed");
133     }
134
135     @Override
136     public Future<RpcResult<Void>> flowMod(final FlowModInput input) {
137         return sendToSwitchFuture(input, "flow-mod sending failed");
138     }
139
140     @Override
141     public Future<RpcResult<GetConfigOutput>> getConfig(final GetConfigInput input) {
142         return sendToSwitchExpectRpcResultFuture(input, GetConfigOutput.class, "get-config-input sending failed");
143     }
144
145     @Override
146     public Future<RpcResult<GetFeaturesOutput>> getFeatures(final GetFeaturesInput input) {
147         return sendToSwitchExpectRpcResultFuture(input, GetFeaturesOutput.class, "get-features-input sending failed");
148     }
149
150     @Override
151     public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(final GetQueueConfigInput input) {
152         return sendToSwitchExpectRpcResultFuture(input, GetQueueConfigOutput.class,
153                 "get-queue-config-input sending failed");
154     }
155
156     @Override
157     public Future<RpcResult<Void>> groupMod(final GroupModInput input) {
158         return sendToSwitchFuture(input, "group-mod-input sending failed");
159     }
160
161     @Override
162     public Future<RpcResult<Void>> hello(final HelloInput input) {
163         return sendToSwitchFuture(input, "hello-input sending failed");
164     }
165
166     @Override
167     public Future<RpcResult<Void>> meterMod(final MeterModInput input) {
168         return sendToSwitchFuture(input, "meter-mod-input sending failed");
169     }
170
171     @Override
172     public Future<RpcResult<Void>> packetOut(final PacketOutInput input) {
173         return sendToSwitchFuture(input, "packet-out-input sending failed");
174     }
175
176     @Override
177     public Future<RpcResult<Void>> multipartRequest(final MultipartRequestInput input) {
178         return sendToSwitchFuture(input, "multi-part-request sending failed");
179     }
180
181     @Override
182     public Future<RpcResult<Void>> portMod(final PortModInput input) {
183         return sendToSwitchFuture(input, "port-mod-input sending failed");
184     }
185
186     @Override
187     public Future<RpcResult<RoleRequestOutput>> roleRequest(final RoleRequestInput input) {
188         return sendToSwitchExpectRpcResultFuture(input, RoleRequestOutput.class,
189                 "role-request-config-input sending failed");
190     }
191
192     @Override
193     public Future<RpcResult<Void>> setConfig(final SetConfigInput input) {
194         return sendToSwitchFuture(input, "set-config-input sending failed");
195     }
196
197     @Override
198     public Future<RpcResult<Void>> tableMod(final TableModInput input) {
199         return sendToSwitchFuture(input, "table-mod-input sending failed");
200     }
201
202     @Override
203     public Future<RpcResult<GetAsyncOutput>> getAsync(final GetAsyncInput input) {
204         return sendToSwitchExpectRpcResultFuture(input, GetAsyncOutput.class, "get-async-input sending failed");
205     }
206
207     @Override
208     public Future<RpcResult<Void>> setAsync(final SetAsyncInput input) {
209         return sendToSwitchFuture(input, "set-async-input sending failed");
210     }
211
212     @Override
213     public boolean isAlive() {
214         return channel.isOpen();
215     }
216
217     @Override
218     public boolean isAutoRead() {
219         return channel.config().isAutoRead();
220     }
221
222     @Override
223     public void setAutoRead(final boolean autoRead) {
224         channel.config().setAutoRead(autoRead);
225     }
226
227     @Override
228     public InetSocketAddress getRemoteAddress() {
229         return (InetSocketAddress) channel.remoteAddress();
230     }
231
232     /**
233      * Used only for testing purposes.
234      *
235      * @param cache replacement
236      */
237     @VisibleForTesting
238     void setResponseCache(final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache) {
239         this.responseCache = cache;
240     }
241
242     /**
243      * Return cached RpcListener or {@code null} if not cached.
244      */
245     protected ResponseExpectedRpcListener<?> findRpcResponse(final RpcResponseKey key) {
246         return responseCache.getIfPresent(key);
247     }
248
249     /**
250      * Sends given message to switch, sending result or switch response will be reported via return value.
251      *
252      * @param input message to send
253      * @param responseClazz type of response
254      * @param failureInfo describes, what type of message caused failure by sending
255      * @return future object,
256      *         <ul>
257      *         <li>if send fails, {@link RpcResult} will contain errors and failed status</li>
258      *         <li>else {@link RpcResult} will be stored in responseCache and wait for particular timeout (
259      *         {@link ConnectionAdapterImpl#RPC_RESPONSE_EXPIRATION}),
260      *         <ul>
261      *         <li>either switch will manage to answer and then corresponding response message will be set into returned
262      *         future</li>
263      *         <li>or response in cache will expire and returned future will be cancelled</li>
264      *         </ul>
265      *         </li>
266      *         </ul>
267      */
268     protected <I extends OfHeader, O extends OfHeader> ListenableFuture<RpcResult<O>>
269             sendToSwitchExpectRpcResultFuture(final I input, final Class<O> responseClazz,
270                     final String failureInfo) {
271         final RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz.getName());
272         final ResponseExpectedRpcListener<O> listener = new ResponseExpectedRpcListener<>(input, failureInfo,
273                 responseCache, key);
274         return enqueueMessage(listener);
275     }
276
277     /**
278      * Sends given message to switch, sending result will be reported via return value.
279      *
280      * @param input message to send
281      * @param failureInfo describes, what type of message caused failure by sending
282      * @return future object,
283      *         <ul>
284      *         <li>if send successful, {@link RpcResult} without errors and successful status will be returned,</li>
285      *         <li>else {@link RpcResult} will contain errors and failed status</li>
286      *         </ul>
287      */
288     protected ListenableFuture<RpcResult<Void>> sendToSwitchFuture(final DataObject input, final String failureInfo) {
289         return enqueueMessage(new SimpleRpcListener(input, failureInfo));
290     }
291
292     private <T> ListenableFuture<RpcResult<T>> enqueueMessage(final AbstractRpcListener<T> promise) {
293         LOG.debug("Submitting promise {}", promise);
294
295         if (!output.enqueue(promise)) {
296             LOG.debug("Message queue is full, rejecting execution");
297             promise.failedRpc(QUEUE_FULL_EXCEPTION);
298         } else {
299             LOG.debug("Promise enqueued successfully");
300         }
301
302         return promise.getResult();
303     }
304
305     private static SettableFuture<Boolean> handleTransportChannelFuture(final ChannelFuture resultFuture) {
306
307         final SettableFuture<Boolean> transportResult = SettableFuture.create();
308
309         resultFuture.addListener(future -> {
310             transportResult.set(future.isSuccess());
311             if (!future.isSuccess()) {
312                 transportResult.setException(future.cause());
313             }
314         });
315         return transportResult;
316     }
317 }