Disconnection improvements.
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
index 43db4f34ebbd0f86013051c59257626f9ef35894..c07b67aa91615bb8e319193d3f148a6f7ceb7d55 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.openflowplugin.impl.device;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.util.concurrent.AsyncFunction;
@@ -23,7 +24,6 @@ import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -35,10 +35,12 @@ import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
 import org.opendaylight.openflowplugin.api.ConnectionException;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
@@ -50,6 +52,7 @@ import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
 import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
@@ -68,16 +71,19 @@ import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionCon
 import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
 import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
 import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
+import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
 import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
+import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
 import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
+import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
@@ -89,6 +95,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdatedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
@@ -133,11 +141,14 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     // Timeout in seconds after what we will give up on propagating role
     private static final int SET_ROLE_TIMEOUT = 10;
 
+    private static final int LOW_WATERMARK = 1000;
+    private static final int HIGH_WATERMARK = 2000;
+
     private boolean initialized;
 
     private SalRoleService salRoleService = null;
     private final HashedWheelTimer hashedWheelTimer;
-    private ConnectionContext primaryConnectionContext;
+    private volatile ConnectionContext primaryConnectionContext;
     private final DeviceState deviceState;
     private final DataBroker dataBroker;
     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
@@ -145,7 +156,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     private DeviceFlowRegistry deviceFlowRegistry;
     private DeviceGroupRegistry deviceGroupRegistry;
     private DeviceMeterRegistry deviceMeterRegistry;
-    private final PacketInRateLimiter packetInLimiter;
+    private PacketInRateLimiter packetInLimiter;
     private final MessageSpy messageSpy;
     private final ItemLifeCycleKeeper flowLifeCycleKeeper;
     private NotificationPublishService notificationPublishService;
@@ -156,43 +167,44 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     private final TranslatorLibrary translatorLibrary;
     private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
     private ExtensionConverterProvider extensionConverterProvider;
-    private final DeviceManager deviceManager;
     private boolean skipTableFeatures;
     private boolean switchFeaturesMandatory;
-    private final DeviceInfo deviceInfo;
+    private DeviceInfo deviceInfo;
     private final ConvertorExecutor convertorExecutor;
     private volatile CONTEXT_STATE state;
     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
     private final DeviceManager myManager;
     private final DeviceInitializerProvider deviceInitializerProvider;
     private final boolean useSingleLayerSerialization;
+    private Boolean isAddNotificationSent = false;
+    private OutboundQueueProvider outboundQueueProvider;
+    private boolean wasOnceMaster;
 
     DeviceContextImpl(
         @Nonnull final ConnectionContext primaryConnectionContext,
         @Nonnull final DataBroker dataBroker,
         @Nonnull final MessageSpy messageSpy,
         @Nonnull final TranslatorLibrary translatorLibrary,
-        @Nonnull final DeviceManager manager,
+        @Nonnull final DeviceManager contextManager,
         final ConvertorExecutor convertorExecutor,
         final boolean skipTableFeatures,
         final HashedWheelTimer hashedWheelTimer,
-        final DeviceManager myManager,
         final boolean useSingleLayerSerialization,
         final DeviceInitializerProvider deviceInitializerProvider) {
 
         this.primaryConnectionContext = primaryConnectionContext;
+        this.outboundQueueProvider = (OutboundQueueProvider) primaryConnectionContext.getOutboundQueueProvider();
         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
         this.hashedWheelTimer = hashedWheelTimer;
-        this.myManager = myManager;
         this.deviceInitializerProvider = deviceInitializerProvider;
+        this.myManager = contextManager;
         this.deviceState = new DeviceStateImpl();
         this.dataBroker = dataBroker;
         this.auxiliaryConnectionContexts = new HashMap<>();
         this.messageSpy = Preconditions.checkNotNull(messageSpy);
-        this.deviceManager = manager;
 
         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
-                /*initial*/ 1000, /*initial*/2000, this.messageSpy, REJECTED_DRAIN_FACTOR);
+                /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
 
         this.translatorLibrary = translatorLibrary;
         this.portStatusTranslator = translatorLibrary.lookupTranslator(
@@ -210,6 +222,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         this.skipTableFeatures = skipTableFeatures;
         this.useSingleLayerSerialization = useSingleLayerSerialization;
         this.initialized = false;
+        this.wasOnceMaster = false;
     }
 
     @Override
@@ -326,11 +339,11 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
                 flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
 
-        if(deviceManager.isFlowRemovedNotificationOn()) {
+        if(!myManager.isFlowRemovedNotificationOn()) {
             // Trigger off a notification
             notificationPublishService.offerNotification(flowRemovedNotification);
         } else if(LOG.isDebugEnabled()) {
-            LOG.debug("For nodeId={} isFlowRemovedNotificationOn={}", getDeviceInfo().getLOGValue(), deviceManager.isFlowRemovedNotificationOn());
+            LOG.debug("For nodeId={} isNotificationFlowRemovedOn={}", getDeviceInfo().getLOGValue(), myManager.isFlowRemovedNotificationOn());
         }
 
         final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
@@ -338,7 +351,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
             //2. create registry key
             final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(), flowRemovedNotification);
             //3. lookup flowId
-            final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveIdForFlow(flowRegKey);
+            final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
             //4. if flowId present:
             if (flowDescriptor != null) {
                 // a) construct flow path
@@ -355,6 +368,26 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         }
     }
 
+    @Override
+    public void sendNodeAddedNotification() {
+        if (!isAddNotificationSent) {
+            isAddNotificationSent = true;
+            NodeUpdatedBuilder builder = new NodeUpdatedBuilder();
+            builder.setId(getDeviceInfo().getNodeId());
+            builder.setNodeRef(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()));
+            LOG.debug("Publishing node added notification for {}", builder.build());
+            notificationPublishService.offerNotification(builder.build());
+        }
+    }
+
+    @Override
+    public void sendNodeRemovedNotification() {
+        NodeRemovedBuilder builder = new NodeRemovedBuilder();
+        builder.setNodeRef(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()));
+        LOG.debug("Publishing node removed notification for {}", builder.build());
+        notificationPublishService.offerNotification(builder.build());
+    }
+
     @Override
     public void processPortStatusMessage(final PortStatusMessage portStatus) {
         messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
@@ -489,7 +522,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     public void onPublished() {
         Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState()));
         this.state = CONTEXT_STATE.WORKING;
-        primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
+        synchronized (primaryConnectionContext) {
+            primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
+        }
         for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) {
             switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
         }
@@ -569,6 +604,40 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         this.switchFeaturesMandatory = switchFeaturesMandatory;
     }
 
+    @Override
+    public synchronized void replaceConnection(final ConnectionContext connectionContext) {
+
+        primaryConnectionContext = null;
+        deviceInfo = null;
+        packetInLimiter = null;
+
+        primaryConnectionContext = connectionContext;
+        deviceInfo = primaryConnectionContext.getDeviceInfo();
+
+        packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
+                /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, messageSpy, REJECTED_DRAIN_FACTOR);
+
+        primaryConnectionContext.setOutboundQueueProvider(outboundQueueProvider);
+
+        final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
+                primaryConnectionContext.getConnectionAdapter().registerOutboundQueueHandler(
+                        outboundQueueProvider,
+                        myManager.getBarrierCountLimit(),
+                        myManager.getBarrierIntervalNanos());
+
+        primaryConnectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
+
+        final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
+                primaryConnectionContext.getConnectionAdapter(), this);
+
+        primaryConnectionContext.getConnectionAdapter().setMessageListener(messageListener);
+
+        LOG.info("ConnectionEvent: Connection on device:{}, NodeId:{} switched.",
+                primaryConnectionContext.getConnectionAdapter().getRemoteAddress(),
+                primaryConnectionContext.getDeviceInfo().getNodeId());
+
+    }
+
     @Override
     public CONTEXT_STATE getState() {
         return this.state;
@@ -581,7 +650,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
                 : Futures.immediateFuture(null);
 
         if (!connectionInterrupted) {
-            final ListenableFuture<Void> makeSlaveFuture = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
+            final ListenableFuture<Void> makeSlaveFuture
+                    = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
                 @Nullable
                 @Override
                 public Void apply(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
@@ -595,6 +665,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
                     }
+                    sendNodeAddedNotification();
                 }
 
                 @Override
@@ -604,22 +675,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
                 }
             });
 
-            return Futures.transform(deactivateTxManagerFuture, new AsyncFunction<Void, Void>() {
-                @Override
-                public ListenableFuture<Void> apply(Void aVoid) throws Exception {
-                    // Add fallback to remove device from operational DS if setting slave fails
-                    return Futures.withFallback(makeSlaveFuture, t ->
-                            myManager.removeDeviceFromOperationalDS(deviceInfo));
-                }
-            });
-        } else {
-            return Futures.transform(deactivateTxManagerFuture, new AsyncFunction<Void, Void>() {
-                @Override
-                public ListenableFuture<Void> apply(Void aVoid) throws Exception {
-                    return myManager.removeDeviceFromOperationalDS(deviceInfo);
-                }
-            });
         }
+
+        return deactivateTxManagerFuture;
     }
 
     @Override
@@ -641,6 +699,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         } else {
             this.state = CONTEXT_STATE.TERMINATION;
         }
+        sendNodeRemovedNotification();
     }
 
     @Override
@@ -650,14 +709,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         }
     }
 
-    @Override
-    public void replaceConnectionContext(final ConnectionContext connectionContext){
-        // Act like we are initializing the context
-        this.state = CONTEXT_STATE.INITIALIZATION;
-        this.primaryConnectionContext = connectionContext;
-        this.onPublished();
-    }
-
     @Override
     public boolean canUseSingleLayerSerialization() {
         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
@@ -679,12 +730,12 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     @Override
-    public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
+    public void masterSuccessful(){
+        this.wasOnceMaster = true;
+    }
 
-        if (getPrimaryConnectionContext().getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
-            LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
-            return false;
-        }
+    @Override
+    public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
 
         LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
 
@@ -693,7 +744,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         this.transactionChainManager.activateTransactionManager();
 
         try {
-            final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
+            final java.util.Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
                 .lookup(deviceInfo.getVersion());
 
             if (initializer.isPresent()) {
@@ -707,8 +758,13 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
             return false;
         }
 
-        Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new RpcResultFutureCallback());
-        return this.clusterInitializationPhaseHandler.onContextInstantiateService(getPrimaryConnectionContext());
+        Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER),
+                new RpcResultFutureCallback(mastershipChangeListener));
+
+        final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = getDeviceFlowRegistry().fill();
+        Futures.addCallback(deviceFlowRegistryFill, new DeviceFlowRegistryCallback(deviceFlowRegistryFill));
+
+        return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
     }
 
     @VisibleForTesting
@@ -771,17 +827,52 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
+
+        private final MastershipChangeListener mastershipChangeListener;
+
+        RpcResultFutureCallback(final MastershipChangeListener mastershipChangeListener) {
+            this.mastershipChangeListener = mastershipChangeListener;
+        }
+
         @Override
         public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue());
             }
+            sendNodeAddedNotification();
         }
 
         @Override
         public void onFailure(final Throwable throwable) {
             LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue());
-            shutdownConnection();
+            mastershipChangeListener.onNotAbleToStartMastership(deviceInfo);
+        }
+    }
+
+    private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
+        private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
+
+        DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
+            this.deviceFlowRegistryFill = deviceFlowRegistryFill;
+        }
+
+        @Override
+        public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Finished filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+            if (deviceFlowRegistryFill.isCancelled()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
+                }
+            } else {
+                LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getLOGValue(), t);
+            }
         }
     }
+
 }