"Added more deserialization factories & their unit tests"
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / connection / ConnectionAdapterImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowjava.protocol.impl.connection;
10
11 import io.netty.channel.Channel;
12 import io.netty.channel.ChannelFuture;
13 import io.netty.util.concurrent.GenericFutureListener;
14
15 import java.util.Collection;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.TimeUnit;
18
19 import org.opendaylight.controller.sal.common.util.RpcErrors;
20 import org.opendaylight.controller.sal.common.util.Rpcs;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
57 import org.opendaylight.yangtools.yang.binding.DataObject;
58 import org.opendaylight.yangtools.yang.binding.Notification;
59 import org.opendaylight.yangtools.yang.common.RpcError;
60 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
61 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
62 import org.opendaylight.yangtools.yang.common.RpcResult;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 import com.google.common.cache.Cache;
67 import com.google.common.cache.CacheBuilder;
68 import com.google.common.cache.RemovalListener;
69 import com.google.common.cache.RemovalNotification;
70 import com.google.common.collect.Lists;
71 import com.google.common.util.concurrent.SettableFuture;
72
73 /**
74  * @author mirehak
75  * @author michal.polkorab
76  */
77 public class ConnectionAdapterImpl implements ConnectionFacade {
78     
79     /** after this time, rpc future response objects will be thrown away (in minutes) */
80     public static final int RPC_RESPONSE_EXPIRATION = 1;
81
82     protected static final Logger LOG = LoggerFactory
83             .getLogger(ConnectionAdapterImpl.class);
84     
85     private static final String APPLICATION_TAG = "OPENFLOW_LIBRARY";
86     private static final String TAG = "OPENFLOW";
87     private Channel channel;
88     private OpenflowProtocolListener messageListener;
89     /** expiring cache for future rpcResponses */
90     protected Cache<RpcResponseKey, SettableFuture<?>> responseCache;
91     
92     
93     /**
94      * default ctor 
95      */
96     public ConnectionAdapterImpl() {
97         responseCache = CacheBuilder.newBuilder()
98                 .concurrencyLevel(1)
99                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
100                 .removalListener(new RemovalListener<RpcResponseKey, SettableFuture<?>>() {
101
102                     @Override
103                     public void onRemoval(
104                             RemovalNotification<RpcResponseKey, SettableFuture<?>> notification) {
105                         LOG.warn("rpc response discarded: "+notification.getKey());
106                         notification.getValue().cancel(true);
107                     }
108                 }).build();
109         LOG.info("ConnectionAdapter created");
110     }
111     
112     /**
113      * @param channel the channel to be set - used for communication
114      */
115     public void setChannel(Channel channel) {
116         this.channel = channel;
117     }
118
119     @Override
120     public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input) {
121         return sendToSwitchExpectRpcResultFuture(
122                 input, BarrierOutput.class, "barrier-input sending failed");
123     }
124
125     @Override
126     public Future<RpcResult<EchoOutput>> echo(EchoInput input) {
127         return sendToSwitchExpectRpcResultFuture(
128                 input, EchoOutput.class, "echo-input sending failed");
129     }
130
131     @Override
132     public Future<RpcResult<Void>> echoReply(EchoReplyInput input) {
133         return sendToSwitchFuture(input, "echo-reply sending failed");
134     }
135
136     @Override
137     public Future<RpcResult<Void>> experimenter(ExperimenterInput input) {
138         return sendToSwitchFuture(input, "experimenter sending failed");
139     }
140
141     @Override
142     public Future<RpcResult<Void>> flowMod(FlowModInput input) {
143         return sendToSwitchFuture(input, "flow-mod sending failed");
144     }
145
146     @Override
147     public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input) {
148         return sendToSwitchExpectRpcResultFuture(
149                 input, GetConfigOutput.class, "get-config-input sending failed");
150     }
151
152     @Override
153     public Future<RpcResult<GetFeaturesOutput>> getFeatures(
154             GetFeaturesInput input) {
155         return sendToSwitchExpectRpcResultFuture(
156                 input, GetFeaturesOutput.class, "get-features-input sending failed");
157     }
158
159     @Override
160     public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(
161             GetQueueConfigInput input) {
162         return sendToSwitchExpectRpcResultFuture(
163                 input, GetQueueConfigOutput.class, "get-queue-config-input sending failed");
164     }
165
166     @Override
167     public Future<RpcResult<Void>> groupMod(GroupModInput input) {
168         return sendToSwitchFuture(input, "group-mod-input sending failed");
169     }
170
171     @Override
172     public Future<RpcResult<Void>> hello(HelloInput input) {
173         return sendToSwitchFuture(input, "hello-input sending failed");
174     }
175
176     @Override
177     public Future<RpcResult<Void>> meterMod(MeterModInput input) {
178         return sendToSwitchFuture(input, "meter-mod-input sending failed");
179     }
180
181     @Override
182     public Future<RpcResult<Void>> packetOut(PacketOutInput input) {
183         return sendToSwitchFuture(input, "packet-out-input sending failed");
184     }
185
186     @Override
187     public Future<RpcResult<Void>> portMod(PortModInput input) {
188         return sendToSwitchFuture(input, "port-mod-input sending failed");
189     }
190
191     @Override
192     public Future<RpcResult<RoleRequestOutput>> roleRequest(
193             RoleRequestInput input) {
194         return sendToSwitchExpectRpcResultFuture(
195                 input, RoleRequestOutput.class, "role-request-config-input sending failed");
196     }
197
198     @Override
199     public Future<RpcResult<Void>> setConfig(SetConfigInput input) {
200         return sendToSwitchFuture(input, "set-config-input sending failed");
201     }
202
203     @Override
204     public Future<RpcResult<Void>> tableMod(TableModInput input) {
205         return sendToSwitchFuture(input, "table-mod-input sending failed");
206     }
207
208     @Override
209     public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input) {
210         return sendToSwitchExpectRpcResultFuture(
211                 input, GetAsyncOutput.class, "get-async-input sending failed");
212     }
213
214     @Override
215     public Future<RpcResult<Void>> setAsync(SetAsyncInput input) {
216         return sendToSwitchFuture(input, "set-async-input sending failed");
217     }
218
219     @Override
220     public Future<Boolean> disconnect() {
221         ChannelFuture disconnectResult = channel.disconnect();
222         
223         String failureInfo = "switch disconnecting failed";
224         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
225         String message = "Check the switch connection";
226         return handleTransportChannelFuture(disconnectResult, failureInfo, errorSeverity, message);
227     }
228
229     @Override
230     public boolean isAlive() {
231         return channel.isOpen();
232     }
233
234     @Override
235     public void setMessageListener(OpenflowProtocolListener messageListener) {
236         this.messageListener = messageListener;
237     }
238     
239     @Override
240     public void consume(DataObject message) {
241         if (message instanceof Notification) {
242             if (message instanceof EchoRequestMessage) {
243                 messageListener.onEchoRequestMessage((EchoRequestMessage) message);
244             } else if (message instanceof ErrorMessage) {
245                 messageListener.onErrorMessage((ErrorMessage) message);
246             } else if (message instanceof ExperimenterMessage) {
247                 messageListener.onExperimenterMessage((ExperimenterMessage) message);
248             } else if (message instanceof FlowRemovedMessage) {
249                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
250             } else if (message instanceof HelloMessage) {
251                 LOG.info("Hello received / branch");
252                 messageListener.onHelloMessage((HelloMessage) message);
253             } else if (message instanceof MultipartReplyMessage) {
254                 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
255             } else if (message instanceof MultipartRequestMessage) {
256                 messageListener.onMultipartRequestMessage((MultipartRequestMessage) message);
257             } else if (message instanceof PacketInMessage) {
258                 messageListener.onPacketInMessage((PacketInMessage) message);
259             } else if (message instanceof PortStatusMessage) {
260                 messageListener.onPortStatusMessage((PortStatusMessage) message);
261             } else {
262                 LOG.warn("message listening not supported for type: "+message.getClass());
263             }
264         } else {
265             if (message instanceof OfHeader) {
266                 RpcResponseKey key = createRpcResponseKey((OfHeader) message);
267                 SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
268                 if (rpcFuture != null) {
269                     rpcFuture.set(Rpcs.getRpcResult(true, message, null));
270                     responseCache.invalidate(key);
271                 } else {
272                     LOG.warn("received unexpected rpc response: "+key);
273                 }
274                 
275             } else {
276                 LOG.warn("message listening not supported for type: "+message.getClass());
277             }
278         }
279     }
280
281     /**
282      * sends given message to switch, sending result will be reported via return value
283      * @param input message to send
284      * @param failureInfo describes, what type of message caused failure by sending 
285      * @return future object, <ul>
286      *  <li>if send successful, {@link RpcResult} without errors and successful 
287      *  status will be returned, </li>
288      *  <li>else {@link RpcResult} will contain errors and failed status</li>
289      *  </ul>    
290      */
291     private SettableFuture<RpcResult<Void>> sendToSwitchFuture(
292             DataObject input, final String failureInfo) {
293         ChannelFuture resultFuture = channel.writeAndFlush(input);
294         
295         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
296         String errorMessage = "check switch connection";
297         return handleRpcChannelFuture(resultFuture, failureInfo, errorSeverity, errorMessage);
298     }
299     
300     /**
301      * sends given message to switch, sending result or switch response will be reported via return value
302      * @param input message to send
303      * @param responseClazz type of response
304      * @param failureInfo describes, what type of message caused failure by sending 
305      * @return future object, <ul>
306      *  <li>if send fails, {@link RpcResult} will contain errors and failed status </li>
307      *  <li>else {@link RpcResult} will be stored in responseCache and wait for particular timeout 
308      *  ({@link ConnectionAdapterImpl#RPC_RESPONSE_EXPIRATION}), 
309      *  <ul><li>either switch will manage to answer
310      *  and then corresponding response message will be set into returned future</li>
311      *  <li>or response in cache will expire and returned future will be cancelled</li></ul>
312      *  </li>
313      *  </ul>     
314      */
315     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> sendToSwitchExpectRpcResultFuture(
316             IN input, Class<OUT> responseClazz, final String failureInfo) {
317         ChannelFuture resultFuture = channel.writeAndFlush(input);
318         
319         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
320         String errorMessage = "check switch connection";
321         return handleRpcChannelFutureWithResponse(resultFuture, failureInfo, errorSeverity, 
322                 errorMessage, input, responseClazz);
323     }
324
325     /**
326      * @param resultFuture
327      * @param failureInfo
328      * @return
329      */
330     private SettableFuture<RpcResult<Void>> handleRpcChannelFuture(
331             ChannelFuture resultFuture, final String failureInfo, 
332             final ErrorSeverity errorSeverity, final String errorMessage) {
333         
334         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
335         
336         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
337             
338             @Override
339             public void operationComplete(
340                     io.netty.util.concurrent.Future<? super Void> future)
341                     throws Exception {
342                 Collection<RpcError> errors = null;
343                 
344                 if (future.cause() != null) {
345                     RpcError rpcError = buildRpcError(failureInfo, 
346                             errorSeverity, errorMessage, future.cause());
347                     errors = Lists.newArrayList(rpcError);
348                 }
349                 
350                 rpcResult.set(Rpcs.getRpcResult(
351                         future.isSuccess(), 
352                         (Void) null, 
353                         errors)
354                 );
355             }
356         });
357         return rpcResult;
358     }
359     
360     /**
361      * @param input
362      * @param responseClazz
363      * @param resultFuture
364      * @param failureInfo
365      * @param errorSeverity
366      * @param errorMessage
367      * @return
368      */
369     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> handleRpcChannelFutureWithResponse(
370             ChannelFuture resultFuture, final String failureInfo,
371             final ErrorSeverity errorSeverity, final String errorMessage,
372             final IN input, Class<OUT> responseClazz) {
373         final SettableFuture<RpcResult<OUT>> rpcResult = SettableFuture.create();
374         
375         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
376             
377             @Override
378             public void operationComplete(
379                     io.netty.util.concurrent.Future<? super Void> future)
380                     throws Exception {
381                 
382                 if (future.cause() != null) {
383                     Collection<RpcError> errors = null;
384                     RpcError rpcError = buildRpcError(failureInfo, 
385                             errorSeverity, errorMessage, future.cause());
386                     errors = Lists.newArrayList(rpcError);
387                     rpcResult.set(Rpcs.getRpcResult(
388                             future.isSuccess(), 
389                             (OUT) null, 
390                             errors)
391                             );
392                 } else {
393                     RpcResponseKey key = new RpcResponseKey(input.getXid(), input.getClass().toString());
394                     if (responseCache.getIfPresent(key) != null) {
395                         responseCache.invalidate(key);
396                     }
397                     responseCache.put(key, rpcResult);
398                 }
399             }
400         });
401         return rpcResult;
402     }
403
404     /**
405      * @param resultFuture
406      * @param failureInfo
407      * @param errorSeverity 
408      * @param message 
409      * @return
410      */
411     private static SettableFuture<Boolean> handleTransportChannelFuture(
412             ChannelFuture resultFuture, final String failureInfo, 
413             final ErrorSeverity errorSeverity, final String message) {
414         
415         final SettableFuture<Boolean> transportResult = SettableFuture.create();
416         
417         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
418             
419             @Override
420             public void operationComplete(
421                     io.netty.util.concurrent.Future<? super Void> future)
422                     throws Exception {
423                 transportResult.set(future.isSuccess());
424                 transportResult.setException(future.cause());
425             }
426         });
427         return transportResult;
428     }
429
430     /**
431      * @param cause
432      * @return
433      */
434     protected RpcError buildRpcError(String info, ErrorSeverity severity, String message, 
435             Throwable cause) {
436         RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message, 
437                 ErrorType.RPC, cause);
438         return error;
439     }
440     
441     /**
442      * @param cause
443      * @return
444      */
445     protected RpcError buildTransportError(String info, ErrorSeverity severity, String message, 
446             Throwable cause) {
447         RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message, 
448                 ErrorType.TRANSPORT, cause);
449         return error;
450     }
451
452     /**
453      * @param message
454      * @return
455      */
456     private static RpcResponseKey createRpcResponseKey(OfHeader message) {
457         return new RpcResponseKey(message.getXid(), message.getClass().toString());
458     }
459
460     /**
461      * @return
462      */
463     @SuppressWarnings("unchecked")
464     private SettableFuture<RpcResult<?>> findRpcResponse(RpcResponseKey key) {
465         return (SettableFuture<RpcResult<?>>) responseCache.getIfPresent(key);
466     }
467
468 }