* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
-
package org.opendaylight.openflowjava.protocol.impl.core;
+import static com.google.common.base.Preconditions.checkState;
+
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.infrautils.diagstatus.DiagStatusService;
import org.opendaylight.infrautils.diagstatus.ServiceDescriptor;
+import org.opendaylight.infrautils.diagstatus.ServiceRegistration;
import org.opendaylight.infrautils.diagstatus.ServiceState;
import org.opendaylight.infrautils.utils.concurrent.Executors;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
* @author michal.polkorab
*/
public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, ConnectionInitializer {
-
private static final Logger LOG = LoggerFactory.getLogger(SwitchConnectionProviderImpl.class);
private static final String THREAD_NAME_PREFIX = "OFP-SwitchConnectionProvider-Udp/TcpHandler";
private static final String OPENFLOW_JAVA_SERVICE_NAME_PREFIX = "OPENFLOW_SERVER";
private final DeserializerRegistry deserializerRegistry;
private final DeserializationFactory deserializationFactory;
private final ListeningExecutorService listeningExecutorService;
- private final DiagStatusService diagStatusService;
private final String diagStatusIdentifier;
private final String threadName;
+
private TcpConnectionInitializer connectionInitializer;
+ // FIXME: clean this up when no longer needed
+ private final ServiceRegistration diagReg;
- public SwitchConnectionProviderImpl(
- @Nullable ConnectionConfiguration connConfig, DiagStatusService diagStatusService) {
+ public SwitchConnectionProviderImpl(final DiagStatusService diagStatus,
+ final @Nullable ConnectionConfiguration connConfig) {
this.connConfig = connConfig;
String connectionSuffix = createConnectionSuffix(connConfig);
+ diagStatusIdentifier = OPENFLOW_JAVA_SERVICE_NAME_PREFIX + connectionSuffix;
+ diagReg = diagStatus.register(diagStatusIdentifier);
- this.diagStatusService = diagStatusService;
- this.diagStatusIdentifier = OPENFLOW_JAVA_SERVICE_NAME_PREFIX + connectionSuffix;
- diagStatusService.register(diagStatusIdentifier);
-
- this.threadName = THREAD_NAME_PREFIX + connectionSuffix;
- this.listeningExecutorService = Executors.newListeningSingleThreadExecutor(threadName, LOG);
-
+ threadName = THREAD_NAME_PREFIX + connectionSuffix;
+ listeningExecutorService = Executors.newListeningSingleThreadExecutor(threadName, LOG);
serializerRegistry = new SerializerRegistryImpl();
if (connConfig != null) {
serializerRegistry.setGroupAddModConfig(connConfig.isGroupAddModEnabled());
}
// ID based, on configuration, used for diagstatus serviceIdentifier (ServiceDescriptor moduleServiceName)
- private static String createConnectionSuffix(@Nullable ConnectionConfiguration config) {
- if (config != null && config.getAddress() != null) {
- return "-" + config.getAddress().toString() + "_" + config.getPort();
- } else if (config != null) {
- return "_" + config.getPort();
- } else {
- return "-null-config";
- }
+ private static String createConnectionSuffix(final @Nullable ConnectionConfiguration config) {
+ return config == null ? "-null-config" : "_" + config.getPort();
}
@Override
throw new IllegalStateException("SwitchConnectionHandler is not set");
}
Futures.addCallback(listeningExecutorService.submit(serverFacade), new FutureCallback<Object>() {
-
@Override
- public void onFailure(Throwable throwable) {
- diagStatusService.report(new ServiceDescriptor(diagStatusIdentifier, throwable));
+ public void onFailure(final Throwable throwable) {
+ diagReg.report(new ServiceDescriptor(diagStatusIdentifier, throwable));
}
@Override
- public void onSuccess(@Nullable Object nullResult) {
- diagStatusService.report(new ServiceDescriptor(
- diagStatusIdentifier, ServiceState.ERROR, threadName + " terminated"));
+ public void onSuccess(final Object result) {
+ diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.ERROR,
+ threadName + " terminated"));
}
- } , MoreExecutors.directExecutor());
+ }, MoreExecutors.directExecutor());
return serverFacade.getIsOnlineFuture();
} catch (RuntimeException e) {
return Futures.immediateFailedFuture(e);
private ServerFacade createAndConfigureServer() {
LOG.debug("Configuring ..");
ServerFacade server;
+
+ checkState(connConfig != null, "Connection not configured");
+
final ChannelInitializerFactory factory = new ChannelInitializerFactory();
factory.setSwitchConnectionHandler(switchConnectionHandler);
factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout());
factory.setUseBarrier(connConfig.useBarrier());
factory.setChannelOutboundQueueSize(connConfig.getChannelOutboundQueueSize());
final TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();
-
// Check if Epoll native transport is available.
// TODO : Add option to disable Epoll.
boolean isEpollEnabled = Epoll.isAvailable();
if (TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol)) {
- server = new TcpHandler(connConfig.getAddress(), connConfig.getPort(), () -> diagStatusService
- .report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
+ server = new TcpHandler(connConfig.getAddress(), connConfig.getPort(),
+ () -> diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer();
((TcpHandler) server).setChannelInitializer(channelInitializer);
((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
-
final EventLoopGroup workerGroupFromTcpHandler = ((TcpHandler) server).getWorkerGroup();
connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler, isEpollEnabled);
connectionInitializer.setChannelInitializer(channelInitializer);
connectionInitializer.run();
} else if (TransportProtocol.UDP.equals(transportProtocol)) {
- server = new UdpHandler(connConfig.getAddress(), connConfig.getPort(), () -> diagStatusService
- .report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
+ server = new UdpHandler(connConfig.getAddress(), connConfig.getPort(),
+ () -> diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
((UdpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer());
} else {
}
@Override
- public void registerExperimenterMessageDeserializer(ExperimenterIdDeserializerKey key,
- OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
+ public void registerExperimenterMessageDeserializer(final ExperimenterIdDeserializerKey key,
+ final OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
deserializerRegistry.registerDeserializer(key, deserializer);
}
@Override
- public void registerMultipartReplyMessageDeserializer(ExperimenterIdDeserializerKey key,
- OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
+ public void registerMultipartReplyMessageDeserializer(final ExperimenterIdDeserializerKey key,
+ final OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
deserializerRegistry.registerDeserializer(key, deserializer);
}
@Override
public void registerExperimenterMessageSerializer(
- ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
- OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
+ final ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
+ final OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
serializerRegistry.registerSerializer(key, serializer);
}
@Override
- public void registerMultipartRequestSerializer(ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
- OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
+ public void registerMultipartRequestSerializer(
+ final ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
+ final OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
serializerRegistry.registerSerializer(key, serializer);
}
@Override
public ConnectionConfiguration getConfiguration() {
- return this.connConfig;
+ return connConfig;
}
@Override
- public <K> void registerSerializer(MessageTypeKey<K> key, OFGeneralSerializer serializer) {
+ public <K> void registerSerializer(final MessageTypeKey<K> key, final OFGeneralSerializer serializer) {
serializerRegistry.registerSerializer(key, serializer);
}
@Override
- public void registerDeserializer(MessageCodeKey key, OFGeneralDeserializer deserializer) {
+ public void registerDeserializer(final MessageCodeKey key, final OFGeneralDeserializer deserializer) {
deserializerRegistry.registerDeserializer(key, deserializer);
}