* Class capsulate basic processing for stacking requests for netty channel
* and provide functionality for pairing request/response device message communication.
*/
-abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter
+abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O extends AbstractStackedOutboundQueue>
+ extends ChannelInboundHandlerAdapter
implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
private final AtomicBoolean flushScheduled = new AtomicBoolean();
protected final ConnectionAdapterImpl parent;
protected final InetSocketAddress address;
- protected final StackedOutboundQueue currentQueue;
+ protected final O currentQueue;
private final T handler;
// Accessed concurrently
this.parent = Preconditions.checkNotNull(parent);
this.handler = Preconditions.checkNotNull(handler);
this.address = address;
- currentQueue = new StackedOutboundQueue(this);
+ /* Note: don't wish to use reflection here */
+ currentQueue = initializeStackedOutboudnqueue();
LOG.debug("Queue manager instantiated with queue {}", currentQueue);
handler.onConnectionQueueChanged(currentQueue);
}
+ /**
+ * Method has to initialize some child of {@link AbstractStackedOutboundQueue}
+ *
+ * @return correct implementation of StacketOutboundqueue
+ */
+ protected abstract O initializeStackedOutboudnqueue();
+
@Override
public void close() {
handler.onConnectionQueueChanged(null);
// we'll steal its work. Note that more work may accumulate in the time window
// between now and when the task will run, so it may not be a no-op after all.
//
- // The reason for this is to will the output buffer before we go into selection
+ // The reason for this is to fill the output buffer before we go into selection
// phase. This will make sure the pipe is full (in which case our next wake up
// will be the queue becoming writable).
writeAndFlush();
+ alreadyReading = false;
}
@Override
*
* @return
*/
- private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
+ protected Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
Preconditions.checkArgument(msg != null);
if (address == null) {