X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FConnectionAdapterImpl.java;h=91f4ceb1f9dd4854c95b4f2b34c359f16c019731;hb=ccb7e5bda0598185f98d52ddd16e49ae4d48e5aa;hp=e35c6064a9536defefd2507e247121e571943701;hpb=18eeba4cf7aa2cb2a3156ad0560ba44d03cd381e;p=openflowjava.git diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java index e35c6064..91f4ceb1 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java @@ -9,16 +9,24 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalCause; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.GenericFutureListener; - import java.net.InetSocketAddress; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; - import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowjava.statistics.CounterEventTypes; import org.opendaylight.openflowjava.statistics.StatisticsCounters; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; @@ -66,15 +74,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalCause; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; - /** * Handles messages (notifications + rpcs) and connections * @author mirehak @@ -95,8 +94,6 @@ public class ConnectionAdapterImpl implements ConnectionFacade { private static final Exception QUEUE_FULL_EXCEPTION = new RejectedExecutionException("Output queue is full"); - private static final String APPLICATION_TAG = "OPENFLOW_LIBRARY"; - private static final String TAG = "OPENFLOW"; private static final RemovalListener> REMOVAL_LISTENER = new RemovalListener>() { @Override @@ -117,8 +114,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade { private ConnectionReadyListener connectionReadyListener; private OpenflowProtocolListener messageListener; private SystemNotificationsListener systemListener; + private OutboundQueueManager outputManager; private boolean disconnectOccured = false; - private StatisticsCounters statisticsCounters; + private final StatisticsCounters statisticsCounters; + private final InetSocketAddress address; /** * default ctor @@ -133,6 +132,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { .removalListener(REMOVAL_LISTENER).build(); this.channel = Preconditions.checkNotNull(channel); this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address); + this.address = address; channel.pipeline().addLast(output); statisticsCounters = StatisticsCounters.getInstance(); LOG.debug("ConnectionAdapter created"); @@ -162,6 +162,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { @Override public Future> flowMod(final FlowModInput input) { + statisticsCounters.incrementCounter(CounterEventTypes.DS_FLOW_MODS_ENTERED); return sendToSwitchFuture(input, "flow-mod sending failed"); } @@ -264,7 +265,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade { @Override public void consume(final DataObject message) { - LOG.debug("ConsumeIntern msg"); + LOG.debug("ConsumeIntern msg on {}", channel); if (disconnectOccured ) { return; } @@ -282,9 +283,15 @@ public class ConnectionAdapterImpl implements ConnectionFacade { messageListener.onEchoRequestMessage((EchoRequestMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof ErrorMessage) { - messageListener.onErrorMessage((ErrorMessage) message); + // Send only unmatched errors + if (outputManager == null || !outputManager.onMessage((OfHeader) message)) { + messageListener.onErrorMessage((ErrorMessage) message); + } statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof ExperimenterMessage) { + if (outputManager != null) { + outputManager.onMessage((OfHeader) message); + } messageListener.onExperimenterMessage((ExperimenterMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof FlowRemovedMessage) { @@ -295,6 +302,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade { messageListener.onHelloMessage((HelloMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof MultipartReplyMessage) { + if (outputManager != null) { + outputManager.onMessage((OfHeader) message); + } messageListener.onMultipartReplyMessage((MultipartReplyMessage) message); statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS); } else if (message instanceof PacketInMessage) { @@ -306,9 +316,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } else { LOG.warn("message listening not supported for type: {}", message.getClass()); } - } else { - if (message instanceof OfHeader) { - LOG.debug("OFheader msg received"); + } else if (message instanceof OfHeader) { + LOG.debug("OFheader msg received"); + + if (outputManager == null || !outputManager.onMessage((OfHeader) message)) { RpcResponseKey key = createRpcResponseKey((OfHeader) message); final ResponseExpectedRpcListener listener = findRpcResponse(key); if (listener != null) { @@ -320,9 +331,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade { } else { LOG.warn("received unexpected rpc response: {}", key); } - } else { - LOG.warn("message listening not supported for type: {}", message.getClass()); } + } else { + LOG.warn("message listening not supported for type: {}", message.getClass()); } } @@ -467,7 +478,40 @@ public class ConnectionAdapterImpl implements ConnectionFacade { * Used only for testing purposes * @param cache */ - public void setResponseCache(Cache> cache) { + public void setResponseCache(final Cache> cache) { this.responseCache = cache; } + + @Override + public boolean isAutoRead() { + return channel.config().isAutoRead(); + } + + @Override + public void setAutoRead(final boolean autoRead) { + channel.config().setAutoRead(autoRead); + } + + @Override + public OutboundQueueHandlerRegistration registerOutboundQueueHandler( + final T handler, final int maxQueueDepth, final long maxBarrierNanos) { + Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager); + + final OutboundQueueManager ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos); + outputManager = ret; + channel.pipeline().addLast(outputManager); + + return new OutboundQueueHandlerRegistrationImpl(handler) { + @Override + protected void removeRegistration() { + outputManager.close(); + channel.pipeline().remove(outputManager); + outputManager = null; + } + }; + } + + Channel getChannel() { + return channel; + } }