Bump upstreams
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / SwitchConnectionProviderImpl.java
index 7474a311222e3b532f42dad2df36ec30abe738ec..923a9e8842ad5d9285ae70b5b55cf791de380195 100755 (executable)
@@ -5,14 +5,23 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
-
 package org.opendaylight.openflowjava.protocol.impl.core;
 
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.Epoll;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.infrautils.diagstatus.DiagStatusService;
+import org.opendaylight.infrautils.diagstatus.ServiceDescriptor;
+import org.opendaylight.infrautils.diagstatus.ServiceRegistration;
+import org.opendaylight.infrautils.diagstatus.ServiceState;
+import org.opendaylight.infrautils.utils.concurrent.Executors;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
 import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
 import org.opendaylight.openflowjava.protocol.api.extensibility.DeserializerRegistry;
@@ -34,11 +43,11 @@ import org.opendaylight.openflowjava.protocol.api.keys.MatchEntryDeserializerKey
 import org.opendaylight.openflowjava.protocol.api.keys.MatchEntrySerializerKey;
 import org.opendaylight.openflowjava.protocol.api.keys.MessageCodeKey;
 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
+import org.opendaylight.openflowjava.protocol.api.keys.TypeToClassKey;
 import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializationFactory;
 import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializerRegistryImpl;
 import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
 import org.opendaylight.openflowjava.protocol.impl.serialization.SerializerRegistryImpl;
-import org.opendaylight.openflowjava.protocol.api.keys.TypeToClassKey;
 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.config.rev140630.TransportProtocol;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.oxm.rev150225.MatchField;
@@ -52,16 +61,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Exposed class for server handling<br>
+ * Exposed class for server handling<br>
  * C - {@link MatchEntrySerializerKey} parameter representing oxm_class (see specification)<br>
  * F - {@link MatchEntrySerializerKey} parameter representing oxm_field (see specification)
  * @author mirehak
  * @author michal.polkorab
  */
 public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, ConnectionInitializer {
+    private static final Logger LOG = LoggerFactory.getLogger(SwitchConnectionProviderImpl.class);
+    private static final String THREAD_NAME_PREFIX = "OFP-SwitchConnectionProvider-Udp/TcpHandler";
+    private static final String OPENFLOW_JAVA_SERVICE_NAME_PREFIX = "OPENFLOW_SERVER";
 
-    private static final Logger LOG = LoggerFactory
-            .getLogger(SwitchConnectionProviderImpl.class);
     private SwitchConnectionHandler switchConnectionHandler;
     private ServerFacade serverFacade;
     private final ConnectionConfiguration connConfig;
@@ -69,21 +79,37 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
     private final SerializerRegistry serializerRegistry;
     private final DeserializerRegistry deserializerRegistry;
     private final DeserializationFactory deserializationFactory;
+    private final ListeningExecutorService listeningExecutorService;
+    private final String diagStatusIdentifier;
+    private final String threadName;
+
     private TcpConnectionInitializer connectionInitializer;
+    // FIXME: clean this up when no longer needed
+    private final ServiceRegistration diagReg;
 
-    public SwitchConnectionProviderImpl(ConnectionConfiguration connConfig) {
+    public SwitchConnectionProviderImpl(final DiagStatusService diagStatus,
+            final @Nullable ConnectionConfiguration connConfig) {
         this.connConfig = connConfig;
+        String connectionSuffix = createConnectionSuffix(connConfig);
+        diagStatusIdentifier = OPENFLOW_JAVA_SERVICE_NAME_PREFIX + connectionSuffix;
+        diagReg = diagStatus.register(diagStatusIdentifier);
+
+        threadName = THREAD_NAME_PREFIX + connectionSuffix;
+        listeningExecutorService = Executors.newListeningSingleThreadExecutor(threadName, LOG);
         serializerRegistry = new SerializerRegistryImpl();
         if (connConfig != null) {
             serializerRegistry.setGroupAddModConfig(connConfig.isGroupAddModEnabled());
         }
         serializerRegistry.init();
-        serializationFactory = new SerializationFactory();
-        serializationFactory.setSerializerTable(serializerRegistry);
+        serializationFactory = new SerializationFactory(serializerRegistry);
         deserializerRegistry = new DeserializerRegistryImpl();
         deserializerRegistry.init();
-        deserializationFactory = new DeserializationFactory();
-        deserializationFactory.setRegistry(deserializerRegistry);
+        deserializationFactory = new DeserializationFactory(deserializerRegistry);
+    }
+
+    // ID based, on configuration, used for diagstatus serviceIdentifier (ServiceDescriptor moduleServiceName)
+    private static String createConnectionSuffix(final @Nullable ConnectionConfiguration config) {
+        return config == null ? "-null-config" : "_" + config.getPort();
     }
 
     @Override
@@ -95,38 +121,48 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
     @Override
     public ListenableFuture<Boolean> shutdown() {
         LOG.debug("Shutdown summoned");
-        if(serverFacade == null){
+        if (serverFacade == null) {
             LOG.warn("Can not shutdown - not configured or started");
             throw new IllegalStateException("SwitchConnectionProvider is not started or not configured.");
         }
-        return serverFacade.shutdown();
+        ListenableFuture<Boolean> serverFacadeShutdownFuture = serverFacade.shutdown();
+        Executors.shutdownAndAwaitTermination(listeningExecutorService);
+        return serverFacadeShutdownFuture;
     }
 
     @Override
+    @SuppressWarnings("checkstyle:IllegalCatch")
     public ListenableFuture<Boolean> startup() {
         LOG.debug("Startup summoned");
-        ListenableFuture<Boolean> result = null;
         try {
             serverFacade = createAndConfigureServer();
             if (switchConnectionHandler == null) {
                 throw new IllegalStateException("SwitchConnectionHandler is not set");
             }
-            new Thread(serverFacade).start();
-            result = serverFacade.getIsOnlineFuture();
-        } catch (final Exception e) {
-            final SettableFuture<Boolean> exResult = SettableFuture.create();
-            exResult.setException(e);
-            result = exResult;
+            Futures.addCallback(listeningExecutorService.submit(serverFacade), new FutureCallback<Object>() {
+                @Override
+                public void onFailure(final Throwable throwable) {
+                    diagReg.report(new ServiceDescriptor(diagStatusIdentifier, throwable));
+                }
+
+                @Override
+                public void onSuccess(final Object result) {
+                    diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.ERROR,
+                        threadName + " terminated"));
+                }
+            }, MoreExecutors.directExecutor());
+            return serverFacade.getIsOnlineFuture();
+        } catch (RuntimeException e) {
+            return Futures.immediateFailedFuture(e);
         }
-        return result;
     }
 
-    /**
-     * @return
-     */
     private ServerFacade createAndConfigureServer() {
         LOG.debug("Configuring ..");
-        ServerFacade server = null;
+        ServerFacade server;
+
+        checkState(connConfig != null, "Connection not configured");
+
         final ChannelInitializerFactory factory = new ChannelInitializerFactory();
         factory.setSwitchConnectionHandler(switchConnectionHandler);
         factory.setSwitchIdleTimeout(connConfig.getSwitchIdleTimeout());
@@ -134,24 +170,25 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
         factory.setSerializationFactory(serializationFactory);
         factory.setDeserializationFactory(deserializationFactory);
         factory.setUseBarrier(connConfig.useBarrier());
+        factory.setChannelOutboundQueueSize(connConfig.getChannelOutboundQueueSize());
         final TransportProtocol transportProtocol = (TransportProtocol) connConfig.getTransferProtocol();
-
         // Check if Epoll native transport is available.
         // TODO : Add option to disable Epoll.
         boolean isEpollEnabled = Epoll.isAvailable();
 
-        if ((TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol))) {
-            server = new TcpHandler(connConfig.getAddress(), connConfig.getPort());
+        if (TransportProtocol.TCP.equals(transportProtocol) || TransportProtocol.TLS.equals(transportProtocol)) {
+            server = new TcpHandler(connConfig.getAddress(), connConfig.getPort(),
+                () -> diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
             final TcpChannelInitializer channelInitializer = factory.createPublishingChannelInitializer();
             ((TcpHandler) server).setChannelInitializer(channelInitializer);
             ((TcpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
-
             final EventLoopGroup workerGroupFromTcpHandler = ((TcpHandler) server).getWorkerGroup();
             connectionInitializer = new TcpConnectionInitializer(workerGroupFromTcpHandler, isEpollEnabled);
             connectionInitializer.setChannelInitializer(channelInitializer);
             connectionInitializer.run();
-        } else if (TransportProtocol.UDP.equals(transportProtocol)){
-            server = new UdpHandler(connConfig.getAddress(), connConfig.getPort());
+        } else if (TransportProtocol.UDP.equals(transportProtocol)) {
+            server = new UdpHandler(connConfig.getAddress(), connConfig.getPort(),
+                () -> diagReg.report(new ServiceDescriptor(diagStatusIdentifier, ServiceState.OPERATIONAL)));
             ((UdpHandler) server).initiateEventLoopGroups(connConfig.getThreadConfiguration(), isEpollEnabled);
             ((UdpHandler) server).setChannelInitializer(factory.createUdpChannelInitializer());
         } else {
@@ -161,15 +198,12 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
         return server;
     }
 
-    /**
-     * @return servers
-     */
     public ServerFacade getServerFacade() {
         return serverFacade;
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
         shutdown();
     }
 
@@ -208,8 +242,8 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
     }
 
     @Override
-    public <C extends OxmClassBase, F extends MatchField> void registerMatchEntrySerializer(final MatchEntrySerializerKey<C, F> key,
-            final OFGeneralSerializer serializer) {
+    public <C extends OxmClassBase, F extends MatchField> void registerMatchEntrySerializer(
+            final MatchEntrySerializerKey<C, F> key, final OFGeneralSerializer serializer) {
         serializerRegistry.registerSerializer(key, serializer);
     }
 
@@ -226,14 +260,14 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
     }
 
     @Override
-    public void registerExperimenterMessageDeserializer(ExperimenterIdDeserializerKey key,
-                                                        OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
+    public void registerExperimenterMessageDeserializer(final ExperimenterIdDeserializerKey key,
+            final OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
         deserializerRegistry.registerDeserializer(key, deserializer);
     }
 
     @Override
-    public void registerMultipartReplyMessageDeserializer(ExperimenterIdDeserializerKey key,
-                                                          OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
+    public void registerMultipartReplyMessageDeserializer(final ExperimenterIdDeserializerKey key,
+            final OFDeserializer<? extends ExperimenterDataOfChoice> deserializer) {
         deserializerRegistry.registerDeserializer(key, deserializer);
     }
 
@@ -256,14 +290,16 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
     }
 
     @Override
-    public void registerExperimenterMessageSerializer(ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
-                                                      OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
+    public void registerExperimenterMessageSerializer(
+            final ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
+            final OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
         serializerRegistry.registerSerializer(key, serializer);
     }
 
     @Override
-    public void registerMultipartRequestSerializer(ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
-                                                   OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
+    public void registerMultipartRequestSerializer(
+            final ExperimenterIdSerializerKey<? extends ExperimenterDataOfChoice> key,
+            final OFSerializer<? extends ExperimenterDataOfChoice> serializer) {
         serializerRegistry.registerSerializer(key, serializer);
     }
 
@@ -273,11 +309,13 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
         serializerRegistry.registerSerializer(key, serializer);
     }
 
-    @Override
     /**
-     * @deprecated Since we have used ExperimenterIdMeterSubTypeSerializerKey as MeterBandSerializer's key, in order to avoid
-     * the occurrence of an error, we should discard this function
+     * Deprecated.
+     *
+     * @deprecated Since we have used ExperimenterIdMeterSubTypeSerializerKey as MeterBandSerializer's key, in order
+     *     to avoid the occurrence of an error, we should discard this function.
      */
+    @Override
     @Deprecated
     public void registerMeterBandSerializer(final ExperimenterIdSerializerKey<MeterBandExperimenterCase> key,
             final OFSerializer<MeterBandExperimenterCase> serializer) {
@@ -285,8 +323,9 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
     }
 
     @Override
-    public void registerMeterBandSerializer(final ExperimenterIdMeterSubTypeSerializerKey<MeterBandExperimenterCase> key,
-                                            final OFSerializer<MeterBandExperimenterCase> serializer) {
+    public void registerMeterBandSerializer(
+            final ExperimenterIdMeterSubTypeSerializerKey<MeterBandExperimenterCase> key,
+            final OFSerializer<MeterBandExperimenterCase> serializer) {
         serializerRegistry.registerSerializer(key, serializer);
     }
 
@@ -297,17 +336,17 @@ public class SwitchConnectionProviderImpl implements SwitchConnectionProvider, C
 
     @Override
     public ConnectionConfiguration getConfiguration() {
-        return this.connConfig;
+        return connConfig;
     }
 
-     @Override
-    public <K> void registerSerializer(MessageTypeKey<K> key, OFGeneralSerializer serializer) {
+    @Override
+    public <K> void registerSerializer(final MessageTypeKey<K> key, final OFGeneralSerializer serializer) {
         serializerRegistry.registerSerializer(key, serializer);
     }
 
     @Override
-    public void registerDeserializer(MessageCodeKey key, OFGeneralDeserializer deserializer) {
-       deserializerRegistry.registerDeserializer(key, deserializer);
+    public void registerDeserializer(final MessageCodeKey key, final OFGeneralDeserializer deserializer) {
+        deserializerRegistry.registerDeserializer(key, deserializer);
     }
 
     @Override