package org.opendaylight.openflowjava.protocol.impl.core;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.opendaylight.infrautils.diagstatus.DiagStatusService;
+import org.opendaylight.infrautils.diagstatus.ServiceDescriptor;
+import org.opendaylight.infrautils.diagstatus.ServiceState;
+import org.opendaylight.infrautils.utils.concurrent.Executors;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
import org.opendaylight.openflowjava.protocol.api.extensibility.DeserializerRegistry;
*/
public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, ConnectionInitializer {
- private static final Logger LOG = LoggerFactory
- .getLogger(SwitchConnectionProviderImpl.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SwitchConnectionProviderImpl.class);
+ private static final String THREAD_NAME_PREFIX = "OFP-SwitchConnectionProvider-Udp/TcpHandler";
+ private static final String OPENFLOW_JAVA_SERVICE_NAME_PREFIX = "OPENFLOW_SERVER";
+
private SwitchConnectionHandler switchConnectionHandler;
private ServerFacade serverFacade;
private final ConnectionConfiguration connConfig;
private final SerializerRegistry serializerRegistry;
private final DeserializerRegistry deserializerRegistry;
private final DeserializationFactory deserializationFactory;
+ private final ListeningExecutorService listeningExecutorService;
+ private final DiagStatusService diagStatusService;
+ private final String diagStatusIdentifier;
+ private final String threadName;
private TcpConnectionInitializer connectionInitializer;
- public SwitchConnectionProviderImpl(ConnectionConfiguration connConfig) {
+ public SwitchConnectionProviderImpl(
+ @Nullable ConnectionConfiguration connConfig, DiagStatusService diagStatusService) {
this.connConfig = connConfig;
+ String connectionSuffix = createConnectionSuffix(connConfig);
+
+ this.diagStatusService = diagStatusService;
+ this.diagStatusIdentifier = OPENFLOW_JAVA_SERVICE_NAME_PREFIX + connectionSuffix;
+ diagStatusService.register(diagStatusIdentifier);
+
+ this.threadName = THREAD_NAME_PREFIX + connectionSuffix;
+ this.listeningExecutorService = Executors.newListeningSingleThreadExecutor(threadName, LOG);
+
serializerRegistry = new SerializerRegistryImpl();
if (connConfig != null) {
serializerRegistry.setGroupAddModConfig(connConfig.isGroupAddModEnabled());
deserializationFactory = new DeserializationFactory(deserializerRegistry);
}
+ // ID based, on configuration, used for diagstatus serviceIdentifier (ServiceDescriptor moduleServiceName)
+ private static String createConnectionSuffix(@Nullable ConnectionConfiguration config) {
+ if (config != null && config.getAddress() != null) {
+ return "-" + config.getAddress().toString() + "_" + config.getPort();
+ } else if (config != null) {
+ return "_" + config.getPort();
+ } else {
+ return "-null-config";
+ }
+ }
+
@Override
public void setSwitchConnectionHandler(final SwitchConnectionHandler switchConnectionHandler) {
LOG.debug("setSwitchConnectionHandler");
LOG.warn("Can not shutdown - not configured or started");
throw new IllegalStateException("SwitchConnectionProvider is not started or not configured.");
}
- return serverFacade.shutdown();
+ ListenableFuture<Boolean> serverFacadeShutdownFuture = serverFacade.shutdown();
+ Executors.shutdownAndAwaitTermination(listeningExecutorService);
+ return serverFacadeShutdownFuture;
}
@Override
if (switchConnectionHandler == null) {
throw new IllegalStateException("SwitchConnectionHandler is not set");
}
- new Thread(serverFacade).start();
+ Futures.addCallback(listeningExecutorService.submit(serverFacade), new FutureCallback<Object>() {
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ diagStatusService.report(new ServiceDescriptor(diagStatusIdentifier, throwable));
+ }
+
+ @Override
+ public void onSuccess(@Nullable Object nullResult) {
+ diagStatusService.report(new ServiceDescriptor(
+ diagStatusIdentifier, ServiceState.ERROR, threadName + " terminated"));
+ }
+ } , MoreExecutors.directExecutor());
result = serverFacade.getIsOnlineFuture();
} catch (RuntimeException e) {
final SettableFuture<Boolean> exResult = SettableFuture.create();
private ServerFacade createAndConfigureServer() {
LOG.debug("Configuring ..");
- ServerFacade server = null;
+ ServerFacade server;
final ChannelInitializerFactory factory = new ChannelInitializerFactory();
factory.setSwitchConnectionHandler(switchConnectionHandler);
factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout());
factory.setSerializationFactory(serializationFactory);
factory.setDeserializationFactory(deserializationFactory);
factory.setUseBarrier(connConfig.useBarrier());
+ factory.setChannelOutboundQueueSize(connConfig.getChannelOutboundQueueSize());
final TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();
// Check if Epoll native transport is available.
boolean isEpollEnabled = Epoll.isAvailable();
if (TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol)) {
- server = new TcpHandler(connConfig.getAddress(), connConfig.getPort());
+ server = new TcpHandler(connConfig.getAddress(), connConfig.getPort(), () -> diagStatusService
+ .report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer();
((TcpHandler) server).setChannelInitializer(channelInitializer);
((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
connectionInitializer.setChannelInitializer(channelInitializer);
connectionInitializer.run();
} else if (TransportProtocol.UDP.equals(transportProtocol)) {
- server = new UdpHandler(connConfig.getAddress(), connConfig.getPort());
+ server = new UdpHandler(connConfig.getAddress(), connConfig.getPort(), () -> diagStatusService
+ .report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
((UdpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer());
} else {