BUG-2242: LLDP speaker as separate application.
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / MDController.java
index 0dc084e258fd047c3cb4c9b29a8bb249ef6e773c..73bec508642cfc856910326d4d35d22340f1d97f 100644 (file)
@@ -13,18 +13,24 @@ import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
-import org.opendaylight.openflowplugin.openflow.md.OFConstants;
+import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationQueueWrapper;
+import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.translator.ErrorTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.translator.ErrorV10Translator;
@@ -35,12 +41,13 @@ import org.opendaylight.openflowplugin.openflow.md.core.translator.MultiPartMess
 import org.opendaylight.openflowplugin.openflow.md.core.translator.MultiPartReplyPortToNodeConnectorUpdatedTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.translator.MultipartReplyTableFeaturesToTableUpdatedTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.translator.MultipartReplyTranslator;
+import org.opendaylight.openflowplugin.openflow.md.core.translator.NotificationPlainTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.translator.PacketInTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.translator.PacketInV10Translator;
 import org.opendaylight.openflowplugin.openflow.md.core.translator.PortStatusMessageToNodeConnectorUpdatedTranslator;
-import org.opendaylight.openflowplugin.openflow.md.lldp.LLDPSpeakerPopListener;
-import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
-import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
+import org.opendaylight.openflowplugin.openflow.md.util.OpenflowPortsUtil;
+import org.opendaylight.openflowplugin.api.statistics.MessageSpy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
@@ -50,6 +57,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupFeaturesUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GroupStatisticsUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterConfigStatsUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.MeterFeaturesUpdated;
@@ -87,7 +95,7 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ForwardingBlockingQueue;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -104,13 +112,15 @@ public class MDController implements IMDController, AutoCloseable {
 
     private ConcurrentMap<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> messageTranslators;
     private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListeners;
-    private MessageSpy<DataContainer> messageSpyCounter; 
+    private MessageSpy<DataContainer> messageSpyCounter;
 
     final private int OF10 = OFConstants.OFP_VERSION_1_0;
     final private int OF13 = OFConstants.OFP_VERSION_1_3;
 
     private ErrorHandlerSimpleImpl errorHandler;
 
+    private ExtensionConverterProvider extensionConverterProvider;
+
     /**
      * @return translator mapping
      */
@@ -123,6 +133,9 @@ public class MDController implements IMDController, AutoCloseable {
      */
     public void init() {
         LOG.debug("init");
+
+        OpenflowPortsUtil.init();
+
         messageTranslators = new ConcurrentHashMap<>();
         popListeners = new ConcurrentHashMap<>();
         //TODO: move registration to factory
@@ -142,12 +155,14 @@ public class MDController implements IMDController, AutoCloseable {
         addMessageTranslator(MultipartReplyMessage.class,OF13, new MultipartReplyTranslator());
         addMessageTranslator(MultipartReplyMessage.class,OF13,new MultipartReplyTableFeaturesToTableUpdatedTranslator());
         addMessageTranslator(GetFeaturesOutput.class,OF10, new FeaturesV10ToNodeConnectorUpdatedTranslator());
+        addMessageTranslator(NotificationQueueWrapper.class, OF10, new NotificationPlainTranslator());
+        addMessageTranslator(NotificationQueueWrapper.class, OF13, new NotificationPlainTranslator());
 
         NotificationPopListener<DataObject> notificationPopListener = new NotificationPopListener<DataObject>();
         notificationPopListener.setNotificationProviderService(
                 OFSessionUtil.getSessionManager().getNotificationProviderService());
         notificationPopListener.setMessageSpy(messageSpyCounter);
-        
+
         //TODO: move registration to factory
         addMessagePopListener(NodeErrorNotification.class, notificationPopListener);
         addMessagePopListener(BadActionErrorNotification.class, notificationPopListener);
@@ -169,14 +184,15 @@ public class MDController implements IMDController, AutoCloseable {
         addMessagePopListener(PacketReceived.class,notificationPopListener);
         addMessagePopListener(TransmitPacketInput.class, notificationPopListener);
         addMessagePopListener(NodeUpdated.class, notificationPopListener);
+        addMessagePopListener(NodeRemoved.class, notificationPopListener);
 
         addMessagePopListener(SwitchFlowRemoved.class, notificationPopListener);
         addMessagePopListener(TableUpdated.class, notificationPopListener);
-        
+
         //Notification registration for flow statistics
         addMessagePopListener(FlowsStatisticsUpdate.class, notificationPopListener);
         addMessagePopListener(AggregateFlowStatisticsUpdate.class, notificationPopListener);
-        
+
         //Notification registrations for group-statistics
         addMessagePopListener(GroupStatisticsUpdated.class, notificationPopListener);
         addMessagePopListener(GroupFeaturesUpdated.class, notificationPopListener);
@@ -189,38 +205,61 @@ public class MDController implements IMDController, AutoCloseable {
 
         //Notification registration for port-statistics
         addMessagePopListener(NodeConnectorStatisticsUpdate.class, notificationPopListener);
-        
+
         //Notification registration for flow-table statistics
         addMessagePopListener(FlowTableStatisticsUpdate.class, notificationPopListener);
-        
+
         //Notification registration for queue-statistics
         addMessagePopListener(QueueStatisticsUpdate.class, notificationPopListener);
 
-        //Notification for LLDPSpeaker
-        LLDPSpeakerPopListener<DataObject> lldpPopListener  = new LLDPSpeakerPopListener<DataObject>();
-        addMessagePopListener(NodeConnectorUpdated.class,lldpPopListener);
-        
         // Push the updated Listeners to Session Manager which will be then picked up by ConnectionConductor eventually
         OFSessionUtil.getSessionManager().setTranslatorMapping(messageTranslators);
         OFSessionUtil.getSessionManager().setPopListenerMapping(popListeners);
         OFSessionUtil.getSessionManager().setMessageSpy(messageSpyCounter);
-        
+
         // prepare worker pool for rpc
         // TODO: get size from configSubsystem
         int rpcThreadLimit = 10;
         ListeningExecutorService rpcPoolDelegator = createRpcPoolSpyDecorated(rpcThreadLimit, messageSpyCounter);
         OFSessionUtil.getSessionManager().setRpcPool(rpcPoolDelegator);
-        
+        OFSessionUtil.getSessionManager().setExtensionConverterProvider(extensionConverterProvider);
+
     }
 
     /**
      * @param rpcThreadLimit
-     * @param messageSpy 
+     * @param messageSpy
      * @return
      */
-    private static ListeningExecutorService createRpcPoolSpyDecorated(int rpcThreadLimit, MessageSpy<DataContainer> messageSpy) {
-        ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L, 
-                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+    private static ListeningExecutorService createRpcPoolSpyDecorated(final int rpcThreadLimit, final MessageSpy<DataContainer> messageSpy) {
+        final BlockingQueue<Runnable> delegate = new LinkedBlockingQueue<>(100000);
+        final BlockingQueue<Runnable> queue = new ForwardingBlockingQueue<Runnable>() {
+            @Override
+            protected BlockingQueue<Runnable> delegate() {
+                return delegate;
+            }
+
+            @Override
+            public boolean offer(final Runnable r) {
+                // ThreadPoolExecutor will spawn a new thread after core size is reached only
+                // if the queue.offer returns false.
+                return false;
+            }
+        };
+
+        ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L,
+                TimeUnit.MILLISECONDS, queue, "OFRpc");
+        rpcPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+                       @Override
+                       public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
+                               try {
+                                       executor.getQueue().put(r);
+                               } catch (InterruptedException e) {
+                                       throw new RejectedExecutionException("Interrupted while waiting on queue", e);
+                               }
+
+                       }
+               });
         ListeningExecutorService listeningRpcPool = MoreExecutors.listeningDecorator(rpcPool);
         RpcListeningExecutorService rpcPoolDecorated = new RpcListeningExecutorService(listeningRpcPool);
         rpcPoolDecorated.setMessageSpy(messageSpy);
@@ -231,7 +270,7 @@ public class MDController implements IMDController, AutoCloseable {
      * @param switchConnectionProviders
      *            the switchConnectionProviders to set
      */
-    public void setSwitchConnectionProviders(Collection<SwitchConnectionProvider> switchConnectionProviders) {
+    public void setSwitchConnectionProviders(final Collection<SwitchConnectionProvider> switchConnectionProviders) {
         this.switchConnectionProviders = switchConnectionProviders;
     }
 
@@ -248,30 +287,18 @@ public class MDController implements IMDController, AutoCloseable {
         switchConnectionHandler.setMessageSpy(messageSpyCounter);
 
         errorHandler = new ErrorHandlerSimpleImpl();
-        
+
         switchConnectionHandler.setErrorHandler(errorHandler);
         switchConnectionHandler.init();
-        
-        List<ListenableFuture<Boolean>> starterChain = new ArrayList<>();
+
+        List<ListenableFuture<Boolean>> starterChain = new ArrayList<>(switchConnectionProviders.size());
         for (SwitchConnectionProvider switchConnectionPrv : switchConnectionProviders) {
             switchConnectionPrv.setSwitchConnectionHandler(switchConnectionHandler);
             ListenableFuture<Boolean> isOnlineFuture = switchConnectionPrv.startup();
             starterChain.add(isOnlineFuture);
         }
-        
-        Future<List<Boolean>> srvStarted = Futures.allAsList(starterChain);
-    }
 
-    /**
-     * @return wished connections configurations
-     * @deprecated use configSubsystem
-     */
-    @Deprecated
-    private static Collection<ConnectionConfiguration> getConnectionConfiguration() {
-        // TODO:: get config from state manager
-        ConnectionConfiguration configuration = ConnectionConfigurationFactory.getDefault();
-        ConnectionConfiguration configurationLegacy = ConnectionConfigurationFactory.getLegacy();
-        return Lists.newArrayList(configuration, configurationLegacy);
+        Future<List<Boolean>> srvStarted = Futures.allAsList(starterChain);
     }
 
     /**
@@ -282,7 +309,7 @@ public class MDController implements IMDController, AutoCloseable {
      */
     public void stop() {
         LOG.debug("stopping");
-        List<ListenableFuture<Boolean>> stopChain = new ArrayList<>();
+        List<ListenableFuture<Boolean>> stopChain = new ArrayList<>(switchConnectionProviders.size());
         try {
             for (SwitchConnectionProvider switchConnectionPrv : switchConnectionProviders) {
                 ListenableFuture<Boolean> shutdown =  switchConnectionPrv.shutdown();
@@ -306,7 +333,7 @@ public class MDController implements IMDController, AutoCloseable {
     }
 
     @Override
-    public void addMessageTranslator(Class<? extends DataObject> messageType, int version, IMDMessageTranslator<OfHeader, List<DataObject>> translator) {
+    public void addMessageTranslator(final Class<? extends DataObject> messageType, final int version, final IMDMessageTranslator<OfHeader, List<DataObject>> translator) {
         TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
 
         Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> existingValues = messageTranslators.get(tKey);
@@ -319,7 +346,7 @@ public class MDController implements IMDController, AutoCloseable {
     }
 
     @Override
-    public void removeMessageTranslator(Class<? extends DataObject> messageType, int version, IMDMessageTranslator<OfHeader, List<DataObject>> translator) {
+    public void removeMessageTranslator(final Class<? extends DataObject> messageType, final int version, final IMDMessageTranslator<OfHeader, List<DataObject>> translator) {
         TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
         Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> values = messageTranslators.get(tKey);
         if (values != null) {
@@ -332,7 +359,7 @@ public class MDController implements IMDController, AutoCloseable {
     }
 
     @Override
-    public void addMessagePopListener(Class<? extends DataObject> messageType, PopListener<DataObject> popListener) {
+    public void addMessagePopListener(final Class<? extends DataObject> messageType, final PopListener<DataObject> popListener) {
         Collection<PopListener<DataObject>> existingValues = popListeners.get(messageType);
         if (existingValues == null) {
             existingValues = new LinkedHashSet<>();
@@ -343,7 +370,7 @@ public class MDController implements IMDController, AutoCloseable {
     }
 
     @Override
-    public void removeMessagePopListener(Class<? extends DataObject> messageType, PopListener<DataObject> popListener) {
+    public void removeMessagePopListener(final Class<? extends DataObject> messageType, final PopListener<DataObject> popListener) {
         Collection<PopListener<DataObject>> values = popListeners.get(messageType);
         if (values != null) {
             values.remove(popListener);
@@ -358,10 +385,10 @@ public class MDController implements IMDController, AutoCloseable {
      * @param messageSpyCounter the messageSpyCounter to set
      */
     public void setMessageSpyCounter(
-            MessageSpy<DataContainer> messageSpyCounter) {
+            final MessageSpy<DataContainer> messageSpyCounter) {
         this.messageSpyCounter = messageSpyCounter;
     }
-    
+
     @Override
     public void close() {
         LOG.debug("close");
@@ -372,7 +399,15 @@ public class MDController implements IMDController, AutoCloseable {
             switchConnectionPrv.setSwitchConnectionHandler(null);
         }
         switchConnectionProviders = null;
+        OpenflowPortsUtil.close();
         OFSessionUtil.releaseSessionManager();
         errorHandler = null;
     }
+
+    /**
+     * @param extensionConverterProvider
+     */
+    public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
+        this.extensionConverterProvider = extensionConverterProvider;
+    }
 }