BUG-3219: Introduce OutboundQueueHandler and related interfaces 80/20080/25
authorRobert Varga <rovarga@cisco.com>
Mon, 11 May 2015 19:18:29 +0000 (21:18 +0200)
committerRobert Varga <rovarga@cisco.com>
Fri, 15 May 2015 11:50:56 +0000 (13:50 +0200)
This patch introduces a barrier-enabled outbound queue. The queue works
in reserve/commit manner and notifies users with the result of the
requests. XID allocation occurs as an offset in a particular queue.

Change-Id: Icd3ceda34746be0346ac59b8ed46352db3be9a2f
Signed-off-by: Robert Varga <rovarga@cisco.com>
openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/ConnectionAdapter.java
openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueue.java [new file with mode: 0644]
openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandler.java [new file with mode: 0644]
openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandlerRegistration.java [new file with mode: 0644]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java [new file with mode: 0644]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueHandlerRegistrationImpl.java [new file with mode: 0644]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java [new file with mode: 0644]
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java [new file with mode: 0644]

index a205e5e66b58c71235874274ea2fcd6efaf491b5..02679f05c7db8a2d3fc4027b8d146e98cc004e37 100644 (file)
@@ -5,13 +5,11 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
-
 package org.opendaylight.openflowjava.protocol.api.connection;
 
+import com.google.common.annotations.Beta;
 import java.net.InetSocketAddress;
 import java.util.concurrent.Future;
-
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
@@ -73,4 +71,15 @@ public interface ConnectionAdapter extends OpenflowProtocolService {
      * @return true, if channel is configured to autoread
      */
     boolean isAutoRead();
+
+    /**
+     * Registers a new bypass outbound queue
+     * @param handler
+     * @param maxQueueDepth
+     * @param maxBarrierNanos
+     * @return An {@link OutboundQueueHandlerRegistration}
+     */
+    @Beta
+    <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(T handler,
+        int maxQueueDepth, long maxBarrierNanos);
 }
diff --git a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueue.java b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueue.java
new file mode 100644 (file)
index 0000000..1291c22
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.api.connection;
+
+import com.google.common.annotations.Beta;
+import com.google.common.util.concurrent.FutureCallback;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
+@Beta
+public interface OutboundQueue {
+    /**
+     * Reserve an entry in the outbound queue.
+     * @return XID for the new message, or null if the queue is full
+     */
+    Long reserveEntry();
+
+    /**
+     * Commit the specified offset using a message. Specified callback will
+     * be invoked once we know how it has resolved, either with a normal response,
+     * implied completion via a barrier, or failure (such as connection drop). For
+     * multipart responses, {@link FutureCallback#onSuccess(Object)} will be invoked
+     * multiple times as the corresponding responses arrive. If the request is completed
+     * with a response, the object reported will be non-null. If the request's completion
+     * is implied by a barrier, the object reported will be null.
+     *
+     * @param xid Previously-reserved XID
+     * @param message Message which should be sent out, or null if the reservation
+     *                should be cancelled.
+     * @param callback Callback to be invoked, or null if no callback should be invoked.
+     * @throws IllegalArgumentException if the slot is already committed or was never reserved.
+     */
+    void commitEntry(@Nonnull Long xid, @Nullable OfHeader message, @Nullable FutureCallback<OfHeader> callback);
+}
diff --git a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandler.java b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandler.java
new file mode 100644 (file)
index 0000000..0800798
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.api.connection;
+
+import com.google.common.annotations.Beta;
+import javax.annotation.Nonnull;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+
+/**
+ * Handler of the outbound queue. The queue has a maximum depth assigned when the
+ * handler is registered.
+ */
+@Beta
+public interface OutboundQueueHandler {
+    /**
+     * Create a new {@link BarrierInput barrier} message. This callback is invoked
+     * when the queue is being flushed to the switch. The barrier ensures that any
+     * outstanding requests are detected as either completed or failed.
+     *
+     * @param xid XID for the barrier message
+     * @return New barrier message.
+     */
+    @Nonnull BarrierInput createBarrierRequest(@Nonnull Long xid);
+
+    /**
+     * Invoked whenever the underlying queue is refreshed. Implementations should
+     * ensure they are talking to the latest queue
+     * @param queue New queue instance, null indicates a shutdown, e.g. the queue
+     *              is no longer available.
+     */
+    void onConnectionQueueChanged(OutboundQueue queue);
+}
diff --git a/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandlerRegistration.java b/openflow-protocol-api/src/main/java/org/opendaylight/openflowjava/protocol/api/connection/OutboundQueueHandlerRegistration.java
new file mode 100644 (file)
index 0000000..d31d1ab
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.api.connection;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+
+/**
+ * An {@link ObjectRegistration} of a {@link OutboundQueueHandler}. Registration can be cancelled
+ * by invoking {@link #close()}.
+ *
+ * @param <T> Handler type
+ */
+@Beta
+public interface OutboundQueueHandlerRegistration<T extends OutboundQueueHandler> extends ObjectRegistration<T> {
+    @Override
+    void close();
+}
index a38e1700de39e83c188ea179a5c32fa61c5931d4..20e4c2a83c62b2be22deb1f4d9f52d59f9ded1db 100644 (file)
@@ -9,16 +9,24 @@
 
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.GenericFutureListener;
-
 import java.net.InetSocketAddress;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
 import org.opendaylight.openflowjava.statistics.CounterEventTypes;
 import org.opendaylight.openflowjava.statistics.StatisticsCounters;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
@@ -66,15 +74,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
 /**
  * Handles messages (notifications + rpcs) and connections
  * @author mirehak
@@ -115,8 +114,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     private ConnectionReadyListener connectionReadyListener;
     private OpenflowProtocolListener messageListener;
     private SystemNotificationsListener systemListener;
+    private OutboundQueueManager<?> outputManager;
     private boolean disconnectOccured = false;
-    private StatisticsCounters statisticsCounters;
+    private final StatisticsCounters statisticsCounters;
+    private final InetSocketAddress address;
 
     /**
      * default ctor
@@ -131,6 +132,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 .removalListener(REMOVAL_LISTENER).build();
         this.channel = Preconditions.checkNotNull(channel);
         this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH, address);
+        this.address = address;
         channel.pipeline().addLast(output);
         statisticsCounters = StatisticsCounters.getInstance();
         LOG.debug("ConnectionAdapter created");
@@ -285,6 +287,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
             } else if (message instanceof ExperimenterMessage) {
                 messageListener.onExperimenterMessage((ExperimenterMessage) message);
+                if (outputManager != null) {
+                    outputManager.onMessage((OfHeader) message);
+                }
                 statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
             } else if (message instanceof FlowRemovedMessage) {
                 messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
@@ -295,6 +300,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
             } else if (message instanceof MultipartReplyMessage) {
                 messageListener.onMultipartReplyMessage((MultipartReplyMessage) message);
+                if (outputManager != null) {
+                    outputManager.onMessage((OfHeader) message);
+                }
                 statisticsCounters.incrementCounter(CounterEventTypes.US_MESSAGE_PASS);
             } else if (message instanceof PacketInMessage) {
                 messageListener.onPacketInMessage((PacketInMessage) message);
@@ -305,9 +313,10 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
             } else {
                 LOG.warn("message listening not supported for type: {}", message.getClass());
             }
-        } else {
-            if (message instanceof OfHeader) {
-                LOG.debug("OFheader msg received");
+        } else if (message instanceof OfHeader) {
+            LOG.debug("OFheader msg received");
+
+            if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
                 RpcResponseKey key = createRpcResponseKey((OfHeader) message);
                 final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
                 if (listener != null) {
@@ -319,9 +328,9 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
                 } else {
                     LOG.warn("received unexpected rpc response: {}", key);
                 }
-            } else {
-                LOG.warn("message listening not supported for type: {}", message.getClass());
             }
+        } else {
+            LOG.warn("message listening not supported for type: {}", message.getClass());
         }
     }
 
@@ -466,7 +475,7 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
      * Used only for testing purposes
      * @param cache
      */
-    public void setResponseCache(Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache) {
+    public void setResponseCache(final Cache<RpcResponseKey, ResponseExpectedRpcListener<?>> cache) {
         this.responseCache = cache;
     }
 
@@ -476,7 +485,30 @@ public class ConnectionAdapterImpl implements ConnectionFacade {
     }
 
        @Override
-    public void setAutoRead(boolean autoRead) {
+    public void setAutoRead(final boolean autoRead) {
        channel.config().setAutoRead(autoRead);
     }
+
+    @Override
+    public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(
+            final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
+        Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
+
+        final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
+        outputManager = ret;
+        channel.pipeline().addLast(outputManager);
+
+        return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
+            @Override
+            protected void removeRegistration() {
+                outputManager.close();
+                channel.pipeline().remove(outputManager);
+                outputManager = null;
+            }
+        };
+    }
+
+    Channel getChannel() {
+        return channel;
+    }
 }
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueEntry.java
new file mode 100644 (file)
index 0000000..c0c2f76
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class OutboundQueueEntry {
+    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueEntry.class);
+    private FutureCallback<OfHeader> callback;
+    private OfHeader message;
+    private boolean completed;
+    private volatile boolean committed;
+
+    void commit(final OfHeader message, final FutureCallback<OfHeader> callback) {
+        this.message = message;
+        this.callback = callback;
+
+        // Volatile write, needs to be last
+        committed = true;
+    }
+
+    void reset() {
+        callback = null;
+        message = null;
+        completed = false;
+
+        // Volatile write, needs to be last
+        committed = false;
+    }
+
+    boolean isBarrier() {
+        return message instanceof BarrierInput;
+    }
+
+    boolean isCommitted() {
+        return committed;
+    }
+
+    boolean isCompleted() {
+        return completed;
+    }
+
+    OfHeader getMessage() {
+        return message;
+    }
+
+    boolean complete(final OfHeader response) {
+        Preconditions.checkState(!completed, "Attempted to complete a completed message %s with response %s", message, response);
+
+        // Multipart requests are special, we have to look at them to see
+        // if there is something outstanding and adjust ourselves accordingly
+        final boolean reallyComplete;
+        if (response instanceof MultipartReplyMessage) {
+            reallyComplete = !((MultipartReplyMessage) response).getFlags().isOFPMPFREQMORE();
+            LOG.debug("Multipart reply {}", response);
+        } else {
+            reallyComplete = true;
+        }
+
+        completed = reallyComplete;
+        if (callback != null) {
+            callback.onSuccess(response);
+        }
+        LOG.debug("Entry {} completed {} with response {}", this, completed, response);
+        return reallyComplete;
+    }
+
+    void fail(final Throwable cause) {
+        if (!completed) {
+            completed = true;
+            if (callback != null) {
+                callback.onFailure(cause);
+            }
+        } else {
+            LOG.warn("Ignoring failure {} for completed message {}", cause, message);
+        }
+    }
+
+}
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueHandlerRegistrationImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueHandlerRegistrationImpl.java
new file mode 100644 (file)
index 0000000..697d2dd
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+
+abstract class OutboundQueueHandlerRegistrationImpl<T extends OutboundQueueHandler> extends AbstractObjectRegistration<T> implements OutboundQueueHandlerRegistration<T> {
+    protected OutboundQueueHandlerRegistrationImpl(final T instance) {
+        super(instance);
+    }
+}
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java
new file mode 100644 (file)
index 0000000..1bb9ec4
--- /dev/null
@@ -0,0 +1,239 @@
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
+import javax.annotation.Nonnull;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class OutboundQueueImpl implements OutboundQueue {
+    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
+    private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
+    private static final long FLUSH_RETRY_NANOS = 1L;
+    private final OutboundQueueManager<?> manager;
+    private final OutboundQueueEntry[] queue;
+    private final long baseXid;
+    private final long endXid;
+    private final int reserve;
+
+    // Updated concurrently
+    private volatile int reserveOffset;
+
+    // Updated from Netty only
+    private int flushOffset;
+    private int completeCount;
+
+    OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
+        /*
+         * We use the last entry as an emergency should a timeout-triggered
+         * flush request race with normal users for the last entry in this
+         * queue. In that case the flush request will take the last entry and
+         * schedule a flush, which means that we will get around sending the
+         * message as soon as the user finishes the reservation.
+         */
+        Preconditions.checkArgument(maxQueue > 1);
+        this.baseXid = baseXid;
+        this.endXid = baseXid + maxQueue;
+        this.reserve = maxQueue - 1;
+        this.manager = Preconditions.checkNotNull(manager);
+        queue = new OutboundQueueEntry[maxQueue];
+        for (int i = 0; i < maxQueue; ++i) {
+            queue[i] = new OutboundQueueEntry();
+        }
+    }
+
+    private OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final OutboundQueueEntry[] queue) {
+        this.manager = Preconditions.checkNotNull(manager);
+        this.queue = Preconditions.checkNotNull(queue);
+        this.baseXid = baseXid;
+        this.endXid = baseXid + queue.length;
+        this.reserve = queue.length - 1;
+        for (OutboundQueueEntry element : queue) {
+            element.reset();
+        }
+    }
+
+    OutboundQueueImpl reuse(final long baseXid) {
+        return new OutboundQueueImpl(manager, baseXid, queue);
+    }
+
+    Long reserveEntry(final boolean forBarrier) {
+        final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
+        if (offset >= reserve) {
+            if (forBarrier) {
+                LOG.debug("Queue {} offset {}/{}, using emergency slot", this, offset, queue.length);
+                return endXid;
+            } else {
+                LOG.debug("Queue {} offset {}/{}, not allowing reservation", this, offset, queue.length);
+                return null;
+            }
+        }
+
+        final Long xid = baseXid + offset;
+        LOG.debug("Queue {} allocated XID {} at offset {}", this, xid, offset);
+        return xid;
+    }
+
+    @Override
+    public Long reserveEntry() {
+        return reserveEntry(false);
+    }
+
+    @Override
+    public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+        final int offset = (int)(xid - baseXid);
+        if (message != null) {
+            Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
+        }
+
+        queue[offset].commit(message, callback);
+        LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+
+        manager.ensureFlushing(this);
+    }
+
+    /**
+     * An empty queue is a queue which has no further unflushed entries.
+     *
+     * @return True if this queue does not have unprocessed entries.
+     */
+    boolean isEmpty() {
+        int ro = reserveOffset;
+        if (ro >= reserve) {
+            if (queue[reserve].isCommitted()) {
+                ro = reserve + 1;
+            } else {
+                ro = reserve;
+            }
+        }
+
+        LOG.debug("Effective flush/reserve offset {}/{}", flushOffset, ro);
+        return ro <= flushOffset;
+    }
+
+    /**
+     * A queue is finished when all of its entries have been completed.
+     *
+     * @return False if there are any uncompleted requests.
+     */
+    boolean isFinished() {
+        if (completeCount < reserve) {
+            return false;
+        }
+
+        // We need to check if the last entry was used
+        final OutboundQueueEntry last = queue[reserve];
+        return !last.isCommitted() || last.isCompleted();
+    }
+
+    boolean isFlushed() {
+        LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
+        if (flushOffset < reserve) {
+            return false;
+        }
+
+        // flushOffset implied == reserve
+        return flushOffset >= queue.length || !queue[reserve].isCommitted();
+    }
+
+    OfHeader flushEntry() {
+        for (;;) {
+            // No message ready
+            if (isEmpty()) {
+                LOG.debug("Flush offset {} is uptodate with reserved", flushOffset);
+                return null;
+            }
+
+            boolean retry = true;
+            while (!queue[flushOffset].isCommitted()) {
+                if (!retry) {
+                    LOG.debug("Offset {} not ready yet, giving up", flushOffset);
+                    return null;
+                }
+
+                LOG.debug("Offset {} not ready yet, retrying", flushOffset);
+                LockSupport.parkNanos(FLUSH_RETRY_NANOS);
+                retry = false;
+            }
+
+            final OfHeader msg = queue[flushOffset++].getMessage();
+            if (msg != null) {
+                return msg;
+            }
+        }
+    }
+
+    private boolean xidInRance(final long xid) {
+        return xid < endXid && (xid >= baseXid || baseXid > endXid);
+    }
+
+    /**
+     * Return the request entry corresponding to a response. Returns null
+     * if there is no request matching the response.
+     *
+     * @param response Response message
+     * @return Matching request entry, or null if no match is found.
+     */
+    OutboundQueueEntry pairRequest(@Nonnull final OfHeader response) {
+        final Long xid = response.getXid();
+        if (!xidInRance(xid)) {
+            LOG.debug("Queue {} {}/{} ignoring XID {}", this, baseXid, queue.length, xid);
+            return null;
+        }
+
+        final int offset = (int)(xid - baseXid);
+        final OutboundQueueEntry entry = queue[offset];
+        if (entry.isCompleted()) {
+            LOG.debug("Entry {} already is completed, not accepting response {}", entry, response);
+            return null;
+        }
+
+        if (entry.complete(response)) {
+            completeCount++;
+
+            // This has been a barrier -- make sure we complete all preceding requests
+            if (entry.isBarrier()) {
+                LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid, xid - 1);
+                for (int i = 0; i < offset; ++i) {
+                    final OutboundQueueEntry e = queue[i];
+                    if (!e.isCompleted() && e.complete(null)) {
+                        completeCount++;
+                    }
+                }
+            }
+        }
+        return entry;
+    }
+
+    void completeAll() {
+        for (OutboundQueueEntry entry : queue) {
+            if (!entry.isCompleted() && entry.complete(null)) {
+                completeCount++;
+            }
+        }
+    }
+
+    int failAll(final Throwable cause) {
+        int ret = 0;
+        for (OutboundQueueEntry entry : queue) {
+            if (!entry.isCompleted()) {
+                entry.fail(cause);
+                ret++;
+            }
+        }
+
+        return ret;
+    }
+}
diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java
new file mode 100644 (file)
index 0000000..b7c2c8f
--- /dev/null
@@ -0,0 +1,424 @@
+/*
+ * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowjava.protocol.impl.core.connection;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
+
+    /**
+     * This is the default upper bound we place on the flush task running
+     * a single iteration. We relinquish control after about this amount
+     * of time.
+     */
+    private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
+
+    /**
+     * We re-check the time spent flushing every this many messages. We do this because
+     * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
+     * or similar to disable the feature.
+     */
+    private static final int WORKTIME_RECHECK_MSGS = 64;
+
+    /**
+     * We maintain a cache of this many previous queues for later reuse.
+     */
+    private static final int QUEUE_CACHE_SIZE = 4;
+
+    private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
+    private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
+    private final ConnectionAdapterImpl parent;
+    private final InetSocketAddress address;
+    private final long maxBarrierNanos;
+    private final long maxWorkTime;
+    private final int queueSize;
+    private final T handler;
+
+    /*
+     * Instead of using an AtomicBoolean object, we use these two. It saves us
+     * from allocating an extra object.
+     */
+    @SuppressWarnings("rawtypes")
+    private static final AtomicIntegerFieldUpdater<OutboundQueueManager> FLUSH_SCHEDULED_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(OutboundQueueManager.class, "flushScheduled");
+    private volatile int flushScheduled = 0;
+
+    // Updated from netty only
+    private long lastBarrierNanos = System.nanoTime();
+    private OutboundQueueImpl currentQueue;
+    private int nonBarrierMessages;
+    private long lastXid = 0;
+
+    // Passed to executor to request triggering of flush
+    private final Runnable flushRunnable = new Runnable() {
+        @Override
+        public void run() {
+            flush();
+        }
+    };
+    private final Runnable barrierRunnable = new Runnable() {
+        @Override
+        public void run() {
+            barrier();
+        }
+    };
+
+    OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
+        final int queueSize, final long maxBarrierNanos) {
+        this.parent = Preconditions.checkNotNull(parent);
+        this.handler = Preconditions.checkNotNull(handler);
+        Preconditions.checkArgument(queueSize > 0);
+        this.queueSize = queueSize;
+        Preconditions.checkArgument(maxBarrierNanos > 0);
+        this.maxBarrierNanos = maxBarrierNanos;
+        this.address = address;
+        this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
+
+        LOG.debug("Queue manager instantiated with queue size {}", queueSize);
+        createQueue();
+        scheduleBarrierTimer(lastBarrierNanos);
+    }
+
+    T getHandler() {
+        return handler;
+    }
+
+    @Override
+    public void close() {
+        handler.onConnectionQueueChanged(null);
+    }
+
+    private void retireQueue(final OutboundQueueImpl queue) {
+        if (queueCache.offer(queue)) {
+            LOG.debug("Saving queue {} for later reuse", queue);
+        } else {
+            LOG.debug("Queue {} thrown away", queue);
+        }
+    }
+
+    private void createQueue() {
+        final long baseXid = lastXid;
+        lastXid += queueSize + 1;
+
+        final OutboundQueueImpl cached = queueCache.poll();
+        final OutboundQueueImpl queue;
+        if (cached != null) {
+            queue = cached.reuse(baseXid);
+            LOG.debug("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
+        } else {
+            queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
+            LOG.debug("Allocated new queue {} on channel {}", queue, parent.getChannel());
+        }
+
+        activeQueues.add(queue);
+        currentQueue = queue;
+        handler.onConnectionQueueChanged(queue);
+    }
+
+    private void scheduleBarrierTimer(final long now) {
+        long next = lastBarrierNanos + maxBarrierNanos;
+        if (next < now) {
+            LOG.debug("Attempted to schedule barrier in the past, reset maximum)");
+            next = now + maxBarrierNanos;
+        }
+
+        final long delay = next - now;
+        LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
+        parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
+    }
+
+    private void scheduleBarrierMessage() {
+        final Long xid = currentQueue.reserveEntry(true);
+        Verify.verifyNotNull(xid);
+
+        currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
+        LOG.debug("Barrier XID {} scheduled", xid);
+
+        // We can see into the future when compared to flushEntry(), as that
+        // codepath may be lagging behind on messages. Resetting the counter
+        // here ensures that flushEntry() will not attempt to issue a flush
+        // request. Note that we do not reset current time, as that should
+        // reflect when we sent the message for real.
+        nonBarrierMessages = 0;
+    }
+
+    /**
+     * Flush an entry from the queue.
+     *
+     * @param now Time reference for 'now'. We take this as an argument, as
+     *            we need a timestamp to mark barrier messages we see swinging
+     *            by. That timestamp does not need to be completely accurate,
+     *            hence we use the flush start time. Alternative would be to
+     *            measure System.nanoTime() for each barrier -- needlessly
+     *            adding overhead.
+     *
+     * @return Entry which was flushed, null if no entry is ready.
+     */
+    OfHeader flushEntry(final long now) {
+        final OfHeader message = currentQueue.flushEntry();
+        if (currentQueue.isFlushed()) {
+            LOG.debug("Queue {} is fully flushed", currentQueue);
+            createQueue();
+        }
+
+        if (message == null) {
+            return null;
+        }
+
+        if (message instanceof BarrierInput) {
+            LOG.debug("Barrier message seen, resetting counters");
+            nonBarrierMessages = 0;
+            lastBarrierNanos = now;
+        } else {
+            nonBarrierMessages++;
+            if (nonBarrierMessages >= queueSize) {
+                LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
+                scheduleBarrierMessage();
+            }
+        }
+
+        return message;
+    }
+
+    /**
+     * Invoked whenever a message comes in from the switch. Runs matching
+     * on all active queues in an attempt to complete a previous request.
+     *
+     * @param message Potential response message
+     * @return True if the message matched a previous request, false otherwise.
+     */
+    boolean onMessage(final OfHeader message) {
+        LOG.debug("Attempting to pair message {} to a request", message);
+
+        Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+        while (it.hasNext()) {
+            final OutboundQueueImpl queue = it.next();
+            final OutboundQueueEntry entry = queue.pairRequest(message);
+
+            if (entry == null) {
+                continue;
+            }
+
+            LOG.debug("Queue {} accepted response {}", queue, message);
+
+            // This has been a barrier request, we need to flush all
+            // previous queues
+            if (entry.isBarrier() && activeQueues.size() > 1) {
+                LOG.debug("Queue {} indicated request was a barrier", queue);
+
+                it = activeQueues.iterator();
+                while (it.hasNext()) {
+                    final OutboundQueueImpl q = it.next();
+
+                    // We want to complete all queues before the current one, we will
+                    // complete the current queue below
+                    if (!queue.equals(q)) {
+                        LOG.debug("Queue {} is implied finished", q);
+                        q.completeAll();
+                        it.remove();
+                        retireQueue(q);
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            if (queue.isFinished()) {
+                LOG.debug("Queue {} is finished", queue);
+                it.remove();
+                retireQueue(queue);
+            }
+
+            return true;
+        }
+
+        LOG.debug("Failed to find completion for message {}", message);
+        return false;
+    }
+
+    private void scheduleFlush() {
+        if (parent.getChannel().isWritable()) {
+            if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
+                LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+                parent.getChannel().eventLoop().execute(flushRunnable);
+            } else {
+                LOG.trace("Flush task is already present on channel {}", parent.getChannel());
+            }
+        } else {
+            LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
+        }
+    }
+
+    void ensureFlushing(final OutboundQueueImpl queue) {
+        Preconditions.checkState(currentQueue.equals(queue));
+        scheduleFlush();
+    }
+
+    /**
+     * Periodic barrier check.
+     */
+    protected void barrier() {
+        LOG.debug("Channel {} barrier timer expired", parent.getChannel());
+        if (currentQueue == null) {
+            LOG.debug("Channel shut down, not processing barrier");
+            return;
+        }
+
+        final long now = System.nanoTime();
+        final long sinceLast = now - lastBarrierNanos;
+        if (sinceLast >= maxBarrierNanos) {
+            LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
+            // FIXME: we should be tracking requests/responses instead of this
+            if (nonBarrierMessages == 0) {
+                LOG.debug("No messages written since last barrier, not issuing one");
+            } else {
+                scheduleBarrierMessage();
+            }
+        }
+
+        scheduleBarrierTimer(now);
+    }
+
+    /**
+     * Perform a single flush operation.
+     */
+    protected void flush() {
+        final long start = System.nanoTime();
+        final long deadline = start + maxWorkTime;
+
+        LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
+
+        long messages = 0;
+        for (;; ++messages) {
+            if (!parent.getChannel().isWritable()) {
+                LOG.trace("Channel is no longer writable");
+                break;
+            }
+
+            final OfHeader message = flushEntry(start);
+            if (message == null) {
+                LOG.trace("The queue is completely drained");
+                break;
+            }
+
+            final Object wrapper;
+            if (address == null) {
+                wrapper = new MessageListenerWrapper(message, null);
+            } else {
+                wrapper = new UdpMessageListenerWrapper(message, null, address);
+            }
+            parent.getChannel().write(wrapper);
+
+            /*
+             * Check every WORKTIME_RECHECK_MSGS for exceeded time.
+             *
+             * XXX: given we already measure our flushing throughput, we
+             *      should be able to perform dynamic adjustments here.
+             *      is that additional complexity needed, though?
+             */
+            if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
+                LOG.trace("Exceeded allotted work time {}us",
+                        TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
+                break;
+            }
+        }
+
+        if (messages > 0) {
+            LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
+            parent.getChannel().flush();
+        }
+
+        final long stop = System.nanoTime();
+        LOG.debug("Flushed {} messages in {}us to channel {}",
+                messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
+
+        /*
+         * We are almost ready to terminate. This is a bit tricky, because
+         * we do not want to have a race window where a message would be
+         * stuck on the queue without a flush being scheduled.
+         *
+         * So we mark ourselves as not running and then re-check if a
+         * flush out is needed. That will re-synchronized with other threads
+         * such that only one flush is scheduled at any given time.
+         */
+        if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
+            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
+        }
+
+        conditionalFlush();
+    }
+
+
+    /**
+     * Schedule a queue flush if it is not empty and the channel is found
+     * to be writable. May only be called from Netty context.
+     */
+    private void conditionalFlush() {
+        if (!currentQueue.isEmpty()) {
+            scheduleFlush();
+        } else {
+            LOG.trace("Queue is empty, no flush needed");
+        }
+    }
+
+    private void conditionalFlush(final ChannelHandlerContext ctx) {
+        Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
+        conditionalFlush();
+    }
+
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+        super.channelActive(ctx);
+        conditionalFlush(ctx);
+    }
+
+    @Override
+    public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
+        super.channelWritabilityChanged(ctx);
+        conditionalFlush(ctx);
+    }
+
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+        super.channelInactive(ctx);
+
+        long entries = 0;
+        LOG.debug("Channel shutdown, flushing queue...");
+        handler.onConnectionQueueChanged(null);
+
+        final Throwable cause = new RejectedExecutionException("Channel disconnected");
+        for (OutboundQueueImpl queue : activeQueues) {
+            entries += queue.failAll(cause);
+        }
+        activeQueues.clear();
+
+        LOG.debug("Flushed {} queue entries", entries);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled);
+    }
+}