* @return thread numbers for TcpHandler's eventloopGroups
*/
public ThreadConfiguration getThreadConfiguration();
+
+ /**
+ * @return size of ChannelOutbounfQueue
+ */
+ public int getOutboundQueueSize();
}
\ No newline at end of file
/**
* @param ch
+ * @param outboundQueueSize maximal size of {@link ChannelOutboundQueue}
* @return connection adapter tcp-implementation
*/
- public ConnectionFacade createConnectionFacade(Channel ch, InetSocketAddress address) ;
+ public ConnectionFacade createConnectionFacade(Channel ch, InetSocketAddress address, int outboundQueueSize);
}
* @return connection adapter tcp-implementation
*/
@Override
- public ConnectionFacade createConnectionFacade(Channel ch, InetSocketAddress address) {
- return new ConnectionAdapterImpl(ch, address);
+ public ConnectionFacade createConnectionFacade(Channel ch, InetSocketAddress address, int outboundQueueSize) {
+ return new ConnectionAdapterImpl(ch, address, outboundQueueSize);
}
}
/** after this time, RPC future response objects will be thrown away (in minutes) */
public static final int RPC_RESPONSE_EXPIRATION = 1;
- /**
- * Default depth of write queue, e.g. we allow these many messages
- * to be queued up before blocking producers.
- */
- public static final int DEFAULT_QUEUE_DEPTH = 1024;
-
private static final Logger LOG = LoggerFactory
.getLogger(ConnectionAdapterImpl.class);
private static final Exception QUEUE_FULL_EXCEPTION =
* @param channel the channel to be set - used for communication
* @param address client address (used only in case of UDP communication,
* as there is no need to store address over tcp (stable channel))
+ * @param outboundQueueSize maximal size of {@link ChannelOutboundQueue}
*/
- public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address) {
+ public ConnectionAdapterImpl(final Channel channel, final InetSocketAddress address, int outboundQueueSize) {
responseCache = CacheBuilder.newBuilder()
.concurrencyLevel(1)
.expireAfterWrite(RPC_RESPONSE_EXPIRATION, TimeUnit.MINUTES)
.removalListener(REMOVAL_LISTENER).build();
this.channel = Preconditions.checkNotNull(channel);
- this.output = new ChannelOutboundQueue(channel, DEFAULT_QUEUE_DEPTH);
+ this.output = new ChannelOutboundQueue(channel, outboundQueueSize);
output.setAddress(address);
channel.pipeline().addLast(output);
LOG.debug("ConnectionAdapter created");
factory.setTlsConfig(connConfig.getTlsConfiguration());
factory.setSerializationFactory(serializationFactory);
factory.setDeserializationFactory(deserializationFactory);
+ factory.setOutboundQueueSize(connConfig.getOutboundQueueSize());
TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();
if (transportProtocol.equals(TransportProtocol.TCP) || transportProtocol.equals(TransportProtocol.TLS)) {
server = new TcpHandler(connConfig.getAddress(), connConfig.getPort());
private SerializationFactory serializationFactory;\r
private TlsConfiguration tlsConfig;\r
private SwitchConnectionHandler switchConnectionHandler;\r
+ private int outboundQueueSize;\r
\r
/**\r
* @return PublishingChannelInitializer that initializes new channels\r
initializer.setSerializationFactory(serializationFactory);\r
initializer.setTlsConfiguration(tlsConfig);\r
initializer.setSwitchConnectionHandler(switchConnectionHandler);\r
+ initializer.setOutboungQueueSize(outboundQueueSize);\r
return initializer;\r
}\r
\r
initializer.setDeserializationFactory(deserializationFactory);\r
initializer.setSerializationFactory(serializationFactory);\r
initializer.setSwitchConnectionHandler(switchConnectionHandler);\r
+ initializer.setOutboungQueueSize(outboundQueueSize);\r
return initializer;\r
}\r
\r
public void setSwitchConnectionHandler(SwitchConnectionHandler switchConnectionHandler) {\r
this.switchConnectionHandler = switchConnectionHandler;\r
}\r
+\r
+ /**\r
+ * @param outboundQueueSize\r
+ */\r
+ public void setOutboundQueueSize(int outboundQueueSize) {\r
+ this.outboundQueueSize = outboundQueueSize;\r
+ }\r
}
\ No newline at end of file
private static final byte LENGTH_INDEX_IN_HEADER = 2;\r
private ConnectionAdapterFactory adapterFactory = new ConnectionAdapterFactoryImpl();\r
private SwitchConnectionHandler connectionHandler;\r
+ private int outboundQueueSize;\r
\r
/**\r
* Default constructor\r
* @param sch the switchConnectionHandler that decides\r
* what to do with incomming message / channel\r
+ * @param outboundQueueSize maximal size of {@link ChannelOutboundQueue}\r
*/\r
- public OFDatagramPacketHandler(SwitchConnectionHandler sch) {\r
+ public OFDatagramPacketHandler(SwitchConnectionHandler sch, int outboundQueueSize) {\r
this.connectionHandler = sch;\r
+ this.outboundQueueSize = outboundQueueSize;\r
}\r
\r
@Override\r
MessageConsumer consumer = UdpConnectionMap.getMessageConsumer(msg.sender());\r
if (consumer == null) {\r
ConnectionFacade connectionFacade =\r
- adapterFactory.createConnectionFacade(ctx.channel(), msg.sender());\r
+ adapterFactory.createConnectionFacade(ctx.channel(), msg.sender(), outboundQueueSize);\r
connectionHandler.onSwitchConnected(connectionFacade);\r
connectionFacade.checkListeners();\r
UdpConnectionMap.addConnection(msg.sender(), connectionFacade);\r
.getLogger(TcpChannelInitializer.class);
private final DefaultChannelGroup allChannels;
private ConnectionAdapterFactory connectionAdapterFactory;
+ private int outboundQueueSize;
/**
* default ctor
LOGGER.info("Incoming connection accepted - building pipeline");
allChannels.add(ch);
ConnectionFacade connectionFacade = null;
- connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null);
+ connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null, outboundQueueSize);
try {
LOGGER.debug("calling plugin: " + getSwitchConnectionHandler());
getSwitchConnectionHandler().onSwitchConnected(connectionFacade);
public int size() {
return allChannels.size();
}
+
+ /**
+ * @param outboundQueueSize
+ */
+ public void setOutboungQueueSize(int outboundQueueSize) {
+ this.outboundQueueSize = outboundQueueSize;
+ }
}
\ No newline at end of file
*/\r
public class UdpChannelInitializer extends ProtocolChannelInitializer<NioDatagramChannel> {\r
\r
+ private int outboundQueueSize;\r
+\r
@Override\r
protected void initChannel(NioDatagramChannel ch) throws Exception {\r
ch.pipeline().addLast(PipelineHandlers.OF_DATAGRAMPACKET_HANDLER.name(),\r
- new OFDatagramPacketHandler(getSwitchConnectionHandler()));\r
+ new OFDatagramPacketHandler(getSwitchConnectionHandler(), outboundQueueSize));\r
OFDatagramPacketDecoder ofDatagramPacketDecoder = new OFDatagramPacketDecoder();\r
ofDatagramPacketDecoder.setDeserializationFactory(getDeserializationFactory());\r
ch.pipeline().addLast(PipelineHandlers.OF_DATAGRAMPACKET_DECODER.name(),\r
ch.pipeline().addLast(PipelineHandlers.OF_ENCODER.name(), ofDatagramPacketEncoder);\r
// connectionFacade.fireConnectionReadyNotification();\r
}\r
+\r
+ /**\r
+ * @param outboundQueueSize\r
+ */\r
+ public void setOutboungQueueSize(int outboundQueueSize) {\r
+ this.outboundQueueSize = outboundQueueSize;\r
+ }\r
}
\ No newline at end of file
final Tls tlsConfig = getTls();
final Threads threads = getThreads();
final TransportProtocol transportProtocol = getTransportProtocol();
+ final int queueSize = getOutboundQueueSize();
return new ConnectionConfiguration() {
@Override
}
};
}
+ @Override
+ public int getOutboundQueueSize() {
+ return queueSize;
+ }
};
}
type uint16;
}
}
+ leaf outbound-queue-size {
+ description "Sets size of ChannelOutboundQueue";
+ type uint16;
+ default 1024;
+ }
}
}
}
\ No newline at end of file
*/
public class PublishingChannelInitializerTest {
+ private static final int OUTBOUND_QUEUE_SIZE = 1024;
@Mock SocketChannel mockSocketCh ;
@Mock ChannelPipeline mockChPipeline ;
@Mock SwitchConnectionHandler mockSwConnHandler ;
pubChInitializer.setSerializationFactory(mockSerializationFactory);
pubChInitializer.setDeserializationFactory(mockDeserializationFactory);
pubChInitializer.setSwitchIdleTimeout(1) ;
- pubChInitializer.getConnectionIterator() ;
+ pubChInitializer.getConnectionIterator();
+ pubChInitializer.setOutboungQueueSize(OUTBOUND_QUEUE_SIZE);
when( mockChGrp.size()).thenReturn(1) ;
pubChInitializer.setSwitchConnectionHandler( mockSwConnHandler ) ;
inetSockAddr = new InetSocketAddress(InetAddress.getLocalHost(), 8675 ) ;
- when(mockConnAdaptorFactory.createConnectionFacade(mockSocketCh, null))
+ when(mockConnAdaptorFactory.createConnectionFacade(mockSocketCh, null, OUTBOUND_QUEUE_SIZE))
.thenReturn(mockConnFacade);
when(mockSocketCh.remoteAddress()).thenReturn(inetSockAddr) ;
when(mockSocketCh.localAddress()).thenReturn(inetSockAddr) ;
*/
public class ConnectionConfigurationImpl implements ConnectionConfiguration {
+ private static final int OUTBOUND_QUEUE_SIZE = 1024;
private InetAddress address;
private int port;
private Object transferProtocol;
public void setThreadConfiguration(ThreadConfiguration threadConfig) {
this.threadConfig = threadConfig;
}
+
+ @Override
+ public int getOutboundQueueSize() {
+ return OUTBOUND_QUEUE_SIZE;
+ }
}
\ No newline at end of file