Use String(byte[], Charset)
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / AbstractConnectionAdapter.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.cache.Cache;
14 import com.google.common.cache.CacheBuilder;
15 import com.google.common.cache.RemovalCause;
16 import com.google.common.cache.RemovalListener;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import java.net.InetSocketAddress;
22 import java.util.concurrent.Future;
23 import java.util.concurrent.RejectedExecutionException;
24 import java.util.concurrent.TimeUnit;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModOutput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModOutput;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncOutput;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigOutput;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModOutput;
67 import org.opendaylight.yangtools.yang.binding.DataObject;
68 import org.opendaylight.yangtools.yang.common.RpcResult;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 /**
73  * {@link ConnectionAdapter} interface contains couple of OF message handling approaches.
74  * {@link AbstractConnectionAdapter} class contains direct RPC processing from OpenflowProtocolService
75  * {@link org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolService}
76  */
77 abstract class AbstractConnectionAdapter implements ConnectionAdapter {
78
79     private static final Logger LOG = LoggerFactory.getLogger(AbstractConnectionAdapter.class);
80
81     /** after this time, RPC future response objects will be thrown away (in minutes). */
82     private static final int RPC_RESPONSE_EXPIRATION = 1;
83
84     private static final Exception QUEUE_FULL_EXCEPTION = new RejectedExecutionException("Output queue is full");
85
86     protected static final RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>> REMOVAL_LISTENER =
87         notification -> {
88             if (!notification.getCause().equals(RemovalCause.EXPLICIT)) {
89                 notification.getValue().discard();
90             }
91         };
92
93     protected final Channel channel;
94     protected final InetSocketAddress address;
95     protected boolean disconnectOccured = false;
96     protected final ChannelOutboundQueue output;
97
98     /** expiring cache for future rpcResponses. */
99     protected Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> responseCache;
100
101
102     AbstractConnectionAdapter(@NonNull final Channel channel, @Nullable final InetSocketAddress address,
103                               @Nullable final int channelOutboundQueueSize) {
104         this.channel = Preconditions.checkNotNull(channel);
105         this.address = address;
106
107         responseCache = CacheBuilder.newBuilder().concurrencyLevel(1)
108                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES).removalListener(REMOVAL_LISTENER).build();
109         LOG.debug("The channel outbound queue size:{}", channelOutboundQueueSize);
110         this.output = new ChannelOutboundQueue(channel, channelOutboundQueueSize, address);
111         channel.pipeline().addLast(output);
112     }
113
114     @Override
115     public Future<Boolean> disconnect() {
116         final ChannelFuture disconnectResult = channel.disconnect();
117         responseCache.invalidateAll();
118         disconnectOccured = true;
119
120         return handleTransportChannelFuture(disconnectResult);
121     }
122
123     @Override
124     public ListenableFuture<RpcResult<BarrierOutput>> barrier(final BarrierInput input) {
125         return sendToSwitchExpectRpcResultFuture(input, BarrierOutput.class, "barrier-input sending failed");
126     }
127
128     @Override
129     public ListenableFuture<RpcResult<EchoOutput>> echo(final EchoInput input) {
130         return sendToSwitchExpectRpcResultFuture(input, EchoOutput.class, "echo-input sending failed");
131     }
132
133     @Override
134     public ListenableFuture<RpcResult<EchoReplyOutput>> echoReply(final EchoReplyInput input) {
135         return sendToSwitchFuture(input, "echo-reply sending failed");
136     }
137
138     @Override
139     public ListenableFuture<RpcResult<ExperimenterOutput>> experimenter(final ExperimenterInput input) {
140         return sendToSwitchFuture(input, "experimenter sending failed");
141     }
142
143     @Override
144     public ListenableFuture<RpcResult<FlowModOutput>> flowMod(final FlowModInput input) {
145         return sendToSwitchFuture(input, "flow-mod sending failed");
146     }
147
148     @Override
149     public ListenableFuture<RpcResult<GetConfigOutput>> getConfig(final GetConfigInput input) {
150         return sendToSwitchExpectRpcResultFuture(input, GetConfigOutput.class, "get-config-input sending failed");
151     }
152
153     @Override
154     public ListenableFuture<RpcResult<GetFeaturesOutput>> getFeatures(final GetFeaturesInput input) {
155         return sendToSwitchExpectRpcResultFuture(input, GetFeaturesOutput.class, "get-features-input sending failed");
156     }
157
158     @Override
159     public ListenableFuture<RpcResult<GetQueueConfigOutput>> getQueueConfig(final GetQueueConfigInput input) {
160         return sendToSwitchExpectRpcResultFuture(input, GetQueueConfigOutput.class,
161                 "get-queue-config-input sending failed");
162     }
163
164     @Override
165     public ListenableFuture<RpcResult<GroupModOutput>> groupMod(final GroupModInput input) {
166         return sendToSwitchFuture(input, "group-mod-input sending failed");
167     }
168
169     @Override
170     public ListenableFuture<RpcResult<HelloOutput>> hello(final HelloInput input) {
171         return sendToSwitchFuture(input, "hello-input sending failed");
172     }
173
174     @Override
175     public ListenableFuture<RpcResult<MeterModOutput>> meterMod(final MeterModInput input) {
176         return sendToSwitchFuture(input, "meter-mod-input sending failed");
177     }
178
179     @Override
180     public ListenableFuture<RpcResult<PacketOutOutput>> packetOut(final PacketOutInput input) {
181         return sendToSwitchFuture(input, "packet-out-input sending failed");
182     }
183
184     @Override
185     public ListenableFuture<RpcResult<MultipartRequestOutput>> multipartRequest(final MultipartRequestInput input) {
186         return sendToSwitchFuture(input, "multi-part-request sending failed");
187     }
188
189     @Override
190     public ListenableFuture<RpcResult<PortModOutput>> portMod(final PortModInput input) {
191         return sendToSwitchFuture(input, "port-mod-input sending failed");
192     }
193
194     @Override
195     public ListenableFuture<RpcResult<RoleRequestOutput>> roleRequest(final RoleRequestInput input) {
196         return sendToSwitchExpectRpcResultFuture(input, RoleRequestOutput.class,
197                 "role-request-config-input sending failed");
198     }
199
200     @Override
201     public ListenableFuture<RpcResult<SetConfigOutput>> setConfig(final SetConfigInput input) {
202         return sendToSwitchFuture(input, "set-config-input sending failed");
203     }
204
205     @Override
206     public ListenableFuture<RpcResult<TableModOutput>> tableMod(final TableModInput input) {
207         return sendToSwitchFuture(input, "table-mod-input sending failed");
208     }
209
210     @Override
211     public ListenableFuture<RpcResult<GetAsyncOutput>> getAsync(final GetAsyncInput input) {
212         return sendToSwitchExpectRpcResultFuture(input, GetAsyncOutput.class, "get-async-input sending failed");
213     }
214
215     @Override
216     public ListenableFuture<RpcResult<SetAsyncOutput>> setAsync(final SetAsyncInput input) {
217         return sendToSwitchFuture(input, "set-async-input sending failed");
218     }
219
220     @Override
221     public boolean isAlive() {
222         return channel.isOpen();
223     }
224
225     @Override
226     public boolean isAutoRead() {
227         return channel.config().isAutoRead();
228     }
229
230     @Override
231     public void setAutoRead(final boolean autoRead) {
232         channel.config().setAutoRead(autoRead);
233     }
234
235     @Override
236     public InetSocketAddress getRemoteAddress() {
237         return (InetSocketAddress) channel.remoteAddress();
238     }
239
240     /**
241      * Used only for testing purposes.
242      *
243      * @param cache replacement
244      */
245     @VisibleForTesting
246     void setResponseCache(final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache) {
247         this.responseCache = cache;
248     }
249
250     /**
251      * Return cached RpcListener or {@code null} if not cached.
252      */
253     protected ResponseExpectedRpcListener<?> findRpcResponse(final RpcResponseKey key) {
254         return responseCache.getIfPresent(key);
255     }
256
257     /**
258      * Sends given message to switch, sending result or switch response will be reported via return value.
259      *
260      * @param input message to send
261      * @param responseClazz type of response
262      * @param failureInfo describes, what type of message caused failure by sending
263      * @return future object,
264      *         <ul>
265      *         <li>if send fails, {@link RpcResult} will contain errors and failed status</li>
266      *         <li>else {@link RpcResult} will be stored in responseCache and wait for particular timeout (
267      *         {@link ConnectionAdapterImpl#RPC_RESPONSE_EXPIRATION}),
268      *         <ul>
269      *         <li>either switch will manage to answer and then corresponding response message will be set into returned
270      *         future</li>
271      *         <li>or response in cache will expire and returned future will be cancelled</li>
272      *         </ul>
273      *         </li>
274      *         </ul>
275      */
276     protected <I extends OfHeader, O extends OfHeader> ListenableFuture<RpcResult<O>>
277             sendToSwitchExpectRpcResultFuture(final I input, final Class<O> responseClazz,
278                     final String failureInfo) {
279         final RpcResponseKey key = new RpcResponseKey(input.getXid().toJava(), responseClazz.getName());
280         final ResponseExpectedRpcListener<O> listener = new ResponseExpectedRpcListener<>(input, failureInfo,
281                 responseCache, key);
282         return enqueueMessage(listener);
283     }
284
285     /**
286      * Sends given message to switch, sending result will be reported via return value.
287      *
288      * @param input message to send
289      * @param failureInfo describes, what type of message caused failure by sending
290      * @return future object,
291      *         <ul>
292      *         <li>if send successful, {@link RpcResult} without errors and successful status will be returned,</li>
293      *         <li>else {@link RpcResult} will contain errors and failed status</li>
294      *         </ul>
295      */
296     protected <O extends DataObject> ListenableFuture<RpcResult<O>> sendToSwitchFuture(final Object input,
297                                                                                      final String failureInfo) {
298         SimpleRpcListener<O> listener = new SimpleRpcListener(input, failureInfo);
299         return enqueueMessage(listener);
300     }
301
302     private <T> ListenableFuture<RpcResult<T>> enqueueMessage(final AbstractRpcListener<T> promise) {
303         LOG.debug("Submitting promise {}", promise);
304
305         if (!output.enqueue(promise)) {
306             LOG.debug("Message queue is full, rejecting execution");
307             promise.failedRpc(QUEUE_FULL_EXCEPTION);
308         } else {
309             LOG.debug("Promise enqueued successfully");
310         }
311
312         return promise.getResult();
313     }
314
315     private static SettableFuture<Boolean> handleTransportChannelFuture(final ChannelFuture resultFuture) {
316
317         final SettableFuture<Boolean> transportResult = SettableFuture.create();
318
319         resultFuture.addListener(future -> {
320             transportResult.set(future.isSuccess());
321             if (!future.isSuccess()) {
322                 transportResult.setException(future.cause());
323             }
324         });
325         return transportResult;
326     }
327 }