import org.slf4j.LoggerFactory;
/**
- * Handles messages (notifications + rpcs) and connections
+ * Handles messages (notifications + rpcs) and connections.
* @author mirehak
* @author michal.polkorab
*/
private ConnectionReadyListener connectionReadyListener;
private OpenflowProtocolListener messageListener;
private SystemNotificationsListener systemListener;
- private OutboundQueueManager<?> outputManager;
+ private AbstractOutboundQueueManager<?, ?> outputManager;
private OFVersionDetector versionDetector;
private final boolean useBarrier;
/**
- * default ctor
- *
+ * Default constructor.
* @param channel the channel to be set - used for communication
* @param address client address (used only in case of UDP communication,
- * as there is no need to store address over tcp (stable channel))
+ * as there is no need to store address over tcp (stable channel))
* @param useBarrier value is configurable by configSubsytem
*/
public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, final boolean useBarrier) {
@Override
public void consumeDeviceMessage(final DataObject message) {
LOG.debug("ConsumeIntern msg on {}", channel);
- if (disconnectOccured ) {
+ if (disconnectOccured) {
return;
}
if (message instanceof Notification) {
disconnectOccured = true;
} else if (message instanceof SwitchIdleEvent) {
systemListener.onSwitchIdleEvent((SwitchIdleEvent) message);
- // OpenFlow messages
+ // OpenFlow messages
} else if (message instanceof EchoRequestMessage) {
if (outputManager != null) {
outputManager.onEchoRequest((EchoRequestMessage) message);
} else if (message instanceof FlowRemovedMessage) {
messageListener.onFlowRemovedMessage((FlowRemovedMessage) message);
} else if (message instanceof HelloMessage) {
- LOG.info("Hello received / branch");
+ LOG.info("Hello received");
messageListener.onHelloMessage((HelloMessage) message);
} else if (message instanceof MultipartReplyMessage) {
if (outputManager != null) {
LOG.warn("message listening not supported for type: {}", message.getClass());
}
} else if (message instanceof OfHeader) {
- LOG.debug("OFheader msg received");
+ LOG.debug("OF header msg received");
if (outputManager == null || !outputManager.onMessage((OfHeader) message)) {
final RpcResponseKey key = createRpcResponseKey((OfHeader) message);
final ResponseExpectedRpcListener<?> listener = findRpcResponse(key);
if (listener != null) {
- LOG.debug("corresponding rpcFuture found");
+ LOG.debug("Corresponding rpcFuture found");
listener.completed((OfHeader)message);
- LOG.debug("after setting rpcFuture");
+ LOG.debug("After setting rpcFuture");
responseCache.invalidate(key);
} else {
LOG.warn("received unexpected rpc response: {}", key);
}
}
- /**
- * @param message
- * @return
- */
private static RpcResponseKey createRpcResponseKey(final OfHeader message) {
return new RpcResponseKey(message.getXid(), message.getImplementedInterface().getName());
}
final T handler, final int maxQueueDepth, final long maxBarrierNanos) {
Preconditions.checkState(outputManager == null, "Manager %s already registered", outputManager);
+ final AbstractOutboundQueueManager<T, ?> ret;
if (useBarrier) {
-
+ ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
+ } else {
+ LOG.warn("OutboundQueueManager without barrier is started.");
+ ret = new OutboundQueueManagerNoBarrier<>(this, address, handler);
}
- final OutboundQueueManager<T> ret = new OutboundQueueManager<>(this, address, handler, maxQueueDepth, maxBarrierNanos);
outputManager = ret;
/* we don't need it anymore */
channel.pipeline().remove(output);
- channel.pipeline().addLast(outputManager);
+ // OutboundQueueManager is put before DelegatingInboundHandler because otherwise channelInactive event would
+ // be first processed in OutboundQueueManager and then in ConnectionAdapter (and Openflowplugin). This might
+ // cause problems because we are shutting down the queue before Openflowplugin knows about it.
+ channel.pipeline().addBefore(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(),
+ PipelineHandlers.CHANNEL_OUTBOUND_QUEUE_MANAGER.name(), outputManager);
return new OutboundQueueHandlerRegistrationImpl<T>(handler) {
@Override