Change dropping mastership.
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceManagerImpl.java
index f3b14296e7495a33d5d977e6d42ffc2f4295deb2..c8207372d27a9acdb58274894448a81f92e7d352 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.openflowplugin.impl.device;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -30,12 +31,11 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
-import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
-import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
@@ -48,8 +48,9 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.Messa
 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
 import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
+import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
-import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl;
+import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder;
@@ -67,28 +68,28 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
     private final long globalNotificationQuota;
     private final boolean switchFeaturesMandatory;
-    private boolean isNotificationFlowRemovedOff;
+    private boolean isFlowRemovedNotificationOn;
     private boolean skipTableFeatures;
     private static final int SPY_RATE = 10;
 
     private final DataBroker dataBroker;
+    private final DeviceInitializerProvider deviceInitializerProvider;
     private final ConvertorExecutor convertorExecutor;
     private TranslatorLibrary translatorLibrary;
-    private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
 
     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
     private final ConcurrentMap<DeviceInfo, LifecycleService> lifecycleServices = new ConcurrentHashMap<>();
 
-    private final long barrierIntervalNanos;
-    private final int barrierCountLimit;
+    private long barrierIntervalNanos;
+    private int barrierCountLimit;
 
     private ExtensionConverterProvider extensionConverterProvider;
     private ScheduledThreadPoolExecutor spyPool;
-    private final ClusterSingletonServiceProvider singletonServiceProvider;
     private final NotificationPublishService notificationPublishService;
     private final MessageSpy messageSpy;
     private final HashedWheelTimer hashedWheelTimer;
+    private boolean useSingleLayerSerialization;
 
     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
                              final long globalNotificationQuota,
@@ -96,14 +97,17 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
                              final long barrierInterval,
                              final int barrierCountLimit,
                              final MessageSpy messageSpy,
-                             final boolean isNotificationFlowRemovedOff,
+                             final boolean isFlowRemovedNotificationOn,
                              final ClusterSingletonServiceProvider singletonServiceProvider,
                              final NotificationPublishService notificationPublishService,
                              final HashedWheelTimer hashedWheelTimer,
                              final ConvertorExecutor convertorExecutor,
-                             final boolean skipTableFeatures) {
+                             final boolean skipTableFeatures,
+                             final boolean useSingleLayerSerialization,
+                             final DeviceInitializerProvider deviceInitializerProvider) {
 
         this.dataBroker = dataBroker;
+        this.deviceInitializerProvider = deviceInitializerProvider;
 
         /* merge empty nodes to oper DS to predict any problems with missing parent for Node */
         final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
@@ -119,22 +123,21 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
         this.switchFeaturesMandatory = switchFeaturesMandatory;
         this.globalNotificationQuota = globalNotificationQuota;
-        this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
+        this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn;
         this.skipTableFeatures = skipTableFeatures;
         this.convertorExecutor = convertorExecutor;
         this.hashedWheelTimer = hashedWheelTimer;
         this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval);
         this.barrierCountLimit = barrierCountLimit;
         this.spyPool = new ScheduledThreadPoolExecutor(1);
-        this.singletonServiceProvider = singletonServiceProvider;
         this.notificationPublishService = notificationPublishService;
         this.messageSpy = messageSpy;
+        this.useSingleLayerSerialization = useSingleLayerSerialization;
     }
 
 
     @Override
     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
-        this.deviceInitPhaseHandler = handler;
     }
 
     @Override
@@ -143,97 +146,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId());
         DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo));
         deviceContext.onPublished();
-        lifecycleService.registerService(this.singletonServiceProvider);
-    }
-
-    @Override
-    public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
-        Preconditions.checkArgument(connectionContext != null);
-
-        DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
-        /*
-         * This part prevent destroy another device context. Throwing here an exception result to propagate close connection
-         * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl}
-         * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
-         */
-         if (deviceContexts.containsKey(deviceInfo)) {
-             DeviceContext deviceContext = deviceContexts.get(deviceInfo);
-             LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue());
-             if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
-                 LOG.warn("Node {} context state not in TERMINATION state.",
-                         connectionContext.getDeviceInfo().getLOGValue());
-                 return ConnectionStatus.ALREADY_CONNECTED;
-             } else {
-                 return ConnectionStatus.CLOSING;
-             }
-         }
-
-        LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
-                connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId());
-
-        // Add Disconnect handler
-        connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this);
-        // Cache this for clarity
-        final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter();
-
-        //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published
-        connectionAdapter.setPacketInFiltering(true);
-
-        final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion());
-
-        connectionContext.setOutboundQueueProvider(outboundQueueProvider);
-        final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
-                connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos);
-        connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
-
-        final DeviceContext deviceContext = new DeviceContextImpl(
-                connectionContext,
-                dataBroker,
-                messageSpy,
-                translatorLibrary,
-                this,
-                convertorExecutor,
-                skipTableFeatures);
-
-        deviceContexts.put(deviceInfo, deviceContext);
-
-        final LifecycleService lifecycleService = new LifecycleServiceImpl();
-        lifecycleService.setDeviceContext(deviceContext);
-        deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService);
-
-        lifecycleServices.put(deviceInfo, lifecycleService);
-
-        deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
-
-        ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
-        deviceContext.setNotificationPublishService(notificationPublishService);
-
-        updatePacketInRateLimiters();
-
-        final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
-                connectionAdapter, deviceContext);
-
-        connectionAdapter.setMessageListener(messageListener);
-        deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService);
-        return ConnectionStatus.MAY_CONTINUE;
-    }
-
-    private void updatePacketInRateLimiters() {
-        synchronized (deviceContexts) {
-            final int deviceContextsSize = deviceContexts.size();
-            if (deviceContextsSize > 0) {
-                long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
-                if (freshNotificationLimit < 100) {
-                    freshNotificationLimit = 100;
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("fresh notification limit = {}", freshNotificationLimit);
-                }
-                for (final DeviceContext deviceContext : deviceContexts.values()) {
-                    deviceContext.updatePacketInRateLimit(freshNotificationLimit);
-                }
-            }
-        }
+        lifecycleService.registerDeviceRemovedHandler(this);
     }
 
     @Override
@@ -262,27 +175,8 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
     @Override
     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
-
-        LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue());
-        }
-
         updatePacketInRateLimiters();
-        if (Objects.nonNull(lifecycleService)) {
-            try {
-                lifecycleService.close();
-                LOG.debug("Lifecycle service successfully closed for node {}", deviceInfo.getLOGValue());
-            } catch (Exception e) {
-                LOG.warn("Closing lifecycle service for node {} was unsuccessful ", deviceInfo.getLOGValue(), e);
-            }
-        }
-
-        deviceContexts.remove(deviceInfo);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
-        }
-
+        Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(LifecycleService::close);
     }
 
     @Override
@@ -311,28 +205,30 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
         final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
         final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo);
 
-        if (null == deviceCtx) {
+        if (Objects.isNull(deviceCtx)) {
             LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue());
             return;
         }
 
+        if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
+            LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
+            // Connection is not PrimaryConnection so try to remove from Auxiliary Connections
+            deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
+            // If this is not primary connection, we should not continue disabling everything
+            return;
+        }
+
         if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
             LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue());
             return;
         }
 
-        deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
+        deviceCtx.close();
 
-        if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
-            LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue());
-            /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
-            deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
-        }
-        //TODO: Auxiliary connections supported ?
-            /* Device is disconnected and so we need to close TxManager */
+        // TODO: Auxiliary connections supported ?
+        // Device is disconnected and so we need to close TxManager
         final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
         Futures.addCallback(future, new FutureCallback<Void>() {
-
             @Override
             public void onSuccess(final Void result) {
                 LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue());
@@ -346,13 +242,15 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
                 deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
             }
         });
-        /* Add timer for Close TxManager because it could fain ind cluster without notification */
+
+        // Add timer for Close TxManager because it could fail in cluster without notification
         final TimerTask timerTask = timeout -> {
             if (!future.isDone()) {
                 LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue());
                 future.cancel(false);
             }
         };
+
         hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS);
     }
 
@@ -362,13 +260,13 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     }
 
     @Override
-    public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) {
-        this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff;
+    public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) {
+        this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff;
     }
 
     @Override
-    public boolean getIsNotificationFlowRemovedOff() {
-        return this.isNotificationFlowRemovedOff;
+    public boolean isFlowRemovedNotificationOn() {
+        return this.isFlowRemovedNotificationOn;
     }
 
 
@@ -377,4 +275,126 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
         skipTableFeatures = skipTableFeaturesValue;
     }
 
+    @Override
+    public void setBarrierCountLimit(final int barrierCountLimit) {
+        this.barrierCountLimit = barrierCountLimit;
+    }
+
+    @Override
+    public void setBarrierInterval(final long barrierTimeoutLimit) {
+        this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit);
+    }
+
+    @Override
+    public CheckedFuture<Void, TransactionCommitFailedException> removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) {
+        final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
+        delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier());
+        final CheckedFuture<Void, TransactionCommitFailedException> delFuture = delWtx.submit();
+
+        Futures.addCallback(delFuture, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue());
+                }
+            }
+
+            @Override
+            public void onFailure(@Nonnull final Throwable t) {
+                LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t);
+            }
+        });
+
+        return delFuture;
+    }
+
+    @Override
+    public void setUseSingleLayerSerialization(final Boolean useSingleLayerSerialization) {
+        this.useSingleLayerSerialization = useSingleLayerSerialization;
+    }
+
+    public DeviceContext createContext(@CheckForNull final ConnectionContext connectionContext) {
+
+        LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}",
+                connectionContext.getConnectionAdapter().getRemoteAddress(),
+                connectionContext.getDeviceInfo().getNodeId());
+
+        connectionContext.getConnectionAdapter().setPacketInFiltering(true);
+
+        final OutboundQueueProvider outboundQueueProvider
+                = new OutboundQueueProviderImpl(connectionContext.getDeviceInfo().getVersion());
+
+        connectionContext.setOutboundQueueProvider(outboundQueueProvider);
+        final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
+                connectionContext.getConnectionAdapter().registerOutboundQueueHandler(
+                        outboundQueueProvider,
+                        barrierCountLimit,
+                        barrierIntervalNanos);
+        connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
+
+
+        final DeviceContext deviceContext = new DeviceContextImpl(
+                connectionContext,
+                dataBroker,
+                messageSpy,
+                translatorLibrary,
+                this,
+                convertorExecutor,
+                skipTableFeatures,
+                hashedWheelTimer,
+                useSingleLayerSerialization,
+                deviceInitializerProvider);
+
+        deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext));
+        deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory);
+        ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider);
+        deviceContext.setNotificationPublishService(notificationPublishService);
+
+        deviceContexts.put(connectionContext.getDeviceInfo(), deviceContext);
+        updatePacketInRateLimiters();
+
+        final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
+                connectionContext.getConnectionAdapter(), deviceContext);
+
+        connectionContext.getConnectionAdapter().setMessageListener(messageListener);
+
+        return deviceContext;
+    }
+
+    private void updatePacketInRateLimiters() {
+        synchronized (deviceContexts) {
+            final int deviceContextsSize = deviceContexts.size();
+            if (deviceContextsSize > 0) {
+                long freshNotificationLimit = globalNotificationQuota / deviceContextsSize;
+                if (freshNotificationLimit < 100) {
+                    freshNotificationLimit = 100;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("fresh notification limit = {}", freshNotificationLimit);
+                }
+                for (final DeviceContext deviceContext : deviceContexts.values()) {
+                    deviceContext.updatePacketInRateLimit(freshNotificationLimit);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onDeviceRemoved(final DeviceInfo deviceInfo) {
+        deviceContexts.remove(deviceInfo);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue());
+        }
+        this.updatePacketInRateLimiters();
+    }
+
+    @Override
+    public long getBarrierIntervalNanos() {
+        return barrierIntervalNanos;
+    }
+
+    @Override
+    public int getBarrierCountLimit() {
+        return barrierCountLimit;
+    }
 }