AlreadyReading flag not used correctly
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / AbstractOutboundQueueManager.java
index 49c6ddfa9fb47c155796cbdb3d453cd36704c22d..8febb15865f5ffbe024ff84e4cee44b04e91ae80 100644 (file)
@@ -29,7 +29,8 @@ import org.slf4j.LoggerFactory;
  * Class capsulate basic processing for stacking requests for netty channel
  * and provide functionality for pairing request/response device message communication.
  */
-abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter
+abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O extends AbstractStackedOutboundQueue>
+        extends ChannelInboundHandlerAdapter
         implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
@@ -67,7 +68,7 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> exte
     private final AtomicBoolean flushScheduled = new AtomicBoolean();
     protected final ConnectionAdapterImpl parent;
     protected final InetSocketAddress address;
-    protected final StackedOutboundQueue currentQueue;
+    protected final O currentQueue;
     private final T handler;
 
     // Accessed concurrently
@@ -89,12 +90,20 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> exte
         this.parent = Preconditions.checkNotNull(parent);
         this.handler = Preconditions.checkNotNull(handler);
         this.address = address;
-        currentQueue = new StackedOutboundQueue(this);
+        /* Note: don't wish to use reflection here */
+        currentQueue = initializeStackedOutboudnqueue();
         LOG.debug("Queue manager instantiated with queue {}", currentQueue);
 
         handler.onConnectionQueueChanged(currentQueue);
     }
 
+    /**
+     * Method has to initialize some child of {@link AbstractStackedOutboundQueue}
+     *
+     * @return correct implementation of StacketOutboundqueue
+     */
+    protected abstract O initializeStackedOutboudnqueue();
+
     @Override
     public void close() {
         handler.onConnectionQueueChanged(null);
@@ -134,10 +143,11 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> exte
         // we'll steal its work. Note that more work may accumulate in the time window
         // between now and when the task will run, so it may not be a no-op after all.
         //
-        // The reason for this is to will the output buffer before we go into selection
+        // The reason for this is to fill the output buffer before we go into selection
         // phase. This will make sure the pipe is full (in which case our next wake up
         // will be the queue becoming writable).
         writeAndFlush();
+        alreadyReading = false;
     }
 
     @Override
@@ -249,7 +259,7 @@ abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> exte
      *
      * @return
      */
-    private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
+    protected Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
         Preconditions.checkArgument(msg != null);
 
         if (address == null) {