X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FConnectionAdapterImpl.java;h=00f3d89b89f68980fbf3f6d9aee40cb4a54882fc;hb=07ac625cae363f78ea5dd48356f617544c73913d;hp=d81363c7464031cf9c0fb6f39f64860f82428264;hpb=f04d245dfbee2cc3e69db0ca17a5a819c5ef5b51;p=openflowjava.git diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java index d81363c7..00f3d89b 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java @@ -9,16 +9,26 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalCause; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.GenericFutureListener; - import java.net.InetSocketAddress; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; - import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; +import org.opendaylight.openflowjava.protocol.impl.core.OFVersionDetector; +import org.opendaylight.openflowjava.protocol.impl.core.PipelineHandlers; import org.opendaylight.openflowjava.statistics.CounterEventTypes; import org.opendaylight.openflowjava.statistics.StatisticsCounters; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; @@ -66,15 +76,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalCause; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; - /** * Handles messages (notifications + rpcs) and connections * @author mirehak @@ -115,8 +116,13 @@ public class ConnectionAdapterImpl implements ConnectionFacade { private ConnectionReadyListener connectionReadyListener; private OpenflowProtocolListener messageListener; private SystemNotificationsListener systemListener; + private OutboundQueueManager outputManager; private boolean disconnectOccured = false; - private StatisticsCounters statisticsCounters; + private final StatisticsCounters statisticsCounters; + private OFVersionDetector versionDetector; + private final InetSocketAddress address; + + private final boolean useBarrier; /** * default ctor @@ -124,15 +130,20 @@ public class ConnectionAdapterImpl implements ConnectionFacade { * @param address client address (used only in case of UDP communication, * as there is no need to store address over tcp (stable channel)) */ - public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address) { + public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier) { + this.channel = Preconditions.checkNotNull(channel); + this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address); + this.address = address; + responseCache = CacheBuilder.newBuilder() .concurrencyLevel(1) .expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES) .removalListener(REMOVAL_LISTENER).build(); - this.channel = Preconditions.checkNotNull(channel); - this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address); + + this.useBarrier = useBarrier; channel.pipeline().addLast(output); statisticsCounters = StatisticsCounters.getInstance(); + LOG.debug("ConnectionAdapter created"); } @@ -244,7 +255,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { @Override public Future disconnect() { - ChannelFuture disconnectResult = channel.disconnect(); + final ChannelFuture disconnectResult = channel.disconnect(); responseCache.invalidateAll(); disconnectOccured = true; @@ -263,7 +274,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { @Override public void consume(final DataObject message) { - LOG.debug("ConsumeIntern msg"); + LOG.debug("ConsumeIntern msg on {}", channel); if (disconnectOccured ) { return; } @@ -278,12 +289,22 @@ public class ConnectionAdapterImpl implements ConnectionFacade { systemListener.onSwitchIdleEvent((SwitchIdleEvent) message); // OpenFlow messages } else if (message instanceof EchoRequestMessage) { - messageListener.onEchoRequestMessage((EchoRequestMessage) message); + if (outputManager != null) { + outputManager.onEchoRequest((EchoRequestMessage) message); + } else { + messageListener.onEchoRequestMessage((EchoRequestMessage) message); + } statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof ErrorMessage) { - messageListener.onErrorMessage((ErrorMessage) message); + // Send only unmatched errors + if (outputManager == null || !outputManager.onMessage((OfHeader) message)) { + messageListener.onErrorMessage((ErrorMessage) message); + } statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof ExperimenterMessage) { + if (outputManager != null) { + outputManager.onMessage((OfHeader) message); + } messageListener.onExperimenterMessage((ExperimenterMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof FlowRemovedMessage) { @@ -294,6 +315,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade { messageListener.onHelloMessage((HelloMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof MultipartReplyMessage) { + if (outputManager != null) { + outputManager.onMessage((OfHeader) message); + } messageListener.onMultipartReplyMessage((MultipartReplyMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof PacketInMessage) { @@ -305,10 +329,11 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } else { LOG.warn("message listening not supported for type: {}", message.getClass()); } - } else { - if (message instanceof OfHeader) { - LOG.debug("OFheader msg received"); - RpcResponseKey key = createRpcResponseKey((OfHeader) message); + } else if (message instanceof OfHeader) { + LOG.debug("OFheader msg received"); + + if (outputManager == null || !outputManager.onMessage((OfHeader) message)) { + final RpcResponseKey key = createRpcResponseKey((OfHeader) message); final ResponseExpectedRpcListener listener = findRpcResponse(key); if (listener != null) { LOG.debug("corresponding rpcFuture found"); @@ -319,9 +344,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } else { LOG.warn("received unexpected rpc response: {}", key); } - } else { - LOG.warn("message listening not supported for type: {}", message.getClass()); } + } else { + LOG.warn("message listening not supported for type: {}", message.getClass()); } } @@ -443,6 +468,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade { @Override public void fireConnectionReadyNotification() { + versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name()); + Preconditions.checkState(versionDetector != null); + new Thread(new Runnable() { @Override public void run() { @@ -466,7 +494,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { * Used only for testing purposes * @param cache */ - public void setResponseCache(Cache> cache) { + public void setResponseCache(final Cache> cache) { this.responseCache = cache; } @@ -474,9 +502,42 @@ public class ConnectionAdapterImpl implements ConnectionFacade { public boolean isAutoRead() { return channel.config().isAutoRead(); } - + @Override - public void setAutoRead(boolean autoRead) { + public void setAutoRead(final boolean autoRead) { channel.config().setAutoRead(autoRead); } + + @Override + public OutboundQueueHandlerRegistration registerOutboundQueueHandler( + final T handler, final int maxQueueDepth, final long maxBarrierNanos) { + Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager); + + if (useBarrier) { + + } + + final OutboundQueueManager ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos); + outputManager = ret; + channel.pipeline().addLast(outputManager); + + return new OutboundQueueHandlerRegistrationImpl(handler) { + @Override + protected void removeRegistration() { + outputManager.close(); + channel.pipeline().remove(outputManager); + outputManager = null; + } + }; + } + + Channel getChannel() { + return channel; + } + + @Override + public void setPacketInFiltering(final boolean enabled) { + versionDetector.setFilterPacketIns(enabled); + LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis"); + } }