Merge "Fixing OF Multipart messages 1) So we have a MultipartRequestDesc message...
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / connection / ConnectionAdapterImpl.java
index 7cacb2213cbe625e3bba9f893502a02d0fc0c376..3c83945b93798d2dec8d116345826dbe47d790c0 100644 (file)
@@ -7,11 +7,14 @@ import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.sal.common.util.RpcErrors;
 import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
@@ -45,11 +48,14 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SendMultipartRequestMessageInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -67,17 +73,18 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
 
 /**
+ * Handles messages (notifications + rpcs) and connections
  * @author mirehak
  * @author michal.polkorab
  */
 public class ConnectionAdapterImpl implements ConnectionFacade {
-    
+
     /** after this time, rpc future response objects will be thrown away (in minutes) */
     public static final int RPC_RESPONSE_EXPIRATION = 1;
 
     protected static final Logger LOG = LoggerFactory
             .getLogger(ConnectionAdapterImpl.class);
-    
+
     private static final String APPLICATION_TAG = "OPENFLOW_LIBRARY";
     private static final String TAG = "OPENFLOW";
     private Channel channel;
@@ -86,26 +93,20 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     protected Cache<RpcResponseKey, SettableFuture<?>> responseCache;
     private SystemNotificationsListener systemListener;
     private boolean disconnectOccured = false;
-    
+
+    protected ConnectionReadyListener connectionReadyListener;
+
     /**
-     * default ctor 
+     * default ctor
      */
     public ConnectionAdapterImpl() {
         responseCache = CacheBuilder.newBuilder()
                 .concurrencyLevel(1)
                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
-                .removalListener(new RemovalListener<RpcResponseKey, SettableFuture<?>>() {
-
-                    @Override
-                    public void onRemoval(
-                            RemovalNotification<RpcResponseKey, SettableFuture<?>> notification) {
-                        LOG.warn("rpc response discarded: "+notification.getKey());
-                        notification.getValue().cancel(true);
-                    }
-                }).build();
-        LOG.info("ConnectionAdapter created");
+                .removalListener(new ResponseRemovalListener()).build();
+        LOG.debug("ConnectionAdapter created");
     }
-    
+
     /**
      * @param channel the channel to be set - used for communication
      */
@@ -180,6 +181,11 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
         return sendToSwitchFuture(input, "packet-out-input sending failed");
     }
 
+    @Override
+    public Future<RpcResult<Void>> sendMultipartRequestMessage(SendMultipartRequestMessageInput input) {
+        return sendToSwitchFuture(input, "multi-part-request sending failed");
+    }
+
     @Override
     public Future<RpcResult<Void>> portMod(PortModInput input) {
         return sendToSwitchFuture(input, "port-mod-input sending failed");
@@ -234,9 +240,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     public void setMessageListener(OpenflowProtocolListener messageListener) {
         this.messageListener = messageListener;
     }
-    
+
     @Override
     public void consume(DataObject message) {
+        LOG.debug("ConsumeIntern msg");
         if (disconnectOccured ) {
             return;
         }
@@ -246,7 +253,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 systemListener.onDisconnectEvent((DisconnectEvent) message);
                 responseCache.invalidateAll();
                 disconnectOccured = true;
-            } 
+            } else if (message instanceof SwitchIdleEvent) {
+                systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
+            }
             // OpenFlow messages
               else if (message instanceof EchoRequestMessage) {
                 messageListener.onEchoRequestMessage((EchoRequestMessage) message);
@@ -272,10 +281,15 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
             }
         } else {
             if (message instanceof OfHeader) {
+                LOG.debug("OFheader msg received");
                 RpcResponseKey key = createRpcResponseKey((OfHeader) message);
-                SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
+                final SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
                 if (rpcFuture != null) {
-                    rpcFuture.set(Rpcs.getRpcResult(true, message, null));
+                    LOG.debug("corresponding rpcFuture found");
+                    List<RpcError> errors = Collections.emptyList();
+                    LOG.debug("before setting rpcFuture");
+                    rpcFuture.set(Rpcs.getRpcResult(true, message, errors));
+                    LOG.debug("after setting rpcFuture");
                     responseCache.invalidate(key);
                 } else {
                     LOG.warn("received unexpected rpc response: "+key);
@@ -289,45 +303,53 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     /**
      * sends given message to switch, sending result will be reported via return value
      * @param input message to send
-     * @param failureInfo describes, what type of message caused failure by sending 
+     * @param failureInfo describes, what type of message caused failure by sending
      * @return future object, <ul>
-     *  <li>if send successful, {@link RpcResult} without errors and successful 
+     *  <li>if send successful, {@link RpcResult} without errors and successful
      *  status will be returned, </li>
      *  <li>else {@link RpcResult} will contain errors and failed status</li>
-     *  </ul>    
+     *  </ul>
      */
     private SettableFuture<RpcResult<Void>> sendToSwitchFuture(
             DataObject input, final String failureInfo) {
+        LOG.debug("going to flush");
         ChannelFuture resultFuture = channel.writeAndFlush(input);
-        
+        LOG.debug("flushed");
+
         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
         String errorMessage = "check switch connection";
         return handleRpcChannelFuture(resultFuture, failureInfo, errorSeverity, errorMessage);
     }
-    
+
     /**
      * sends given message to switch, sending result or switch response will be reported via return value
      * @param input message to send
      * @param responseClazz type of response
-     * @param failureInfo describes, what type of message caused failure by sending 
+     * @param failureInfo describes, what type of message caused failure by sending
      * @return future object, <ul>
      *  <li>if send fails, {@link RpcResult} will contain errors and failed status </li>
-     *  <li>else {@link RpcResult} will be stored in responseCache and wait for particular timeout 
-     *  ({@link ConnectionAdapterImpl#RPC_RESPONSE_EXPIRATION}), 
+     *  <li>else {@link RpcResult} will be stored in responseCache and wait for particular timeout
+     *  ({@link ConnectionAdapterImpl#RPC_RESPONSE_EXPIRATION}),
      *  <ul><li>either switch will manage to answer
      *  and then corresponding response message will be set into returned future</li>
      *  <li>or response in cache will expire and returned future will be cancelled</li></ul>
      *  </li>
-     *  </ul>     
+     *  </ul>
      */
     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> sendToSwitchExpectRpcResultFuture(
             IN input, Class<OUT> responseClazz, final String failureInfo) {
+        LOG.debug("going to flush");
+        SettableFuture<RpcResult<OUT>> rpcResult = SettableFuture.create();
+        RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz);
+        responseCache.put(key, rpcResult);
         ChannelFuture resultFuture = channel.writeAndFlush(input);
-        
+        LOG.debug("flushed");
+
         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
         String errorMessage = "check switch connection";
-        return handleRpcChannelFutureWithResponse(resultFuture, failureInfo, errorSeverity, 
-                errorMessage, input, responseClazz);
+
+        return handleRpcChannelFutureWithResponse(resultFuture, failureInfo, errorSeverity,
+                errorMessage, input, responseClazz, rpcResult, key);
     }
 
     /**
@@ -336,73 +358,78 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
      * @return
      */
     private SettableFuture<RpcResult<Void>> handleRpcChannelFuture(
-            ChannelFuture resultFuture, final String failureInfo, 
+            ChannelFuture resultFuture, final String failureInfo,
             final ErrorSeverity errorSeverity, final String errorMessage) {
-        
+
         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
-        
+        LOG.debug("handlerpcchannelfuture");
         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
-            
+
             @Override
             public void operationComplete(
                     io.netty.util.concurrent.Future<? super Void> future)
                     throws Exception {
-                Collection<RpcError> errors = null;
-                
+                LOG.debug("operation complete");
+                Collection<RpcError> errors = Collections.emptyList();
+
                 if (future.cause() != null) {
-                    RpcError rpcError = buildRpcError(failureInfo, 
+                    LOG.debug("future.cause != null");
+                    RpcError rpcError = buildRpcError(failureInfo,
                             errorSeverity, errorMessage, future.cause());
                     errors = Lists.newArrayList(rpcError);
                 }
-                
+
                 rpcResult.set(Rpcs.getRpcResult(
-                        future.isSuccess(), 
-                        (Void) null, 
+                        future.isSuccess(),
+                        (Void) null,
                         errors)
                 );
             }
         });
         return rpcResult;
     }
-    
+
     /**
-     * @param input
-     * @param responseClazz
      * @param resultFuture
      * @param failureInfo
      * @param errorSeverity
      * @param errorMessage
+     * @param input
+     * @param responseClazz
+     * @param key of rpcResponse
      * @return
      */
     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> handleRpcChannelFutureWithResponse(
             ChannelFuture resultFuture, final String failureInfo,
             final ErrorSeverity errorSeverity, final String errorMessage,
-            final IN input, Class<OUT> responseClazz) {
-        final SettableFuture<RpcResult<OUT>> rpcResult = SettableFuture.create();
-        
+            final IN input, Class<OUT> responseClazz, final SettableFuture<RpcResult<OUT>> rpcResult, final RpcResponseKey key) {
+        LOG.debug("handleRpcchanfuture with response");
+
         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
-            
+
             @Override
             public void operationComplete(
                     io.netty.util.concurrent.Future<? super Void> future)
                     throws Exception {
-                
+
+                LOG.debug("operation complete");
+                Collection<RpcError> errors = Collections.emptyList();
                 if (future.cause() != null) {
-                    Collection<RpcError> errors = null;
-                    RpcError rpcError = buildRpcError(failureInfo, 
+                    LOG.debug("ChannelFuture.cause != null");
+                    RpcError rpcError = buildRpcError(failureInfo,
                             errorSeverity, errorMessage, future.cause());
                     errors = Lists.newArrayList(rpcError);
                     rpcResult.set(Rpcs.getRpcResult(
-                            future.isSuccess(), 
-                            (OUT) null, 
+                            future.isSuccess(),
+                            (OUT) null,
                             errors)
                             );
+                    responseCache.invalidate(key);
                 } else {
-                    RpcResponseKey key = new RpcResponseKey(input.getXid(), input.getClass().toString());
-                    if (responseCache.getIfPresent(key) != null) {
-                        responseCache.invalidate(key);
+                    LOG.debug("ChannelFuture.cause == null");
+                    if (responseCache.getIfPresent(key) == null) {
+                       LOG.debug("responcache: key wasn't present");
                     }
-                    responseCache.put(key, rpcResult);
                 }
             }
         });
@@ -412,24 +439,26 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     /**
      * @param resultFuture
      * @param failureInfo
-     * @param errorSeverity 
-     * @param message 
+     * @param errorSeverity
+     * @param message
      * @return
      */
     private static SettableFuture<Boolean> handleTransportChannelFuture(
-            ChannelFuture resultFuture, final String failureInfo, 
+            ChannelFuture resultFuture, final String failureInfo,
             final ErrorSeverity errorSeverity, final String message) {
-        
+
         final SettableFuture<Boolean> transportResult = SettableFuture.create();
-        
+
         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
-            
+
             @Override
             public void operationComplete(
                     io.netty.util.concurrent.Future<? super Void> future)
                     throws Exception {
                 transportResult.set(future.isSuccess());
-                transportResult.setException(future.cause());
+                if (!future.isSuccess()) {
+                    transportResult.setException(future.cause());
+                }
             }
         });
         return transportResult;
@@ -439,20 +468,20 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
      * @param cause
      * @return
      */
-    protected RpcError buildRpcError(String info, ErrorSeverity severity, String message, 
+    protected RpcError buildRpcError(String info, ErrorSeverity severity, String message,
             Throwable cause) {
-        RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message, 
+        RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message,
                 ErrorType.RPC, cause);
         return error;
     }
-    
+
     /**
      * @param cause
      * @return
      */
-    protected RpcError buildTransportError(String info, ErrorSeverity severity, String message, 
+    protected RpcError buildTransportError(String info, ErrorSeverity severity, String message,
             Throwable cause) {
-        RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message, 
+        RpcError error = RpcErrors.getRpcError(APPLICATION_TAG, TAG, info, severity, message,
                 ErrorType.TRANSPORT, cause);
         return error;
     }
@@ -462,7 +491,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
      * @return
      */
     private static RpcResponseKey createRpcResponseKey(OfHeader message) {
-        return new RpcResponseKey(message.getXid(), message.getClass().toString());
+        return new RpcResponseKey(message.getXid(), message.getClass());
     }
 
     /**
@@ -477,7 +506,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     public void setSystemListener(SystemNotificationsListener systemListener) {
         this.systemListener = systemListener;
     }
-    
+
     @Override
     public void checkListeners() {
         StringBuffer buffer =  new StringBuffer();
@@ -487,10 +516,53 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
         if (messageListener == null) {
             buffer.append("MessageListener ");
         }
-        
+        if (connectionReadyListener == null) {
+            buffer.append("ConnectionReadyListener ");
+        }
+
         if (buffer.length() > 0) {
             throw new IllegalStateException("Missing listeners: " + buffer.toString());
         }
     }
 
+    static class ResponseRemovalListener implements RemovalListener<RpcResponseKey, SettableFuture<?>> {
+        @Override
+        public void onRemoval(
+                RemovalNotification<RpcResponseKey, SettableFuture<?>> notification) {
+            SettableFuture<?> future = notification.getValue();
+            if (!future.isDone()) {
+                LOG.warn("rpc response discarded: " + notification.getKey());
+                future.cancel(true);
+            }
+        }
+    }
+
+    /**
+     * Class is used ONLY for exiting msgQueue processing thread
+     * @author michal.polkorab
+     */
+    static class ExitingDataObject implements DataObject {
+        @Override
+        public Class<? extends DataContainer> getImplementedInterface() {
+            return null;
+        }
+    }
+
+    @Override
+    public void fireConnectionReadyNotification() {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                connectionReadyListener.onConnectionReady();
+            }
+        }).start();
+    }
+
+
+    @Override
+    public void setConnectionReadyListener(
+            ConnectionReadyListener connectionReadyListener) {
+        this.connectionReadyListener = connectionReadyListener;
+    }
+
 }