add lib-plugin interaction implementations on library side
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / connection / ConnectionAdapterImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowjava.protocol.impl.connection;
10
11 import io.netty.channel.Channel;
12 import io.netty.channel.ChannelFuture;
13 import io.netty.util.concurrent.GenericFutureListener;
14
15 import java.util.Collection;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.TimeUnit;
18
19 import org.opendaylight.controller.sal.common.util.RpcErrors;
20 import org.opendaylight.controller.sal.common.util.Rpcs;
21 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
58 import org.opendaylight.yangtools.yang.binding.DataObject;
59 import org.opendaylight.yangtools.yang.binding.Notification;
60 import org.opendaylight.yangtools.yang.common.RpcError;
61 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
62 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
63 import org.opendaylight.yangtools.yang.common.RpcResult;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67 import com.google.common.cache.Cache;
68 import com.google.common.cache.CacheBuilder;
69 import com.google.common.cache.RemovalListener;
70 import com.google.common.cache.RemovalNotification;
71 import com.google.common.collect.Lists;
72 import com.google.common.util.concurrent.SettableFuture;
73
74 /**
75  * @author mirehak
76  *
77  */
78 public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer {
79     
80     /** after this time, rpc future response objects will be thrown away (in minutes) */
81     public static final int RPC_RESPONSE_EXPIRATION = 1;
82
83     protected static final Logger LOG = LoggerFactory
84             .getLogger(ConnectionAdapterImpl.class);
85     
86     private static final String APPLICATION_TAG = "OPENFLOW_LIBRARY";
87     private static final String TAG = "OPENFLOW";
88     private Channel channel;
89     private OpenflowProtocolListener messageListener;
90     /** expiring cache for future rpcResponses */
91     protected Cache<RpcResponseKey, SettableFuture<?>> responseCache;
92     
93     
94     /**
95      * default ctor 
96      */
97     public ConnectionAdapterImpl() {
98         responseCache = CacheBuilder.newBuilder()
99                 .concurrencyLevel(1)
100                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
101                 .removalListener(new RemovalListener<RpcResponseKey, SettableFuture<?>>() {
102
103                     @Override
104                     public void onRemoval(
105                             RemovalNotification<RpcResponseKey, SettableFuture<?>> notification) {
106                         LOG.warn("rpc response discarded: "+notification.getKey());
107                         notification.getValue().cancel(true);
108                     }
109                 }).build();
110     }
111     
112     /**
113      * @param channel the channel to set
114      */
115     public void setChannel(Channel channel) {
116         this.channel = channel;
117     }
118
119     @Override
120     public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input) {
121         return sendToSwitchExpectRpcResultFuture(
122                 input, BarrierOutput.class, "barrier-input sending failed");
123     }
124
125     @Override
126     public Future<RpcResult<EchoOutput>> echo(EchoInput input) {
127         return sendToSwitchExpectRpcResultFuture(
128                 input, EchoOutput.class, "echo-input sending failed");
129     }
130
131     @Override
132     public Future<RpcResult<Void>> echoReply(EchoReplyInput input) {
133         return sendToSwitchFuture(input, "echo-reply sending failed");
134     }
135
136     @Override
137     public Future<RpcResult<Void>> experimenter(ExperimenterInput input) {
138         return sendToSwitchFuture(input, "experimenter sending failed");
139     }
140
141     @Override
142     public Future<RpcResult<Void>> flowMod(FlowModInput input) {
143         return sendToSwitchFuture(input, "flow-mod sending failed");
144     }
145
146     @Override
147     public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input) {
148         return sendToSwitchExpectRpcResultFuture(
149                 input, GetConfigOutput.class, "get-config-input sending failed");
150     }
151
152     @Override
153     public Future<RpcResult<GetFeaturesOutput>> getFeatures(
154             GetFeaturesInput input) {
155         return sendToSwitchExpectRpcResultFuture(
156                 input, GetFeaturesOutput.class, "get-features-input sending failed");
157     }
158
159     @Override
160     public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
161             GetQueueConfigInput input) {
162         return sendToSwitchExpectRpcResultFuture(
163                 input, GetQueueConfigOutput.class, "get-queue-config-input sending failed");
164     }
165
166     @Override
167     public Future<RpcResult<Void>> groupMod(GroupModInput input) {
168         return sendToSwitchFuture(input, "group-mod-input sending failed");
169     }
170
171     @Override
172     public Future<RpcResult<Void>> hello(HelloInput input) {
173         return sendToSwitchFuture(input, "hello-input sending failed");
174     }
175
176     @Override
177     public Future<RpcResult<Void>> meterMod(MeterModInput input) {
178         return sendToSwitchFuture(input, "meter-mod-input sending failed");
179     }
180
181     @Override
182     public Future<RpcResult<Void>> packetOut(PacketOutInput input) {
183         return sendToSwitchFuture(input, "packet-out-input sending failed");
184     }
185
186     @Override
187     public Future<RpcResult<Void>> portMod(PortModInput input) {
188         return sendToSwitchFuture(input, "port-mod-input sending failed");
189     }
190
191     @Override
192     public Future<RpcResult<RoleRequestOutput>> roleRequest(
193             RoleRequestInput input) {
194         return sendToSwitchExpectRpcResultFuture(
195                 input, RoleRequestOutput.class, "role-request-config-input sending failed");
196     }
197
198     @Override
199     public Future<RpcResult<Void>> setConfig(SetConfigInput input) {
200         return sendToSwitchFuture(input, "set-config-input sending failed");
201     }
202
203     @Override
204     public Future<RpcResult<Void>> tableMod(TableModInput input) {
205         return sendToSwitchFuture(input, "table-mod-input sending failed");
206     }
207
208     @Override
209     public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input) {
210         return sendToSwitchExpectRpcResultFuture(
211                 input, GetAsyncOutput.class, "get-async-input sending failed");
212     }
213
214     @Override
215     public Future<RpcResult<Void>> setAsync(SetAsyncInput input) {
216         return sendToSwitchFuture(input, "set-async-input sending failed");
217     }
218
219     @Override
220     public Future<Boolean> disconnect() {
221         ChannelFuture disconnectResult = channel.disconnect();
222         
223         String failureInfo = "switch disconnecting failed";
224         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
225         String message = "Check the switch connection";
226         return handleTransportChannelFuture(disconnectResult, failureInfo, errorSeverity, message);
227     }
228
229     @Override
230     public boolean isAlive() {
231         return channel.isOpen();
232     }
233
234     @Override
235     public void setMessageListener(OpenflowProtocolListener messageListener) {
236         this.messageListener = messageListener;
237     }
238     
239     @Override
240     public void consume(DataObject message) {
241         if (message instanceof Notification) {
242             if (message instanceof EchoRequestMessage) {
243                 messageListener.onEchoRequestMessage((EchoRequestMessage) message);
244             } else if (message instanceof ErrorMessage) {
245                 messageListener.onErrorMessage((ErrorMessage) message);
246             } else if (message instanceof ExperimenterMessage) {
247                 messageListener.onExperimenterMessage((ExperimenterMessage) message);
248             } else if (message instanceof FlowRemovedMessage) {
249                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
250             } else if (message instanceof HelloMessage) {
251                 messageListener.onHelloMessage((HelloMessage) message);
252             } else if (message instanceof MultipartReplyMessage) {
253                 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
254             } else if (message instanceof MultipartRequestMessage) {
255                 messageListener.onMultipartRequestMessage((MultipartRequestMessage) message);
256             } else if (message instanceof PacketInMessage) {
257                 messageListener.onPacketInMessage((PacketInMessage) message);
258             } else if (message instanceof PortStatusMessage) {
259                 messageListener.onPortStatusMessage((PortStatusMessage) message);
260             } else {
261                 LOG.warn("message listening not supported for type: "+message.getClass());
262             }
263         } else {
264             if (message instanceof OfHeader) {
265                 RpcResponseKey key = createRpcResponseKey((OfHeader) message);
266                 SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
267                 if (rpcFuture != null) {
268                     rpcFuture.set(Rpcs.getRpcResult(true, message, null));
269                     responseCache.invalidate(key);
270                 } else {
271                     LOG.warn("received unexpected rpc response: "+key);
272                 }
273                 
274             } else {
275                 LOG.warn("message listening not supported for type: "+message.getClass());
276             }
277         }
278     }
279
280     /**
281      * sends given message to switch, sending result will be reported via return value
282      * @param input message to send
283      * @param failureInfo describes, what type of message caused failure by sending 
284      * @return future object, <ul>
285      *  <li>if send successful, {@link RpcResult} without errors and successful 
286      *  status will be returned, </li>
287      *  <li>else {@link RpcResult} will contain errors and failed status</li>
288      *  </ul>    
289      */
290     private SettableFuture<RpcResult<Void>> sendToSwitchFuture(
291             DataObject input, final String failureInfo) {
292         ChannelFuture resultFuture = channel.writeAndFlush(input);
293         
294         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
295         String errorMessage = "check switch connection";
296         return handleRpcChannelFuture(resultFuture, failureInfo, errorSeverity, errorMessage);
297     }
298     
299     /**
300      * sends given message to switch, sending result or switch response will be reported via return value
301      * @param input message to send
302      * @param responseClazz type of response
303      * @param failureInfo describes, what type of message caused failure by sending 
304      * @return future object, <ul>
305      *  <li>if send fails, {@link RpcResult} will contain errors and failed status </li>
306      *  <li>else {@link RpcResult} will be stored in responseCache and wait for particular timeout 
307      *  ({@link ConnectionAdapterImpl#RPC_RESPONSE_EXPIRATION}), 
308      *  <ul><li>either switch will manage to answer
309      *  and then corresponding response message will be set into returned future</li>
310      *  <li>or response in cache will expire and returned future will be cancelled</li></ul>
311      *  </li>
312      *  </ul>     
313      */
314     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> sendToSwitchExpectRpcResultFuture(
315             IN input, Class<OUT> responseClazz, final String failureInfo) {
316         ChannelFuture resultFuture = channel.writeAndFlush(input);
317         
318         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
319         String errorMessage = "check switch connection";
320         return handleRpcChannelFutureWithResponse(resultFuture, failureInfo, errorSeverity, 
321                 errorMessage, input, responseClazz);
322     }
323
324     /**
325      * @param resultFuture
326      * @param failureInfo
327      * @return
328      */
329     private SettableFuture<RpcResult<Void>> handleRpcChannelFuture(
330             ChannelFuture resultFuture, final String failureInfo, 
331             final ErrorSeverity errorSeverity, final String errorMessage) {
332         
333         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
334         
335         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
336             
337             @Override
338             public void operationComplete(
339                     io.netty.util.concurrent.Future<? super Void> future)
340                     throws Exception {
341                 Collection<RpcError> errors = null;
342                 
343                 if (future.cause() != null) {
344                     RpcError rpcError = buildRpcError(failureInfo, 
345                             errorSeverity, errorMessage, future.cause());
346                     errors = Lists.newArrayList(rpcError);
347                 }
348                 
349                 rpcResult.set(Rpcs.getRpcResult(
350                         future.isSuccess(), 
351                         (Void) null, 
352                         errors)
353                 );
354             }
355         });
356         return rpcResult;
357     }
358     
359     /**
360      * @param input
361      * @param responseClazz
362      * @param resultFuture
363      * @param failureInfo
364      * @param errorSeverity
365      * @param errorMessage
366      * @return
367      */
368     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> handleRpcChannelFutureWithResponse(
369             ChannelFuture resultFuture, final String failureInfo,
370             final ErrorSeverity errorSeverity, final String errorMessage,
371             final IN input, Class<OUT> responseClazz) {
372         final SettableFuture<RpcResult<OUT>> rpcResult = SettableFuture.create();
373         
374         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
375             
376             @Override
377             public void operationComplete(
378                     io.netty.util.concurrent.Future<? super Void> future)
379                     throws Exception {
380                 
381                 if (future.cause() != null) {
382                     Collection<RpcError> errors = null;
383                     RpcError rpcError = buildRpcError(failureInfo, 
384                             errorSeverity, errorMessage, future.cause());
385                     errors = Lists.newArrayList(rpcError);
386                     rpcResult.set(Rpcs.getRpcResult(
387                             future.isSuccess(), 
388                             (OUT) null, 
389                             errors)
390                             );
391                 } else {
392                     RpcResponseKey key = new RpcResponseKey(input.getXid(), input.getClass().toString());
393                     if (responseCache.getIfPresent(key) != null) {
394                         responseCache.invalidate(key);
395                     }
396                     responseCache.put(key, rpcResult);
397                 }
398             }
399         });
400         return rpcResult;
401     }
402
403     /**
404      * @param resultFuture
405      * @param failureInfo
406      * @param errorSeverity 
407      * @param message 
408      * @return
409      */
410     private static SettableFuture<Boolean> handleTransportChannelFuture(
411             ChannelFuture resultFuture, final String failureInfo, 
412             final ErrorSeverity errorSeverity, final String message) {
413         
414         final SettableFuture<Boolean> transportResult = SettableFuture.create();
415         
416         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
417             
418             @Override
419             public void operationComplete(
420                     io.netty.util.concurrent.Future<? super Void> future)
421                     throws Exception {
422                 transportResult.set(future.isSuccess());
423                 transportResult.setException(future.cause());
424             }
425         });
426         return transportResult;
427     }
428
429     /**
430      * @param cause
431      * @return
432      */
433     protected RpcError buildRpcError(String info, ErrorSeverity severity, String message, 
434             Throwable cause) {
435         RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message, 
436                 ErrorType.RPC, cause);
437         return error;
438     }
439     
440     /**
441      * @param cause
442      * @return
443      */
444     protected RpcError buildTransportError(String info, ErrorSeverity severity, String message, 
445             Throwable cause) {
446         RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message, 
447                 ErrorType.TRANSPORT, cause);
448         return error;
449     }
450
451     /**
452      * @param message
453      * @return
454      */
455     private static RpcResponseKey createRpcResponseKey(OfHeader message) {
456         return new RpcResponseKey(message.getXid(), message.getClass().toString());
457     }
458
459     /**
460      * @return
461      */
462     @SuppressWarnings("unchecked")
463     private SettableFuture<RpcResult<?>> findRpcResponse(RpcResponseKey key) {
464         return (SettableFuture<RpcResult<?>>) responseCache.getIfPresent(key);
465     }
466
467 }