X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FConnectionAdapterImpl.java;h=91f4ceb1f9dd4854c95b4f2b34c359f16c019731;hb=ab55df73c5fc08aa365af5ed9c1b0b0395b9211e;hp=d81363c7464031cf9c0fb6f39f64860f82428264;hpb=f04d245dfbee2cc3e69db0ca17a5a819c5ef5b51;p=openflowjava.git diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java index d81363c7..91f4ceb1 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java @@ -9,16 +9,24 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalCause; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.GenericFutureListener; - import java.net.InetSocketAddress; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; - import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowjava.statistics.CounterEventTypes; import org.opendaylight.openflowjava.statistics.StatisticsCounters; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; @@ -66,15 +74,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalCause; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; - /** * Handles messages (notifications + rpcs) and connections * @author mirehak @@ -115,8 +114,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade { private ConnectionReadyListener connectionReadyListener; private OpenflowProtocolListener messageListener; private SystemNotificationsListener systemListener; + private OutboundQueueManager outputManager; private boolean disconnectOccured = false; - private StatisticsCounters statisticsCounters; + private final StatisticsCounters statisticsCounters; + private final InetSocketAddress address; /** * default ctor @@ -131,6 +132,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { .removalListener(REMOVAL_LISTENER).build(); this.channel = Preconditions.checkNotNull(channel); this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address); + this.address = address; channel.pipeline().addLast(output); statisticsCounters = StatisticsCounters.getInstance(); LOG.debug("ConnectionAdapter created"); @@ -263,7 +265,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { @Override public void consume(final DataObject message) { - LOG.debug("ConsumeIntern msg"); + LOG.debug("ConsumeIntern msg on {}", channel); if (disconnectOccured ) { return; } @@ -281,9 +283,15 @@ public class ConnectionAdapterImpl implements ConnectionFacade { messageListener.onEchoRequestMessage((EchoRequestMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof ErrorMessage) { - messageListener.onErrorMessage((ErrorMessage) message); + // Send only unmatched errors + if (outputManager == null || !outputManager.onMessage((OfHeader) message)) { + messageListener.onErrorMessage((ErrorMessage) message); + } statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof ExperimenterMessage) { + if (outputManager != null) { + outputManager.onMessage((OfHeader) message); + } messageListener.onExperimenterMessage((ExperimenterMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof FlowRemovedMessage) { @@ -294,6 +302,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade { messageListener.onHelloMessage((HelloMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof MultipartReplyMessage) { + if (outputManager != null) { + outputManager.onMessage((OfHeader) message); + } messageListener.onMultipartReplyMessage((MultipartReplyMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof PacketInMessage) { @@ -305,9 +316,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } else { LOG.warn("message listening not supported for type: {}", message.getClass()); } - } else { - if (message instanceof OfHeader) { - LOG.debug("OFheader msg received"); + } else if (message instanceof OfHeader) { + LOG.debug("OFheader msg received"); + + if (outputManager == null || !outputManager.onMessage((OfHeader) message)) { RpcResponseKey key = createRpcResponseKey((OfHeader) message); final ResponseExpectedRpcListener listener = findRpcResponse(key); if (listener != null) { @@ -319,9 +331,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } else { LOG.warn("received unexpected rpc response: {}", key); } - } else { - LOG.warn("message listening not supported for type: {}", message.getClass()); } + } else { + LOG.warn("message listening not supported for type: {}", message.getClass()); } } @@ -466,7 +478,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { * Used only for testing purposes * @param cache */ - public void setResponseCache(Cache> cache) { + public void setResponseCache(final Cache> cache) { this.responseCache = cache; } @@ -474,9 +486,32 @@ public class ConnectionAdapterImpl implements ConnectionFacade { public boolean isAutoRead() { return channel.config().isAutoRead(); } - + @Override - public void setAutoRead(boolean autoRead) { + public void setAutoRead(final boolean autoRead) { channel.config().setAutoRead(autoRead); } + + @Override + public OutboundQueueHandlerRegistration registerOutboundQueueHandler( + final T handler, final int maxQueueDepth, final long maxBarrierNanos) { + Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager); + + final OutboundQueueManager ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos); + outputManager = ret; + channel.pipeline().addLast(outputManager); + + return new OutboundQueueHandlerRegistrationImpl(handler) { + @Override + protected void removeRegistration() { + outputManager.close(); + channel.pipeline().remove(outputManager); + outputManager = null; + } + }; + } + + Channel getChannel() { + return channel; + } }