package org.opendaylight.openflowjava.protocol.impl.core.connection;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
-
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
+import org.opendaylight.openflowjava.protocol.impl.core.OFVersionDetector;
+import org.opendaylight.openflowjava.protocol.impl.core.PipelineHandlers;
import org.opendaylight.openflowjava.statistics.CounterEventTypes;
import org.opendaylight.openflowjava.statistics.StatisticsCounters;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
/**
* Handles messages (notifications + rpcs) and connections
* @author mirehak
private static final Exception QUEUE_FULL_EXCEPTION =
new RejectedExecutionException("Output queue is full");
- private static final String APPLICATION_TAG = "OPENFLOW_LIBRARY";
- private static final String TAG = "OPENFLOW";
private static final RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>> REMOVAL_LISTENER =
new RemovalListener<RpcResponseKey, ResponseExpectedRpcListener<?>>() {
@Override
private ConnectionReadyListener connectionReadyListener;
private OpenflowProtocolListener messageListener;
private SystemNotificationsListener systemListener;
+ private OutboundQueueManager<?> outputManager;
private boolean disconnectOccured = false;
- private StatisticsCounters statisticsCounters;
+ private final StatisticsCounters statisticsCounters;
+ private OFVersionDetector versionDetector;
+ private final InetSocketAddress address;
+
+ private final boolean useBarrier;
/**
* default ctor
* @param address client address (used only in case of UDP communication,
* as there is no need to store address over tcp (stable channel))
*/
- public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address) {
+ public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier) {
+ this.channel = Preconditions.checkNotNull(channel);
+ this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address);
+ this.address = address;
+
responseCache = CacheBuilder.newBuilder()
.concurrencyLevel(1)
.expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
.removalListener(REMOVAL_LISTENER).build();
- this.channel = Preconditions.checkNotNull(channel);
- this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address);
+
+ this.useBarrier = useBarrier;
channel.pipeline().addLast(output);
statisticsCounters = StatisticsCounters.getInstance();
+
LOG.debug("ConnectionAdapter created");
}
@Override
public Future<RpcResult<Void>> flowMod(final FlowModInput input) {
+ statisticsCounters.incrementCounter(CounterEventTypes.DS_FLOW_MODS_ENTERED);
return sendToSwitchFuture(input, "flow-mod sending failed");
}
@Override
public Future<Boolean> disconnect() {
- ChannelFuture disconnectResult = channel.disconnect();
+ final ChannelFuture disconnectResult = channel.disconnect();
responseCache.invalidateAll();
disconnectOccured = true;
@Override
public void consume(final DataObject message) {
- LOG.debug("ConsumeIntern msg");
+ LOG.debug("ConsumeIntern msg on {}", channel);
if (disconnectOccured ) {
return;
}
systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
// OpenFlow messages
} else if (message instanceof EchoRequestMessage) {
- messageListener.onEchoRequestMessage((EchoRequestMessage) message);
+ if (outputManager != null) {
+ outputManager.onEchoRequest((EchoRequestMessage) message);
+ } else {
+ messageListener.onEchoRequestMessage((EchoRequestMessage) message);
+ }
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
} else if (message instanceof ErrorMessage) {
- messageListener.onErrorMessage((ErrorMessage) message);
+ // Send only unmatched errors
+ if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
+ messageListener.onErrorMessage((ErrorMessage) message);
+ }
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
} else if (message instanceof ExperimenterMessage) {
+ if (outputManager != null) {
+ outputManager.onMessage((OfHeader) message);
+ }
messageListener.onExperimenterMessage((ExperimenterMessage) message);
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
} else if (message instanceof FlowRemovedMessage) {
messageListener.onHelloMessage((HelloMessage) message);
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
} else if (message instanceof MultipartReplyMessage) {
+ if (outputManager != null) {
+ outputManager.onMessage((OfHeader) message);
+ }
messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
} else if (message instanceof PacketInMessage) {
} else {
LOG.warn("message listening not supported for type: {}", message.getClass());
}
- } else {
- if (message instanceof OfHeader) {
- LOG.debug("OFheader msg received");
- RpcResponseKey key = createRpcResponseKey((OfHeader) message);
+ } else if (message instanceof OfHeader) {
+ LOG.debug("OFheader msg received");
+
+ if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
+ final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
if (listener != null) {
LOG.debug("corresponding rpcFuture found");
} else {
LOG.warn("received unexpected rpc response: {}", key);
}
- } else {
- LOG.warn("message listening not supported for type: {}", message.getClass());
}
+ } else {
+ LOG.warn("message listening not supported for type: {}", message.getClass());
}
}
@Override
public void fireConnectionReadyNotification() {
+ versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name());
+ Preconditions.checkState(versionDetector != null);
+
new Thread(new Runnable() {
@Override
public void run() {
* Used only for testing purposes
* @param cache
*/
- public void setResponseCache(Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache) {
+ public void setResponseCache(final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache) {
this.responseCache = cache;
}
+
+ @Override
+ public boolean isAutoRead() {
+ return channel.config().isAutoRead();
+ }
+
+ @Override
+ public void setAutoRead(final boolean autoRead) {
+ channel.config().setAutoRead(autoRead);
+ }
+
+ @Override
+ public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(
+ final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
+ Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
+
+ if (useBarrier) {
+
+ }
+
+ final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
+ outputManager = ret;
+ channel.pipeline().addLast(outputManager);
+
+ return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
+ @Override
+ protected void removeRegistration() {
+ outputManager.close();
+ channel.pipeline().remove(outputManager);
+ outputManager = null;
+ }
+ };
+ }
+
+ Channel getChannel() {
+ return channel;
+ }
+
+ @Override
+ public void setPacketInFiltering(final boolean enabled) {
+ versionDetector.setFilterPacketIns(enabled);
+ LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis");
+ }
}