* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
-
package org.opendaylight.openflowjava.protocol.impl.core;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.infrautils.diagstatus.DiagStatusService;
+import org.opendaylight.infrautils.diagstatus.ServiceDescriptor;
+import org.opendaylight.infrautils.diagstatus.ServiceRegistration;
+import org.opendaylight.infrautils.diagstatus.ServiceState;
+import org.opendaylight.infrautils.utils.concurrent.Executors;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
import org.opendaylight.openflowjava.protocol.api.extensibility.DeserializerRegistry;
import org.opendaylight.openflowjava.protocol.api.keys.MatchEntrySerializerKey;
import org.opendaylight.openflowjava.protocol.api.keys.MessageCodeKey;
import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
+import org.opendaylight.openflowjava.protocol.api.keys.TypeToClassKey;
import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializationFactory;
import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializerRegistryImpl;
import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
import org.opendaylight.openflowjava.protocol.impl.serialization.SerializerRegistryImpl;
-import org.opendaylight.openflowjava.protocol.api.keys.TypeToClassKey;
import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.TransportProtocol;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.oxm.rev150225.MatchField;
import org.slf4j.LoggerFactory;
/**
- * Exposed class for server handling<br>
+ * Exposed class for server handling. <br>
* C - {@link MatchEntrySerializerKey} parameter representing oxm_class (see specification)<br>
* F - {@link MatchEntrySerializerKey} parameter representing oxm_field (see specification)
* @author mirehak
* @author michal.polkorab
*/
public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, ConnectionInitializer {
+ private static final Logger LOG = LoggerFactory.getLogger(SwitchConnectionProviderImpl.class);
+ private static final String THREAD_NAME_PREFIX = "OFP-SwitchConnectionProvider-Udp/TcpHandler";
+ private static final String OPENFLOW_JAVA_SERVICE_NAME_PREFIX = "OPENFLOW_SERVER";
- private static final Logger LOG = LoggerFactory
- .getLogger(SwitchConnectionProviderImpl.class);
private SwitchConnectionHandler switchConnectionHandler;
private ServerFacade serverFacade;
private final ConnectionConfiguration connConfig;
private final SerializerRegistry serializerRegistry;
private final DeserializerRegistry deserializerRegistry;
private final DeserializationFactory deserializationFactory;
+ private final ListeningExecutorService listeningExecutorService;
+ private final String diagStatusIdentifier;
+ private final String threadName;
+
private TcpConnectionInitializer connectionInitializer;
+ // FIXME: clean this up when no longer needed
+ private final ServiceRegistration diagReg;
- public SwitchConnectionProviderImpl(ConnectionConfiguration connConfig) {
+ public SwitchConnectionProviderImpl(final DiagStatusService diagStatus,
+ final @Nullable ConnectionConfiguration connConfig) {
this.connConfig = connConfig;
+ String connectionSuffix = createConnectionSuffix(connConfig);
+ diagStatusIdentifier = OPENFLOW_JAVA_SERVICE_NAME_PREFIX + connectionSuffix;
+ diagReg = diagStatus.register(diagStatusIdentifier);
+
+ threadName = THREAD_NAME_PREFIX + connectionSuffix;
+ listeningExecutorService = Executors.newListeningSingleThreadExecutor(threadName, LOG);
serializerRegistry = new SerializerRegistryImpl();
if (connConfig != null) {
serializerRegistry.setGroupAddModConfig(connConfig.isGroupAddModEnabled());
}
serializerRegistry.init();
- serializationFactory = new SerializationFactory();
- serializationFactory.setSerializerTable(serializerRegistry);
+ serializationFactory = new SerializationFactory(serializerRegistry);
deserializerRegistry = new DeserializerRegistryImpl();
deserializerRegistry.init();
- deserializationFactory = new DeserializationFactory();
- deserializationFactory.setRegistry(deserializerRegistry);
+ deserializationFactory = new DeserializationFactory(deserializerRegistry);
+ }
+
+ // ID based, on configuration, used for diagstatus serviceIdentifier (ServiceDescriptor moduleServiceName)
+ private static String createConnectionSuffix(final @Nullable ConnectionConfiguration config) {
+ return config == null ? "-null-config" : "_" + config.getPort();
}
@Override
@Override
public ListenableFuture<Boolean> shutdown() {
LOG.debug("Shutdown summoned");
- if(serverFacade == null){
+ if (serverFacade == null) {
LOG.warn("Can not shutdown - not configured or started");
throw new IllegalStateException("SwitchConnectionProvider is not started or not configured.");
}
- return serverFacade.shutdown();
+ ListenableFuture<Boolean> serverFacadeShutdownFuture = serverFacade.shutdown();
+ Executors.shutdownAndAwaitTermination(listeningExecutorService);
+ return serverFacadeShutdownFuture;
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public ListenableFuture<Boolean> startup() {
LOG.debug("Startup summoned");
- ListenableFuture<Boolean> result = null;
try {
serverFacade = createAndConfigureServer();
if (switchConnectionHandler == null) {
throw new IllegalStateException("SwitchConnectionHandler is not set");
}
- new Thread(serverFacade).start();
- result = serverFacade.getIsOnlineFuture();
- } catch (final Exception e) {
- final SettableFuture<Boolean> exResult = SettableFuture.create();
- exResult.setException(e);
- result = exResult;
+ Futures.addCallback(listeningExecutorService.submit(serverFacade), new FutureCallback<Object>() {
+ @Override
+ public void onFailure(final Throwable throwable) {
+ diagReg.report(new ServiceDescriptor(diagStatusIdentifier, throwable));
+ }
+
+ @Override
+ public void onSuccess(final Object result) {
+ diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.ERROR,
+ threadName + " terminated"));
+ }
+ }, MoreExecutors.directExecutor());
+ return serverFacade.getIsOnlineFuture();
+ } catch (RuntimeException e) {
+ return Futures.immediateFailedFuture(e);
}
- return result;
}
- /**
- * @return
- */
private ServerFacade createAndConfigureServer() {
LOG.debug("Configuring ..");
- ServerFacade server = null;
+ ServerFacade server;
+
+ checkState(connConfig != null, "Connection not configured");
+
final ChannelInitializerFactory factory = new ChannelInitializerFactory();
factory.setSwitchConnectionHandler(switchConnectionHandler);
factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout());
factory.setSerializationFactory(serializationFactory);
factory.setDeserializationFactory(deserializationFactory);
factory.setUseBarrier(connConfig.useBarrier());
+ factory.setChannelOutboundQueueSize(connConfig.getChannelOutboundQueueSize());
final TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();
-
// Check if Epoll native transport is available.
// TODO : Add option to disable Epoll.
boolean isEpollEnabled = Epoll.isAvailable();
- if ((TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol))) {
- server = new TcpHandler(connConfig.getAddress(), connConfig.getPort());
+ if (TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol)) {
+ server = new TcpHandler(connConfig.getAddress(), connConfig.getPort(),
+ () -> diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer();
((TcpHandler) server).setChannelInitializer(channelInitializer);
((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
-
final EventLoopGroup workerGroupFromTcpHandler = ((TcpHandler) server).getWorkerGroup();
connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler, isEpollEnabled);
connectionInitializer.setChannelInitializer(channelInitializer);
connectionInitializer.run();
- } else if (TransportProtocol.UDP.equals(transportProtocol)){
- server = new UdpHandler(connConfig.getAddress(), connConfig.getPort());
+ } else if (TransportProtocol.UDP.equals(transportProtocol)) {
+ server = new UdpHandler(connConfig.getAddress(), connConfig.getPort(),
+ () -> diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
((UdpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer());
} else {
return server;
}
- /**
- * @return servers
- */
public ServerFacade getServerFacade() {
return serverFacade;
}
@Override
- public void close() throws Exception {
+ public void close() {
shutdown();
}
}
@Override
- public <C extends OxmClassBase, F extends MatchField> void registerMatchEntrySerializer(final MatchEntrySerializerKey<C, F> key,
- final OFGeneralSerializer serializer) {
+ public <C extends OxmClassBase, F extends MatchField> void registerMatchEntrySerializer(
+ final MatchEntrySerializerKey<C, F> key, final OFGeneralSerializer serializer) {
serializerRegistry.registerSerializer(key, serializer);
}
}
@Override
- public void registerExperimenterMessageDeserializer(ExperimenterIdDeserializerKey key,
- OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
+ public void registerExperimenterMessageDeserializer(final ExperimenterIdDeserializerKey key,
+ final OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
deserializerRegistry.registerDeserializer(key, deserializer);
}
@Override
- public void registerMultipartReplyMessageDeserializer(ExperimenterIdDeserializerKey key,
- OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
+ public void registerMultipartReplyMessageDeserializer(final ExperimenterIdDeserializerKey key,
+ final OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
deserializerRegistry.registerDeserializer(key, deserializer);
}
}
@Override
- public void registerExperimenterMessageSerializer(ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
- OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
+ public void registerExperimenterMessageSerializer(
+ final ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
+ final OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
serializerRegistry.registerSerializer(key, serializer);
}
@Override
- public void registerMultipartRequestSerializer(ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
- OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
+ public void registerMultipartRequestSerializer(
+ final ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
+ final OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
serializerRegistry.registerSerializer(key, serializer);
}
serializerRegistry.registerSerializer(key, serializer);
}
- @Override
/**
- * @deprecated Since we have used ExperimenterIdMeterSubTypeSerializerKey as MeterBandSerializer's key, in order to avoid
- * the occurrence of an error, we should discard this function
+ * Deprecated.
+ *
+ * @deprecated Since we have used ExperimenterIdMeterSubTypeSerializerKey as MeterBandSerializer's key, in order
+ * to avoid the occurrence of an error, we should discard this function.
*/
+ @Override
@Deprecated
public void registerMeterBandSerializer(final ExperimenterIdSerializerKey<MeterBandExperimenterCase> key,
final OFSerializer<MeterBandExperimenterCase> serializer) {
}
@Override
- public void registerMeterBandSerializer(final ExperimenterIdMeterSubTypeSerializerKey<MeterBandExperimenterCase> key,
- final OFSerializer<MeterBandExperimenterCase> serializer) {
+ public void registerMeterBandSerializer(
+ final ExperimenterIdMeterSubTypeSerializerKey<MeterBandExperimenterCase> key,
+ final OFSerializer<MeterBandExperimenterCase> serializer) {
serializerRegistry.registerSerializer(key, serializer);
}
@Override
public ConnectionConfiguration getConfiguration() {
- return this.connConfig;
+ return connConfig;
}
- @Override
- public <K> void registerSerializer(MessageTypeKey<K> key, OFGeneralSerializer serializer) {
+ @Override
+ public <K> void registerSerializer(final MessageTypeKey<K> key, final OFGeneralSerializer serializer) {
serializerRegistry.registerSerializer(key, serializer);
}
@Override
- public void registerDeserializer(MessageCodeKey key, OFGeneralDeserializer deserializer) {
- deserializerRegistry.registerDeserializer(key, deserializer);
+ public void registerDeserializer(final MessageCodeKey key, final OFGeneralDeserializer deserializer) {
+ deserializerRegistry.registerDeserializer(key, deserializer);
}
@Override