7cacb2213cbe625e3bba9f893502a02d0fc0c376
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / connection / ConnectionAdapterImpl.java
1 /* Copyright (C)2013 Pantheon Technologies, s.r.o. All rights reserved. */
2
3 package org.opendaylight.openflowjava.protocol.impl.connection;
4
5 import io.netty.channel.Channel;
6 import io.netty.channel.ChannelFuture;
7 import io.netty.util.concurrent.GenericFutureListener;
8
9 import java.util.Collection;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.TimeUnit;
12
13 import org.opendaylight.controller.sal.common.util.RpcErrors;
14 import org.opendaylight.controller.sal.common.util.Rpcs;
15 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
16 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
53 import org.opendaylight.yangtools.yang.binding.DataObject;
54 import org.opendaylight.yangtools.yang.binding.Notification;
55 import org.opendaylight.yangtools.yang.common.RpcError;
56 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
57 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
58 import org.opendaylight.yangtools.yang.common.RpcResult;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 import com.google.common.cache.Cache;
63 import com.google.common.cache.CacheBuilder;
64 import com.google.common.cache.RemovalListener;
65 import com.google.common.cache.RemovalNotification;
66 import com.google.common.collect.Lists;
67 import com.google.common.util.concurrent.SettableFuture;
68
69 /**
70  * @author mirehak
71  * @author michal.polkorab
72  */
73 public class ConnectionAdapterImpl implements ConnectionFacade {
74     
75     /** after this time, rpc future response objects will be thrown away (in minutes) */
76     public static final int RPC_RESPONSE_EXPIRATION = 1;
77
78     protected static final Logger LOG = LoggerFactory
79             .getLogger(ConnectionAdapterImpl.class);
80     
81     private static final String APPLICATION_TAG = "OPENFLOW_LIBRARY";
82     private static final String TAG = "OPENFLOW";
83     private Channel channel;
84     private OpenflowProtocolListener messageListener;
85     /** expiring cache for future rpcResponses */
86     protected Cache<RpcResponseKey, SettableFuture<?>> responseCache;
87     private SystemNotificationsListener systemListener;
88     private boolean disconnectOccured = false;
89     
90     /**
91      * default ctor 
92      */
93     public ConnectionAdapterImpl() {
94         responseCache = CacheBuilder.newBuilder()
95                 .concurrencyLevel(1)
96                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
97                 .removalListener(new RemovalListener<RpcResponseKey, SettableFuture<?>>() {
98
99                     @Override
100                     public void onRemoval(
101                             RemovalNotification<RpcResponseKey, SettableFuture<?>> notification) {
102                         LOG.warn("rpc response discarded: "+notification.getKey());
103                         notification.getValue().cancel(true);
104                     }
105                 }).build();
106         LOG.info("ConnectionAdapter created");
107     }
108     
109     /**
110      * @param channel the channel to be set - used for communication
111      */
112     public void setChannel(Channel channel) {
113         this.channel = channel;
114     }
115
116     @Override
117     public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input) {
118         return sendToSwitchExpectRpcResultFuture(
119                 input, BarrierOutput.class, "barrier-input sending failed");
120     }
121
122     @Override
123     public Future<RpcResult<EchoOutput>> echo(EchoInput input) {
124         return sendToSwitchExpectRpcResultFuture(
125                 input, EchoOutput.class, "echo-input sending failed");
126     }
127
128     @Override
129     public Future<RpcResult<Void>> echoReply(EchoReplyInput input) {
130         return sendToSwitchFuture(input, "echo-reply sending failed");
131     }
132
133     @Override
134     public Future<RpcResult<Void>> experimenter(ExperimenterInput input) {
135         return sendToSwitchFuture(input, "experimenter sending failed");
136     }
137
138     @Override
139     public Future<RpcResult<Void>> flowMod(FlowModInput input) {
140         return sendToSwitchFuture(input, "flow-mod sending failed");
141     }
142
143     @Override
144     public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input) {
145         return sendToSwitchExpectRpcResultFuture(
146                 input, GetConfigOutput.class, "get-config-input sending failed");
147     }
148
149     @Override
150     public Future<RpcResult<GetFeaturesOutput>> getFeatures(
151             GetFeaturesInput input) {
152         return sendToSwitchExpectRpcResultFuture(
153                 input, GetFeaturesOutput.class, "get-features-input sending failed");
154     }
155
156     @Override
157     public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
158             GetQueueConfigInput input) {
159         return sendToSwitchExpectRpcResultFuture(
160                 input, GetQueueConfigOutput.class, "get-queue-config-input sending failed");
161     }
162
163     @Override
164     public Future<RpcResult<Void>> groupMod(GroupModInput input) {
165         return sendToSwitchFuture(input, "group-mod-input sending failed");
166     }
167
168     @Override
169     public Future<RpcResult<Void>> hello(HelloInput input) {
170         return sendToSwitchFuture(input, "hello-input sending failed");
171     }
172
173     @Override
174     public Future<RpcResult<Void>> meterMod(MeterModInput input) {
175         return sendToSwitchFuture(input, "meter-mod-input sending failed");
176     }
177
178     @Override
179     public Future<RpcResult<Void>> packetOut(PacketOutInput input) {
180         return sendToSwitchFuture(input, "packet-out-input sending failed");
181     }
182
183     @Override
184     public Future<RpcResult<Void>> portMod(PortModInput input) {
185         return sendToSwitchFuture(input, "port-mod-input sending failed");
186     }
187
188     @Override
189     public Future<RpcResult<RoleRequestOutput>> roleRequest(
190             RoleRequestInput input) {
191         return sendToSwitchExpectRpcResultFuture(
192                 input, RoleRequestOutput.class, "role-request-config-input sending failed");
193     }
194
195     @Override
196     public Future<RpcResult<Void>> setConfig(SetConfigInput input) {
197         return sendToSwitchFuture(input, "set-config-input sending failed");
198     }
199
200     @Override
201     public Future<RpcResult<Void>> tableMod(TableModInput input) {
202         return sendToSwitchFuture(input, "table-mod-input sending failed");
203     }
204
205     @Override
206     public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input) {
207         return sendToSwitchExpectRpcResultFuture(
208                 input, GetAsyncOutput.class, "get-async-input sending failed");
209     }
210
211     @Override
212     public Future<RpcResult<Void>> setAsync(SetAsyncInput input) {
213         return sendToSwitchFuture(input, "set-async-input sending failed");
214     }
215
216     @Override
217     public Future<Boolean> disconnect() {
218         ChannelFuture disconnectResult = channel.disconnect();
219         responseCache.invalidateAll();
220         disconnectOccured = true;
221
222         String failureInfo = "switch disconnecting failed";
223         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
224         String message = "Check the switch connection";
225         return handleTransportChannelFuture(disconnectResult, failureInfo, errorSeverity, message);
226     }
227
228     @Override
229     public boolean isAlive() {
230         return channel.isOpen();
231     }
232
233     @Override
234     public void setMessageListener(OpenflowProtocolListener messageListener) {
235         this.messageListener = messageListener;
236     }
237     
238     @Override
239     public void consume(DataObject message) {
240         if (disconnectOccured ) {
241             return;
242         }
243         if (message instanceof Notification) {
244             // System events
245             if (message instanceof DisconnectEvent) {
246                 systemListener.onDisconnectEvent((DisconnectEvent) message);
247                 responseCache.invalidateAll();
248                 disconnectOccured = true;
249             } 
250             // OpenFlow messages
251               else if (message instanceof EchoRequestMessage) {
252                 messageListener.onEchoRequestMessage((EchoRequestMessage) message);
253             } else if (message instanceof ErrorMessage) {
254                 messageListener.onErrorMessage((ErrorMessage) message);
255             } else if (message instanceof ExperimenterMessage) {
256                 messageListener.onExperimenterMessage((ExperimenterMessage) message);
257             } else if (message instanceof FlowRemovedMessage) {
258                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
259             } else if (message instanceof HelloMessage) {
260                 LOG.info("Hello received / branch");
261                 messageListener.onHelloMessage((HelloMessage) message);
262             } else if (message instanceof MultipartReplyMessage) {
263                 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
264             } else if (message instanceof MultipartRequestMessage) {
265                 messageListener.onMultipartRequestMessage((MultipartRequestMessage) message);
266             } else if (message instanceof PacketInMessage) {
267                 messageListener.onPacketInMessage((PacketInMessage) message);
268             } else if (message instanceof PortStatusMessage) {
269                 messageListener.onPortStatusMessage((PortStatusMessage) message);
270             } else {
271                 LOG.warn("message listening not supported for type: "+message.getClass());
272             }
273         } else {
274             if (message instanceof OfHeader) {
275                 RpcResponseKey key = createRpcResponseKey((OfHeader) message);
276                 SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
277                 if (rpcFuture != null) {
278                     rpcFuture.set(Rpcs.getRpcResult(true, message, null));
279                     responseCache.invalidate(key);
280                 } else {
281                     LOG.warn("received unexpected rpc response: "+key);
282                 }
283             } else {
284                 LOG.warn("message listening not supported for type: "+message.getClass());
285             }
286         }
287     }
288
289     /**
290      * sends given message to switch, sending result will be reported via return value
291      * @param input message to send
292      * @param failureInfo describes, what type of message caused failure by sending 
293      * @return future object, <ul>
294      *  <li>if send successful, {@link RpcResult} without errors and successful 
295      *  status will be returned, </li>
296      *  <li>else {@link RpcResult} will contain errors and failed status</li>
297      *  </ul>    
298      */
299     private SettableFuture<RpcResult<Void>> sendToSwitchFuture(
300             DataObject input, final String failureInfo) {
301         ChannelFuture resultFuture = channel.writeAndFlush(input);
302         
303         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
304         String errorMessage = "check switch connection";
305         return handleRpcChannelFuture(resultFuture, failureInfo, errorSeverity, errorMessage);
306     }
307     
308     /**
309      * sends given message to switch, sending result or switch response will be reported via return value
310      * @param input message to send
311      * @param responseClazz type of response
312      * @param failureInfo describes, what type of message caused failure by sending 
313      * @return future object, <ul>
314      *  <li>if send fails, {@link RpcResult} will contain errors and failed status </li>
315      *  <li>else {@link RpcResult} will be stored in responseCache and wait for particular timeout 
316      *  ({@link ConnectionAdapterImpl#RPC_RESPONSE_EXPIRATION}), 
317      *  <ul><li>either switch will manage to answer
318      *  and then corresponding response message will be set into returned future</li>
319      *  <li>or response in cache will expire and returned future will be cancelled</li></ul>
320      *  </li>
321      *  </ul>     
322      */
323     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> sendToSwitchExpectRpcResultFuture(
324             IN input, Class<OUT> responseClazz, final String failureInfo) {
325         ChannelFuture resultFuture = channel.writeAndFlush(input);
326         
327         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
328         String errorMessage = "check switch connection";
329         return handleRpcChannelFutureWithResponse(resultFuture, failureInfo, errorSeverity, 
330                 errorMessage, input, responseClazz);
331     }
332
333     /**
334      * @param resultFuture
335      * @param failureInfo
336      * @return
337      */
338     private SettableFuture<RpcResult<Void>> handleRpcChannelFuture(
339             ChannelFuture resultFuture, final String failureInfo, 
340             final ErrorSeverity errorSeverity, final String errorMessage) {
341         
342         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
343         
344         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
345             
346             @Override
347             public void operationComplete(
348                     io.netty.util.concurrent.Future<? super Void> future)
349                     throws Exception {
350                 Collection<RpcError> errors = null;
351                 
352                 if (future.cause() != null) {
353                     RpcError rpcError = buildRpcError(failureInfo, 
354                             errorSeverity, errorMessage, future.cause());
355                     errors = Lists.newArrayList(rpcError);
356                 }
357                 
358                 rpcResult.set(Rpcs.getRpcResult(
359                         future.isSuccess(), 
360                         (Void) null, 
361                         errors)
362                 );
363             }
364         });
365         return rpcResult;
366     }
367     
368     /**
369      * @param input
370      * @param responseClazz
371      * @param resultFuture
372      * @param failureInfo
373      * @param errorSeverity
374      * @param errorMessage
375      * @return
376      */
377     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> handleRpcChannelFutureWithResponse(
378             ChannelFuture resultFuture, final String failureInfo,
379             final ErrorSeverity errorSeverity, final String errorMessage,
380             final IN input, Class<OUT> responseClazz) {
381         final SettableFuture<RpcResult<OUT>> rpcResult = SettableFuture.create();
382         
383         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
384             
385             @Override
386             public void operationComplete(
387                     io.netty.util.concurrent.Future<? super Void> future)
388                     throws Exception {
389                 
390                 if (future.cause() != null) {
391                     Collection<RpcError> errors = null;
392                     RpcError rpcError = buildRpcError(failureInfo, 
393                             errorSeverity, errorMessage, future.cause());
394                     errors = Lists.newArrayList(rpcError);
395                     rpcResult.set(Rpcs.getRpcResult(
396                             future.isSuccess(), 
397                             (OUT) null, 
398                             errors)
399                             );
400                 } else {
401                     RpcResponseKey key = new RpcResponseKey(input.getXid(), input.getClass().toString());
402                     if (responseCache.getIfPresent(key) != null) {
403                         responseCache.invalidate(key);
404                     }
405                     responseCache.put(key, rpcResult);
406                 }
407             }
408         });
409         return rpcResult;
410     }
411
412     /**
413      * @param resultFuture
414      * @param failureInfo
415      * @param errorSeverity 
416      * @param message 
417      * @return
418      */
419     private static SettableFuture<Boolean> handleTransportChannelFuture(
420             ChannelFuture resultFuture, final String failureInfo, 
421             final ErrorSeverity errorSeverity, final String message) {
422         
423         final SettableFuture<Boolean> transportResult = SettableFuture.create();
424         
425         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
426             
427             @Override
428             public void operationComplete(
429                     io.netty.util.concurrent.Future<? super Void> future)
430                     throws Exception {
431                 transportResult.set(future.isSuccess());
432                 transportResult.setException(future.cause());
433             }
434         });
435         return transportResult;
436     }
437
438     /**
439      * @param cause
440      * @return
441      */
442     protected RpcError buildRpcError(String info, ErrorSeverity severity, String message, 
443             Throwable cause) {
444         RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message, 
445                 ErrorType.RPC, cause);
446         return error;
447     }
448     
449     /**
450      * @param cause
451      * @return
452      */
453     protected RpcError buildTransportError(String info, ErrorSeverity severity, String message, 
454             Throwable cause) {
455         RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message, 
456                 ErrorType.TRANSPORT, cause);
457         return error;
458     }
459
460     /**
461      * @param message
462      * @return
463      */
464     private static RpcResponseKey createRpcResponseKey(OfHeader message) {
465         return new RpcResponseKey(message.getXid(), message.getClass().toString());
466     }
467
468     /**
469      * @return
470      */
471     @SuppressWarnings("unchecked")
472     private SettableFuture<RpcResult<?>> findRpcResponse(RpcResponseKey key) {
473         return (SettableFuture<RpcResult<?>>) responseCache.getIfPresent(key);
474     }
475
476     @Override
477     public void setSystemListener(SystemNotificationsListener systemListener) {
478         this.systemListener = systemListener;
479     }
480     
481     @Override
482     public void checkListeners() {
483         StringBuffer buffer =  new StringBuffer();
484         if (systemListener == null) {
485             buffer.append("SystemListener ");
486         }
487         if (messageListener == null) {
488             buffer.append("MessageListener ");
489         }
490         
491         if (buffer.length() > 0) {
492             throw new IllegalStateException("Missing listeners: " + buffer.toString());
493         }
494     }
495
496 }