Bump to odlparent 2.0.0
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ConnectionAdapterImpl.java
index d22f2f0606033413a53e2930677b71478768739f..8d9c87466a337039f15719754a57ccbf392118c4 100644 (file)
@@ -36,7 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Handles messages (notifications + rpcs) and connections
+ * Handles messages (notifications + rpcs) and connections.
  * @author mirehak
  * @author michal.polkorab
  */
@@ -47,17 +47,16 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
     private ConnectionReadyListener connectionReadyListener;
     private OpenflowProtocolListener messageListener;
     private SystemNotificationsListener systemListener;
-    private OutboundQueueManager<?> outputManager;
+    private AbstractOutboundQueueManager<?, ?> outputManager;
     private OFVersionDetector versionDetector;
 
     private final boolean useBarrier;
 
     /**
-     * default ctor
-     *
+     * Default constructor.
      * @param channel the channel to be set - used for communication
      * @param address client address (used only in case of UDP communication,
-     *            as there is no need to store address over tcp (stable channel))
+     *                as there is no need to store address over tcp (stable channel))
      * @param useBarrier value is configurable by configSubsytem
      */
     public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier) {
@@ -84,7 +83,7 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
     @Override
     public void consumeDeviceMessage(final DataObject message) {
         LOG.debug("ConsumeIntern msg on {}", channel);
-        if (disconnectOccured ) {
+        if (disconnectOccured) {
             return;
         }
         if (message instanceof Notification) {
@@ -96,7 +95,7 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
                 disconnectOccured = true;
             } else if (message instanceof SwitchIdleEvent) {
                 systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
-                // OpenFlow messages
+            // OpenFlow messages
             } else if (message instanceof EchoRequestMessage) {
                 if (outputManager != null) {
                     outputManager.onEchoRequest((EchoRequestMessage) message);
@@ -116,7 +115,7 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
             } else if (message instanceof FlowRemovedMessage) {
                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
             } else if (message instanceof HelloMessage) {
-                LOG.info("Hello received / branch");
+                LOG.info("Hello received");
                 messageListener.onHelloMessage((HelloMessage) message);
             } else if (message instanceof MultipartReplyMessage) {
                 if (outputManager != null) {
@@ -131,15 +130,15 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
                 LOG.warn("message listening not supported for type: {}", message.getClass());
             }
         } else if (message instanceof OfHeader) {
-            LOG.debug("OFheader msg received");
+            LOG.debug("OF header msg received");
 
             if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
                 final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
                 final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
                 if (listener != null) {
-                    LOG.debug("corresponding rpcFuture found");
+                    LOG.debug("Corresponding rpcFuture found");
                     listener.completed((OfHeader)message);
-                    LOG.debug("after setting rpcFuture");
+                    LOG.debug("After setting rpcFuture");
                     responseCache.invalidate(key);
                 } else {
                     LOG.warn("received unexpected rpc response: {}", key);
@@ -150,10 +149,6 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
         }
     }
 
-    /**
-     * @param message
-     * @return
-     */
     private static RpcResponseKey createRpcResponseKey(final OfHeader message) {
         return new RpcResponseKey(message.getXid(), message.getImplementedInterface().getName());
     }
@@ -192,15 +187,22 @@ public class ConnectionAdapterImpl extends AbstractConnectionAdapterStatistics i
             final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
         Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
 
+        final AbstractOutboundQueueManager<T, ?> ret;
         if (useBarrier) {
-
+            ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
+        } else {
+            LOG.warn("OutboundQueueManager without barrier is started.");
+            ret = new OutboundQueueManagerNoBarrier<>(this, address, handler);
         }
 
-        final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
         outputManager = ret;
         /* we don't need it anymore */
         channel.pipeline().remove(output);
-        channel.pipeline().addLast(outputManager);
+        // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
+        // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
+        // cause problems because we are shutting down the queue before Openflowplugin knows about it.
+        channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
+                PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
 
         return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
             @Override