import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
+import org.opendaylight.openflowjava.protocol.impl.core.OFVersionDetector;
+import org.opendaylight.openflowjava.protocol.impl.core.PipelineHandlers;
import org.opendaylight.openflowjava.statistics.CounterEventTypes;
import org.opendaylight.openflowjava.statistics.StatisticsCounters;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
private OutboundQueueManager<?> outputManager;
private boolean disconnectOccured = false;
private final StatisticsCounters statisticsCounters;
+ private OFVersionDetector versionDetector;
private final InetSocketAddress address;
+ private final boolean useBarrier;
+
/**
* default ctor
* @param channel the channel to be set - used for communication
* @param address client address (used only in case of UDP communication,
* as there is no need to store address over tcp (stable channel))
*/
- public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address) {
+ public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier) {
+ this.channel = Preconditions.checkNotNull(channel);
+ this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address);
+ this.address = address;
+
responseCache = CacheBuilder.newBuilder()
.concurrencyLevel(1)
.expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
.removalListener(REMOVAL_LISTENER).build();
- this.channel = Preconditions.checkNotNull(channel);
- this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address);
- this.address = address;
+
+ this.useBarrier = useBarrier;
channel.pipeline().addLast(output);
statisticsCounters = StatisticsCounters.getInstance();
+
LOG.debug("ConnectionAdapter created");
}
@Override
public Future<Boolean> disconnect() {
- ChannelFuture disconnectResult = channel.disconnect();
+ final ChannelFuture disconnectResult = channel.disconnect();
responseCache.invalidateAll();
disconnectOccured = true;
LOG.debug("OFheader msg received");
if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
- RpcResponseKey key = createRpcResponseKey((OfHeader) message);
+ final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
if (listener != null) {
LOG.debug("corresponding rpcFuture found");
@Override
public void fireConnectionReadyNotification() {
+ versionDetector = (OFVersionDetector) channel.pipeline().get(PipelineHandlers.OF_VERSION_DETECTOR.name());
+ Preconditions.checkState(versionDetector != null);
+
new Thread(new Runnable() {
@Override
public void run() {
final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
+ if (useBarrier) {
+
+ }
+
final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
outputManager = ret;
channel.pipeline().addLast(outputManager);
Channel getChannel() {
return channel;
}
+
+ @Override
+ public void setPacketInFiltering(final boolean enabled) {
+ versionDetector.setFilterPacketIns(enabled);
+ LOG.debug("PacketIn filtering {}abled", enabled ? "en" : "dis");
+ }
}