Merge "Fix "Bug 5589 - Deprecate PortNumberCache" - New class that can convert Device...
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
index 49036c7ff304e1f22e830bc4393ba941868dd752..7fc078ccd4ec76b3feca7d9cd5cac18a0f970c85 100644 (file)
@@ -7,18 +7,24 @@
  */
 package org.opendaylight.openflowplugin.impl.device;
 
-import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.Iterators;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import io.netty.util.HashedWheelTimer;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
@@ -33,6 +39,7 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
@@ -49,12 +56,14 @@ import org.slf4j.LoggerFactory;
 /**
  *
  */
-public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper, AutoCloseable {
+public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
 
     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
 
     private static final long TICK_DURATION = 10; // 0.5 sec.
     private final long globalNotificationQuota;
+    private final boolean switchFeaturesMandatory;
+
     private ScheduledThreadPoolExecutor spyPool;
     private final int spyRate = 10;
 
@@ -62,20 +71,22 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     private final HashedWheelTimer hashedWheelTimer;
     private TranslatorLibrary translatorLibrary;
     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
+    private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
     private NotificationService notificationService;
     private NotificationPublishService notificationPublishService;
 
     private final ConcurrentMap<NodeId, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
     private final MessageIntelligenceAgency messageIntelligenceAgency;
 
-    private final long barrierNanos = TimeUnit.MILLISECONDS.toNanos(500);
-    private final int maxQueueDepth = 25600;
-    private final DeviceTransactionChainManagerProvider deviceTransactionChainManagerProvider;
+    private final long barrierIntervalNanos;
+    private final int barrierCountLimit;
     private ExtensionConverterProvider extensionConverterProvider;
 
     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
                              @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency,
-                             final long globalNotificationQuota) {
+                             final long globalNotificationQuota, final boolean switchFeaturesMandatory,
+                             final long barrierInterval, final int barrierCountLimit) {
+        this.switchFeaturesMandatory = switchFeaturesMandatory;
         this.globalNotificationQuota = globalNotificationQuota;
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
         hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, 500);
@@ -93,33 +104,23 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
         }
 
         this.messageIntelligenceAgency = messageIntelligenceAgency;
-        deviceTransactionChainManagerProvider = new DeviceTransactionChainManagerProvider(dataBroker);
+        this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
+        this.barrierCountLimit = barrierCountLimit;
     }
 
 
     @Override
     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
-        deviceInitPhaseHandler = handler;
+        this.deviceInitPhaseHandler = handler;
     }
 
     @Override
-    public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
+    public void onDeviceContextLevelUp(final DeviceContext deviceContext) throws Exception {
         // final phase - we have to add new Device to MD-SAL DataStore
+        LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceContext.getDeviceState().getNodeId());
         Preconditions.checkNotNull(deviceContext);
-        try {
-            ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
-            deviceContext.onPublished();
-
-        } catch (final Exception e) {
-            LOG.warn("Node {} can not be add to OPERATIONAL DataStore yet because {} ", deviceContext.getDeviceState().getNodeId(), e.getMessage());
-            LOG.trace("Problem with add node {} to OPERATIONAL DataStore", deviceContext.getDeviceState().getNodeId(), e);
-            try {
-                deviceContext.close();
-            } catch (Exception e1) {
-                LOG.warn("Exception on device context close. ", e);
-            }
-        }
-
+        ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
+        deviceContext.onPublished();
     }
 
     @Override
@@ -129,8 +130,11 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
                 "Rejecting connection from node which is already connected and there exist deviceContext for it: {}",
                 connectionContext.getNodeId()
         );
-        LOG.info("Initializing New Connection DeviceContext for node:{}", connectionContext.getNodeId());
+        LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
+                connectionContext.getConnectionAdapter().getRemoteAddress(), connectionContext.getNodeId());
 
+        // Add Disconnect handler
+        connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
         // Cache this for clarity
         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
 
@@ -142,29 +146,15 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
-                connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
+                connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
 
         final DeviceState deviceState = createDeviceState(connectionContext);
         final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker,
-                hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary);
+                hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, switchFeaturesMandatory);
 
+        Verify.verify(deviceContexts.putIfAbsent(connectionContext.getNodeId(), deviceContext) == null, "DeviceCtx still not closed.");
         deviceContext.addDeviceContextClosedHandler(this);
-        Verify.verify(deviceContexts.putIfAbsent(connectionContext.getNodeId(), deviceContext) == null);
-
-        // FIXME : txChainManager has to be propagate by Cluster MasterShip Election (remove this couple of lines)
-        // We would like to crete/register TxChainManager after
-        final DeviceTransactionChainManagerProvider.TransactionChainManagerRegistration txChainManagerReg = deviceTransactionChainManagerProvider
-                .provideTransactionChainManager(connectionContext);
-        if (txChainManagerReg.ownedByInvokingConnectionContext()) {
-            //this actually is new registration for currently processed connection context
-            ((DeviceContextImpl) deviceContext).setTransactionChainManager(txChainManagerReg.getTransactionChainManager());
-        } else {
-            LOG.info("In deviceConnected {}, ownedByInvokingConnectionContext is false", connectionContext.getNodeId());
-            deviceContext.close();
-            return;
-        }
-        // --- remove end ----
 
         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
         deviceContext.setNotificationService(notificationService);
@@ -175,8 +165,9 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
                 connectionAdapter, deviceContext);
         connectionAdapter.setMessageListener(messageListener);
+        deviceState.setValid(true);
 
-        deviceCtxLevelUp(deviceContext);
+        deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
     }
 
     private static DeviceStateImpl createDeviceState(final @Nonnull ConnectionContext connectionContext) {
@@ -199,12 +190,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
         }
     }
 
-    void deviceCtxLevelUp(final DeviceContext deviceContext) {
-        deviceContext.getDeviceState().setValid(true);
-        LOG.trace("Device context level up called.");
-        deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
-    }
-
     @Override
     public TranslatorLibrary oook() {
         return translatorLibrary;
@@ -226,16 +211,24 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     }
 
     @Override
-    public void close() throws Exception {
-        for (final DeviceContext deviceContext : deviceContexts.values()) {
-            deviceContext.close();
+    public void close() {
+        for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
+                iterator.hasNext();) {
+            final DeviceContext deviceCtx = iterator.next();
+            deviceCtx.shutdownConnection();
+            deviceCtx.shuttingDownDataStoreTransactions();
+        }
+
+        if (spyPool != null) {
+            spyPool.shutdownNow();
+            spyPool = null;
         }
     }
 
     @Override
-    public void onDeviceContextClosed(final DeviceContext deviceContext) {
-        LOG.trace("onDeviceContextClosed for Node {}", deviceContext.getDeviceState().getNodeId());
-        deviceContexts.remove(deviceContext.getPrimaryConnectionContext().getNodeId());
+    public void onDeviceContextLevelDown(final DeviceContext deviceContext) {
+        LOG.debug("onDeviceContextClosed for Node {}", deviceContext.getDeviceState().getNodeId());
+        deviceContexts.remove(deviceContext.getPrimaryConnectionContext().getNodeId(), deviceContext);
         updatePacketInRateLimiters();
     }
 
@@ -246,7 +239,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     }
 
     @Override
-    public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
+    public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
         this.extensionConverterProvider = extensionConverterProvider;
     }
 
@@ -254,4 +247,58 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     public ExtensionConverterProvider getExtensionConverterProvider() {
         return extensionConverterProvider;
     }
+
+    @Override
+    public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
+        this.deviceTerminPhaseHandler = handler;
+    }
+
+    @Override
+    public void onDeviceDisconnected(final ConnectionContext connectionContext) {
+        LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
+        Preconditions.checkArgument(connectionContext != null);
+        final NodeId nodeId = connectionContext.getNodeId();
+        final DeviceContext deviceCtx = this.deviceContexts.get(nodeId);
+
+        if (null == deviceCtx) {
+            LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.",
+                    connectionContext.getNodeId());
+            return;
+        }
+
+        if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
+            /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
+            deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
+        } else {
+            /* Device is disconnected and so we need to close TxManager */
+            final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
+            Futures.addCallback(future, new FutureCallback<Void>() {
+
+                @Override
+                public void onSuccess(final Void result) {
+                    LOG.debug("TxChainManager for device {} is closed successful.", deviceCtx.getDeviceState().getNodeId());
+                    deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx);
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    LOG.warn("TxChainManager for device {} failed by closing.", deviceCtx.getDeviceState().getNodeId(), t);
+                    deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx);
+                }
+            });
+            /* Add timer for Close TxManager because it could fain ind cluster without notification */
+            final TimerTask timerTask = new TimerTask() {
+
+                @Override
+                public void run(final Timeout timeout) throws Exception {
+                    if (!future.isDone()) {
+                        LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.",
+                                deviceCtx.getDeviceState().getNodeId());
+                        future.cancel(false);
+                    }
+                }
+            };
+            deviceCtx.getTimer().newTimeout(timerTask, 10, TimeUnit.SECONDS);
+        }
+    }
 }