+
+ @Override
+ public <T extends OutboundQueueHandler> OutboundQueueHandlerRegistration<T> registerOutboundQueueHandler(
+ final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
+ Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
+
+ final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
+ outputManager = ret;
+ channel.pipeline().addLast(outputManager);
+
+ return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
+ @Override
+ protected void removeRegistration() {
+ outputManager.close();
+ channel.pipeline().remove(outputManager);
+ outputManager = null;
+ }
+ };
+ }
+
+ Channel getChannel() {
+ return channel;
+ }