Merge "Fix "Bug 5589 - Deprecate PortNumberCache" - New class that can convert Device...
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
index a38920d5343645795140e630dca7b9a04a3c96da..7fc078ccd4ec76b3feca7d9cd5cac18a0f970c85 100644 (file)
@@ -7,17 +7,24 @@
  */
 package org.opendaylight.openflowplugin.impl.device;
 
-import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.Iterators;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.util.Collections;
-import java.util.Set;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import io.netty.util.HashedWheelTimer;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
@@ -32,6 +39,7 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
@@ -41,7 +49,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,12 +56,14 @@ import org.slf4j.LoggerFactory;
 /**
  *
  */
-public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper, AutoCloseable {
+public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProviderKeeper {
 
     private static final Logger LOG = LoggerFactory.getLogger(DeviceManagerImpl.class);
 
     private static final long TICK_DURATION = 10; // 0.5 sec.
     private final long globalNotificationQuota;
+    private final boolean switchFeaturesMandatory;
+
     private ScheduledThreadPoolExecutor spyPool;
     private final int spyRate = 10;
 
@@ -62,22 +71,22 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     private final HashedWheelTimer hashedWheelTimer;
     private TranslatorLibrary translatorLibrary;
     private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
+    private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
     private NotificationService notificationService;
     private NotificationPublishService notificationPublishService;
 
-    private final Set<DeviceContext> deviceContexts = Sets.newConcurrentHashSet();
+    private final ConcurrentMap<NodeId, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
     private final MessageIntelligenceAgency messageIntelligenceAgency;
 
-    private final long barrierNanos = TimeUnit.MILLISECONDS.toNanos(500);
-    private final int maxQueueDepth = 25600;
-    private final boolean switchFeaturesMandatory;
-    private final DeviceTransactionChainManagerProvider deviceTransactionChainManagerProvider;
+    private final long barrierIntervalNanos;
+    private final int barrierCountLimit;
     private ExtensionConverterProvider extensionConverterProvider;
 
     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
                              @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency,
-                             final boolean switchFeaturesMandatory,
-                             final long globalNotificationQuota) {
+                             final long globalNotificationQuota, final boolean switchFeaturesMandatory,
+                             final long barrierInterval, final int barrierCountLimit) {
+        this.switchFeaturesMandatory = switchFeaturesMandatory;
         this.globalNotificationQuota = globalNotificationQuota;
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
         hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, 500);
@@ -95,77 +104,37 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
         }
 
         this.messageIntelligenceAgency = messageIntelligenceAgency;
-        this.switchFeaturesMandatory = switchFeaturesMandatory;
-        deviceTransactionChainManagerProvider = new DeviceTransactionChainManagerProvider(dataBroker);
+        this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
+        this.barrierCountLimit = barrierCountLimit;
     }
 
 
     @Override
     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
-        deviceInitPhaseHandler = handler;
+        this.deviceInitPhaseHandler = handler;
     }
 
     @Override
-    public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
+    public void onDeviceContextLevelUp(final DeviceContext deviceContext) throws Exception {
         // final phase - we have to add new Device to MD-SAL DataStore
+        LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceContext.getDeviceState().getNodeId());
         Preconditions.checkNotNull(deviceContext);
-        try {
-
-            if (deviceContext.getDeviceState().getRole() != OfpRole.BECOMESLAVE) {
-                ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
-                deviceContext.onPublished();
-
-            } else {
-                //if role = slave
-                try {
-                    ((DeviceContextImpl) deviceContext).cancelTransaction();
-                } catch (Exception e) {
-                    //TODO: how can we avoid it. pingpong does not have cancel
-                    LOG.debug("Expected Exception: Cancel Txn exception thrown for slaves", e);
-                }
-
-            }
-
-        } catch (final Exception e) {
-            LOG.warn("Node {} can not be add to OPERATIONAL DataStore yet because {} ", deviceContext.getDeviceState().getNodeId(), e.getMessage());
-            LOG.trace("Problem with add node {} to OPERATIONAL DataStore", deviceContext.getDeviceState().getNodeId(), e);
-            try {
-                deviceContext.close();
-            } catch (final Exception e1) {
-                LOG.warn("Device context close FAIL - " + deviceContext.getDeviceState().getNodeId());
-            }
-        }
+        ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
+        deviceContext.onPublished();
     }
 
     @Override
-    public void deviceConnected(@CheckForNull final ConnectionContext connectionContext) {
+    public void deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
         Preconditions.checkArgument(connectionContext != null);
-
-        ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler = new ReadyForNewTransactionChainHandlerImpl(this, connectionContext);
-        DeviceTransactionChainManagerProvider.TransactionChainManagerRegistration transactionChainManagerRegistration = deviceTransactionChainManagerProvider.provideTransactionChainManager(connectionContext);
-        TransactionChainManager transactionChainManager = transactionChainManagerRegistration.getTransactionChainManager();
-
-        if (transactionChainManagerRegistration.ownedByInvokingConnectionContext()) {
-            //this actually is new registration for currently processed connection context
-            initializeDeviceContext(connectionContext, transactionChainManager);
-        }
-        else if (TransactionChainManager.TransactionChainManagerStatus.WORKING.equals(transactionChainManager.getTransactionChainManagerStatus())) {
-            //this means there already exists connection described by same NodeId and it is not current connection contexts' registration
-            LOG.info("In deviceConnected, ownedByInvokingConnectionContext is false and  TransactionChainManagerStatus.WORKING. Closing connection to device to start again.");
-            connectionContext.closeConnection(false);
-        }
-        else if (!transactionChainManager.attemptToRegisterHandler(readyForNewTransactionChainHandler)) {
-            //previous connection is shutting down, we will try to register handler listening on new transaction chain ready
-            // new connection wil be closed if handler registration fails
-            LOG.info("In deviceConnected, ownedByInvokingConnectionContext is false, TransactionChainManagerStatus is not shutting down or readyForNewTransactionChainHandler is null. " +
-                    "Closing connection to device to start again.");
-            connectionContext.closeConnection(false);
-        }
-    }
-
-    private void initializeDeviceContext(final ConnectionContext connectionContext,
-            final TransactionChainManager transactionChainManager) {
-        LOG.info("Initializing New Connection DeviceContext for node:{}",  connectionContext.getNodeId());
+        Preconditions.checkState(!deviceContexts.containsKey(connectionContext.getNodeId()),
+                "Rejecting connection from node which is already connected and there exist deviceContext for it: {}",
+                connectionContext.getNodeId()
+        );
+        LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
+                connectionContext.getConnectionAdapter().getRemoteAddress(), connectionContext.getNodeId());
+
+        // Add Disconnect handler
+        connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
         // Cache this for clarity
         final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
 
@@ -177,28 +146,32 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
         connectionContext.setOutboundQueueProvider(outboundQueueProvider);
         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
-                connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
+                connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
         connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
 
-        final NodeId nodeId = connectionContext.getNodeId();
-        final DeviceState deviceState = new DeviceStateImpl(connectionContext.getFeatures(), nodeId);
-
+        final DeviceState deviceState = createDeviceState(connectionContext);
         final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker,
-                hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, transactionChainManager);
+                hashedWheelTimer, messageIntelligenceAgency, outboundQueueProvider, translatorLibrary, switchFeaturesMandatory);
+
+        Verify.verify(deviceContexts.putIfAbsent(connectionContext.getNodeId(), deviceContext) == null, "DeviceCtx still not closed.");
         deviceContext.addDeviceContextClosedHandler(this);
+
         ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
         deviceContext.setNotificationService(notificationService);
         deviceContext.setNotificationPublishService(notificationPublishService);
 
-        deviceContexts.add(deviceContext);
-
         updatePacketInRateLimiters();
 
         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
                 connectionAdapter, deviceContext);
         connectionAdapter.setMessageListener(messageListener);
+        deviceState.setValid(true);
+
+        deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
+    }
 
-        deviceCtxLevelUp(deviceContext);
+    private static DeviceStateImpl createDeviceState(final @Nonnull ConnectionContext connectionContext) {
+        return new DeviceStateImpl(connectionContext.getFeatures(), connectionContext.getNodeId());
     }
 
     private void updatePacketInRateLimiters() {
@@ -210,19 +183,13 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
                     freshNotificationLimit = 100;
                 }
                 LOG.debug("fresh notification limit = {}", freshNotificationLimit);
-                for (DeviceContext deviceContext : deviceContexts) {
+                for (final DeviceContext deviceContext : deviceContexts.values()) {
                     deviceContext.updatePacketInRateLimit(freshNotificationLimit);
                 }
             }
         }
     }
 
-    void deviceCtxLevelUp(final DeviceContext deviceContext) {
-        deviceContext.getDeviceState().setValid(true);
-        LOG.trace("Device context level up called.");
-        deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
-    }
-
     @Override
     public TranslatorLibrary oook() {
         return translatorLibrary;
@@ -244,15 +211,24 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     }
 
     @Override
-    public void close() throws Exception {
-        for (final DeviceContext deviceContext : deviceContexts) {
-            deviceContext.close();
+    public void close() {
+        for (final Iterator<DeviceContext> iterator = Iterators.consumingIterator(deviceContexts.values().iterator());
+                iterator.hasNext();) {
+            final DeviceContext deviceCtx = iterator.next();
+            deviceCtx.shutdownConnection();
+            deviceCtx.shuttingDownDataStoreTransactions();
+        }
+
+        if (spyPool != null) {
+            spyPool.shutdownNow();
+            spyPool = null;
         }
     }
 
     @Override
-    public void onDeviceContextClosed(final DeviceContext deviceContext) {
-        deviceContexts.remove(deviceContext);
+    public void onDeviceContextLevelDown(final DeviceContext deviceContext) {
+        LOG.debug("onDeviceContextClosed for Node {}", deviceContext.getDeviceState().getNodeId());
+        deviceContexts.remove(deviceContext.getPrimaryConnectionContext().getNodeId(), deviceContext);
         updatePacketInRateLimiters();
     }
 
@@ -263,7 +239,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     }
 
     @Override
-    public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
+    public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
         this.extensionConverterProvider = extensionConverterProvider;
     }
 
@@ -271,4 +247,58 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     public ExtensionConverterProvider getExtensionConverterProvider() {
         return extensionConverterProvider;
     }
+
+    @Override
+    public void setDeviceTerminationPhaseHandler(final DeviceTerminationPhaseHandler handler) {
+        this.deviceTerminPhaseHandler = handler;
+    }
+
+    @Override
+    public void onDeviceDisconnected(final ConnectionContext connectionContext) {
+        LOG.trace("onDeviceDisconnected method call for Node: {}", connectionContext.getNodeId());
+        Preconditions.checkArgument(connectionContext != null);
+        final NodeId nodeId = connectionContext.getNodeId();
+        final DeviceContext deviceCtx = this.deviceContexts.get(nodeId);
+
+        if (null == deviceCtx) {
+            LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.",
+                    connectionContext.getNodeId());
+            return;
+        }
+
+        if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
+            /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
+            deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
+        } else {
+            /* Device is disconnected and so we need to close TxManager */
+            final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
+            Futures.addCallback(future, new FutureCallback<Void>() {
+
+                @Override
+                public void onSuccess(final Void result) {
+                    LOG.debug("TxChainManager for device {} is closed successful.", deviceCtx.getDeviceState().getNodeId());
+                    deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx);
+                }
+
+                @Override
+                public void onFailure(final Throwable t) {
+                    LOG.warn("TxChainManager for device {} failed by closing.", deviceCtx.getDeviceState().getNodeId(), t);
+                    deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceCtx);
+                }
+            });
+            /* Add timer for Close TxManager because it could fain ind cluster without notification */
+            final TimerTask timerTask = new TimerTask() {
+
+                @Override
+                public void run(final Timeout timeout) throws Exception {
+                    if (!future.isDone()) {
+                        LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.",
+                                deviceCtx.getDeviceState().getNodeId());
+                        future.cancel(false);
+                    }
+                }
+            };
+            deviceCtx.getTimer().newTimeout(timerTask, 10, TimeUnit.SECONDS);
+        }
+    }
 }