Barrier turn on/off-add switcher value to Config-Subsystem
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ConnectionAdapterImpl.java
index 20e4c2a83c62b2be22deb1f4d9f52d59f9ded1db..00f3d89b89f68980fbf3f6d9aee40cb4a54882fc 100644 (file)
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
+import org.opendaylight.openflowjava.protocol.impl.core.OFVersionDetector;
+import org.opendaylight.openflowjava.protocol.impl.core.PipelineHandlers;
 import org.opendaylight.openflowjava.statistics.CounterEventTypes;
 import org.opendaylight.openflowjava.statistics.StatisticsCounters;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
@@ -117,24 +119,31 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     private OutboundQueueManager<?> outputManager;
     private boolean disconnectOccured = false;
     private final StatisticsCounters statisticsCounters;
+    private OFVersionDetector versionDetector;
     private final InetSocketAddress address;
 
+    private final boolean useBarrier;
+
     /**
      * default ctor
      * @param channel the channel to be set - used for communication
      * @param address client address (used only in case of UDP communication,
      *  as there is no need to store address over tcp (stable channel))
      */
-    public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address) {
+    public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier) {
+        this.channel = Preconditions.checkNotNull(channel);
+        this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address);
+        this.address = address;
+
         responseCache = CacheBuilder.newBuilder()
                 .concurrencyLevel(1)
                 .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
                 .removalListener(REMOVAL_LISTENER).build();
-        this.channel = Preconditions.checkNotNull(channel);
-        this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address);
-        this.address = address;
+
+        this.useBarrier = useBarrier;
         channel.pipeline().addLast(output);
         statisticsCounters = StatisticsCounters.getInstance();
+
         LOG.debug("ConnectionAdapter created");
     }
 
@@ -246,7 +255,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
 
     @Override
     public Future<Boolean> disconnect() {
-        ChannelFuture disconnectResult = channel.disconnect();
+        final ChannelFuture disconnectResult = channel.disconnect();
         responseCache.invalidateAll();
         disconnectOccured = true;
 
@@ -265,7 +274,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
 
     @Override
     public void consume(final DataObject message) {
-        LOG.debug("ConsumeIntern msg");
+        LOG.debug("ConsumeIntern msg on {}", channel);
         if (disconnectOccured ) {
             return;
         }
@@ -280,16 +289,23 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
                 // OpenFlow messages
             } else if (message instanceof EchoRequestMessage) {
-                messageListener.onEchoRequestMessage((EchoRequestMessage) message);
+                if (outputManager != null) {
+                    outputManager.onEchoRequest((EchoRequestMessage) message);
+                } else {
+                    messageListener.onEchoRequestMessage((EchoRequestMessage) message);
+                }
                 statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
             } else if (message instanceof ErrorMessage) {
-                messageListener.onErrorMessage((ErrorMessage) message);
+                // Send only unmatched errors
+                if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
+                    messageListener.onErrorMessage((ErrorMessage) message);
+                }
                 statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
             } else if (message instanceof ExperimenterMessage) {
-                messageListener.onExperimenterMessage((ExperimenterMessage) message);
                 if (outputManager != null) {
                     outputManager.onMessage((OfHeader) message);
                 }
+                messageListener.onExperimenterMessage((ExperimenterMessage) message);
                 statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
             } else if (message instanceof FlowRemovedMessage) {
                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
@@ -299,10 +315,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 messageListener.onHelloMessage((HelloMessage) message);
                 statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
             } else if (message instanceof MultipartReplyMessage) {
-                messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
                 if (outputManager != null) {
                     outputManager.onMessage((OfHeader) message);
                 }
+                messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
                 statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
             } else if (message instanceof PacketInMessage) {
                 messageListener.onPacketInMessage((PacketInMessage) message);
@@ -317,7 +333,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
             LOG.debug("OFheader msg received");
 
             if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
-                RpcResponseKey key = createRpcResponseKey((OfHeader) message);
+                final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
                 final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
                 if (listener != null) {
                     LOG.debug("corresponding rpcFuture found");
@@ -452,6 +468,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
 
     @Override
     public void fireConnectionReadyNotification() {
+        versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name());
+        Preconditions.checkState(versionDetector != null);
+
         new Thread(new Runnable() {
             @Override
             public void run() {
@@ -494,6 +513,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
             final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
         Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
 
+        if (useBarrier) {
+
+        }
+
         final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
         outputManager = ret;
         channel.pipeline().addLast(outputManager);
@@ -511,4 +534,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     Channel getChannel() {
         return channel;
     }
+
+    @Override
+    public void setPacketInFiltering(final boolean enabled) {
+        versionDetector.setFilterPacketIns(enabled);
+        LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis");
+    }
 }