Added support for OF 1.0
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / connection / ConnectionAdapterImpl.java
index 5f670132de9812d8ae5d1c1beb34cb03a77085bb..499e76e570b2f07b2564ed4c65d89d78604c0600 100644 (file)
@@ -1,10 +1,4 @@
-/**
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
+/* Copyright (C)2013 Pantheon Technologies, s.r.o. All rights reserved. */
 
 package org.opendaylight.openflowjava.protocol.impl.connection;
 
@@ -13,12 +7,14 @@ import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.sal.common.util.RpcErrors;
 import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
@@ -55,6 +51,10 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -73,9 +73,9 @@ import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * @author mirehak
- *
+ * @author michal.polkorab
  */
-public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer {
+public class ConnectionAdapterImpl implements ConnectionFacade {
     
     /** after this time, rpc future response objects will be thrown away (in minutes) */
     public static final int RPC_RESPONSE_EXPIRATION = 1;
@@ -89,8 +89,11 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
     private OpenflowProtocolListener messageListener;
     /** expiring cache for future rpcResponses */
     protected Cache<RpcResponseKey, SettableFuture<?>> responseCache;
-    
-    
+    private SystemNotificationsListener systemListener;
+    private boolean disconnectOccured = false;
+
+    protected ConnectionReadyListener connectionReadyListener;
+
     /**
      * default ctor 
      */
@@ -98,19 +101,12 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
         responseCache = CacheBuilder.newBuilder()
                 .concurrencyLevel(1)
                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
-                .removalListener(new RemovalListener<RpcResponseKey, SettableFuture<?>>() {
-
-                    @Override
-                    public void onRemoval(
-                            RemovalNotification<RpcResponseKey, SettableFuture<?>> notification) {
-                        LOG.warn("rpc response discarded: "+notification.getKey());
-                        notification.getValue().cancel(true);
-                    }
-                }).build();
+                .removalListener(new ResponseRemovalListener()).build();
+        LOG.debug("ConnectionAdapter created");
     }
     
     /**
-     * @param channel the channel to set
+     * @param channel the channel to be set - used for communication
      */
     public void setChannel(Channel channel) {
         this.channel = channel;
@@ -219,7 +215,9 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
     @Override
     public Future<Boolean> disconnect() {
         ChannelFuture disconnectResult = channel.disconnect();
-        
+        responseCache.invalidateAll();
+        disconnectOccured = true;
+
         String failureInfo = "switch disconnecting failed";
         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
         String message = "Check the switch connection";
@@ -238,8 +236,21 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
     
     @Override
     public void consume(DataObject message) {
+        LOG.debug("ConsumeIntern msg");
+        if (disconnectOccured ) {
+            return;
+        }
         if (message instanceof Notification) {
-            if (message instanceof EchoRequestMessage) {
+            // System events
+            if (message instanceof DisconnectEvent) {
+                systemListener.onDisconnectEvent((DisconnectEvent) message);
+                responseCache.invalidateAll();
+                disconnectOccured = true;
+            } else if (message instanceof SwitchIdleEvent) {
+                systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
+            }
+            // OpenFlow messages
+              else if (message instanceof EchoRequestMessage) {
                 messageListener.onEchoRequestMessage((EchoRequestMessage) message);
             } else if (message instanceof ErrorMessage) {
                 messageListener.onErrorMessage((ErrorMessage) message);
@@ -248,6 +259,7 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
             } else if (message instanceof FlowRemovedMessage) {
                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
             } else if (message instanceof HelloMessage) {
+                LOG.info("Hello received / branch");
                 messageListener.onHelloMessage((HelloMessage) message);
             } else if (message instanceof MultipartReplyMessage) {
                 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
@@ -262,15 +274,19 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
             }
         } else {
             if (message instanceof OfHeader) {
+                LOG.debug("OFheader msg received");
                 RpcResponseKey key = createRpcResponseKey((OfHeader) message);
-                SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
+                final SettableFuture<RpcResult<?>> rpcFuture = findRpcResponse(key);
                 if (rpcFuture != null) {
-                    rpcFuture.set(Rpcs.getRpcResult(true, message, null));
+                    LOG.debug("corresponding rpcFuture found");
+                    List<RpcError> errors = Collections.emptyList();
+                    LOG.debug("before setting rpcFuture");
+                    rpcFuture.set(Rpcs.getRpcResult(true, message, errors));
+                    LOG.debug("after setting rpcFuture");
                     responseCache.invalidate(key);
                 } else {
                     LOG.warn("received unexpected rpc response: "+key);
                 }
-                
             } else {
                 LOG.warn("message listening not supported for type: "+message.getClass());
             }
@@ -289,7 +305,9 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
      */
     private SettableFuture<RpcResult<Void>> sendToSwitchFuture(
             DataObject input, final String failureInfo) {
+        LOG.debug("going to flush");
         ChannelFuture resultFuture = channel.writeAndFlush(input);
+        LOG.debug("flushed");
         
         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
         String errorMessage = "check switch connection";
@@ -313,12 +331,18 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
      */
     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> sendToSwitchExpectRpcResultFuture(
             IN input, Class<OUT> responseClazz, final String failureInfo) {
+        LOG.debug("going to flush");
+        SettableFuture<RpcResult<OUT>> rpcResult = SettableFuture.create();
+        RpcResponseKey key = new RpcResponseKey(input.getXid(), responseClazz);
+        responseCache.put(key, rpcResult);
         ChannelFuture resultFuture = channel.writeAndFlush(input);
+        LOG.debug("flushed");
         
         ErrorSeverity errorSeverity = ErrorSeverity.ERROR;
         String errorMessage = "check switch connection";
+        
         return handleRpcChannelFutureWithResponse(resultFuture, failureInfo, errorSeverity, 
-                errorMessage, input, responseClazz);
+                errorMessage, input, responseClazz, rpcResult, key);
     }
 
     /**
@@ -331,16 +355,18 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
             final ErrorSeverity errorSeverity, final String errorMessage) {
         
         final SettableFuture<RpcResult<Void>> rpcResult = SettableFuture.create();
-        
+        LOG.debug("handlerpcchannelfuture");
         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
             
             @Override
             public void operationComplete(
                     io.netty.util.concurrent.Future<? super Void> future)
                     throws Exception {
-                Collection<RpcError> errors = null;
+                LOG.debug("operation complete");
+                Collection<RpcError> errors = Collections.emptyList();
                 
                 if (future.cause() != null) {
+                    LOG.debug("future.cause != null");
                     RpcError rpcError = buildRpcError(failureInfo, 
                             errorSeverity, errorMessage, future.cause());
                     errors = Lists.newArrayList(rpcError);
@@ -357,19 +383,20 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
     }
     
     /**
-     * @param input
-     * @param responseClazz
      * @param resultFuture
      * @param failureInfo
      * @param errorSeverity
      * @param errorMessage
+     * @param input
+     * @param responseClazz
+     * @param key of rpcResponse
      * @return
      */
     private <IN extends OfHeader, OUT extends OfHeader> SettableFuture<RpcResult<OUT>> handleRpcChannelFutureWithResponse(
             ChannelFuture resultFuture, final String failureInfo,
             final ErrorSeverity errorSeverity, final String errorMessage,
-            final IN input, Class<OUT> responseClazz) {
-        final SettableFuture<RpcResult<OUT>> rpcResult = SettableFuture.create();
+            final IN input, Class<OUT> responseClazz, final SettableFuture<RpcResult<OUT>> rpcResult, final RpcResponseKey key) {
+        LOG.debug("handleRpcchanfuture with response");
         
         resultFuture.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
             
@@ -378,8 +405,10 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
                     io.netty.util.concurrent.Future<? super Void> future)
                     throws Exception {
                 
+                LOG.debug("operation complete");
+                Collection<RpcError> errors = Collections.emptyList();
                 if (future.cause() != null) {
-                    Collection<RpcError> errors = null;
+                    LOG.debug("ChannelFuture.cause != null");
                     RpcError rpcError = buildRpcError(failureInfo, 
                             errorSeverity, errorMessage, future.cause());
                     errors = Lists.newArrayList(rpcError);
@@ -388,12 +417,12 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
                             (OUT) null, 
                             errors)
                             );
+                    responseCache.invalidate(key);
                 } else {
-                    RpcResponseKey key = new RpcResponseKey(input.getXid(), input.getClass().toString());
-                    if (responseCache.getIfPresent(key) != null) {
-                        responseCache.invalidate(key);
+                    LOG.debug("ChannelFuture.cause == null");
+                    if (responseCache.getIfPresent(key) == null) {
+                       LOG.debug("responcache: key wasn't present");
                     }
-                    responseCache.put(key, rpcResult);
                 }
             }
         });
@@ -420,7 +449,9 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
                     io.netty.util.concurrent.Future<? super Void> future)
                     throws Exception {
                 transportResult.set(future.isSuccess());
-                transportResult.setException(future.cause());
+                if (!future.isSuccess()) {
+                    transportResult.setException(future.cause());
+                }
             }
         });
         return transportResult;
@@ -453,7 +484,7 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
      * @return
      */
     private static RpcResponseKey createRpcResponseKey(OfHeader message) {
-        return new RpcResponseKey(message.getXid(), message.getClass().toString());
+        return new RpcResponseKey(message.getXid(), message.getClass());
     }
 
     /**
@@ -464,4 +495,67 @@ public class ConnectionAdapterImpl implements ConnectionAdapter, MessageConsumer
         return (SettableFuture<RpcResult<?>>) responseCache.getIfPresent(key);
     }
 
+    @Override
+    public void setSystemListener(SystemNotificationsListener systemListener) {
+        this.systemListener = systemListener;
+    }
+    
+    @Override
+    public void checkListeners() {
+        StringBuffer buffer =  new StringBuffer();
+        if (systemListener == null) {
+            buffer.append("SystemListener ");
+        }
+        if (messageListener == null) {
+            buffer.append("MessageListener ");
+        }
+        if (connectionReadyListener == null) {
+            buffer.append("ConnectionReadyListener ");
+        }
+        
+        if (buffer.length() > 0) {
+            throw new IllegalStateException("Missing listeners: " + buffer.toString());
+        }
+    }
+
+    static class ResponseRemovalListener implements RemovalListener<RpcResponseKey, SettableFuture<?>> {
+        @Override
+        public void onRemoval(
+                RemovalNotification<RpcResponseKey, SettableFuture<?>> notification) {
+            SettableFuture<?> future = notification.getValue();
+            if (!future.isDone()) {
+                LOG.warn("rpc response discarded: " + notification.getKey());
+                future.cancel(true);
+            }
+        }
+    }
+
+    /**
+     * Class is used ONLY for exiting msgQueue processing thread
+     * @author michal.polkorab
+     */
+    static class ExitingDataObject implements DataObject {
+        @Override
+        public Class<? extends DataContainer> getImplementedInterface() {
+            return null;
+        }
+    }
+    
+    @Override
+    public void fireConnectionReadyNotification() {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                connectionReadyListener.onConnectionReady();
+            }
+        }).start();
+    }
+    
+    
+    @Override
+    public void setConnectionReadyListener(
+            ConnectionReadyListener connectionReadyListener) {
+        this.connectionReadyListener = connectionReadyListener;
+    }
+    
 }