Disconnection improvements. 67/49367/19
authorJozef Bacigal <jozef.bacigal@pantheon.tech>
Wed, 14 Dec 2016 16:15:24 +0000 (17:15 +0100)
committerJozef Bacigal <jozef.bacigal@pantheon.tech>
Tue, 4 Apr 2017 07:56:14 +0000 (09:56 +0200)
Change-Id: I547bf7fab3604e8b7dad8ce8964528b3cbc05998
Signed-off-by: Jozef Bacigal <jozef.bacigal@pantheon.tech>
17 files changed:
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/OFPContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ConnectionManager.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceManager.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChain.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/ContextChainHolder.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/OutboundQueueProviderImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImplTest.java

index 7d1ad0555e4af7768c96b109603317d57db03b66..4752cf38bbfdb864c411b44b64a3df5461955f4c 100644 (file)
@@ -60,6 +60,14 @@ public interface OFPContext extends AutoCloseable, ClusterLifecycleSupervisor, C
                 new RejectedExecutionException(MESSAGE));
     }
 
+    /**
+     * About to stop services in cluster not master anymore or going down.
+     * @return Future most of services need time to be closed.
+     */
+    default ListenableFuture<Void> stopClusterServices() {
+        return stopClusterServices(false);
+    }
+
     /**
      * Get cluster singleton service identifier.
      * @return cluster singleton service identifier.
index e0660cbfc3a83327412c4f825a343e2a8fc6c0e3..7356f5b034904e1cb0156c58e83a160531feb3b9 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.openflowplugin.api.openflow.connection;
 
 import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceConnectedHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
 
 /**
  * Connection manager manages connections with devices.
@@ -23,7 +24,13 @@ public interface ConnectionManager extends SwitchConnectionHandler {
      * device is connected.
      * @param deviceConnectedHandler device connected handler
      */
-    void setDeviceConnectedHandler(DeviceConnectedHandler deviceConnectedHandler);
+    void setDeviceConnectedHandler(final DeviceConnectedHandler deviceConnectedHandler);
+
+    /**
+     * Method registers handler responsible handling device disconnected event
+     * @param deviceDisconnectedHandler device disconnected handler
+     */
+    void setDeviceDisconnectedHandler(final DeviceDisconnectedHandler deviceDisconnectedHandler);
 
     /**
      * Setter for echo reply timeout.
index 4f0cf4ef68b59e2ebf389bc48f7ac36d72eace64..3206a89ad72fd844278f476ff8f4ff80d6710997 100644 (file)
@@ -142,6 +142,8 @@ public interface DeviceContext extends
      */
     void setSalRoleService(@Nonnull final SalRoleService salRoleService);
 
+    void masterSuccessful();
+
     /**
      * Make device slave.
      * @return listenable future from sal role service
index 49999edbacbf62413e22c7e6c5cba93cfc60554e..b429744e29388bb3dec3cd55cb00d10a2ce7664e 100644 (file)
@@ -24,7 +24,6 @@ import org.opendaylight.openflowplugin.api.openflow.translator.TranslatorLibrari
  */
 public interface DeviceManager extends
         OFPManager,
-        DeviceConnectedHandler,
         DeviceDisconnectedHandler,
         TranslatorLibrarian {
 
index 64c1c2821753131b87da59a92e720989ad883fdf..1894468e79fd09c2f7e98c50d80d2f64054eb760 100644 (file)
@@ -8,12 +8,11 @@
 package org.opendaylight.openflowplugin.api.openflow.lifecycle;
 
 import com.google.common.util.concurrent.ListenableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.ContextChainState;
 
 /**
@@ -38,14 +37,15 @@ public interface ContextChain extends AutoCloseable {
     /**
      * Stop the working contexts, but not release them
      * @return Future
+     * @param connectionDropped
      */
-    Future<Void> stopChain();
+    ListenableFuture<Void> stopChain(boolean connectionDropped);
 
     /**
      * Start the contexts, if some context is missing or cant be started returns failed future
      * @return Future
      */
-    Future<Void> startChain();
+    ListenableFuture<Void> startChain();
 
     @Override
     void close();
@@ -56,7 +56,7 @@ public interface ContextChain extends AutoCloseable {
      */
     void changePrimaryConnection(final ConnectionContext connectionContext);
 
-    ListenableFuture<Void> connectionDropped() throws ExecutionException, InterruptedException;
+    ListenableFuture<Void> connectionDropped();
 
     ContextChainState getContextChainState();
 
@@ -67,4 +67,8 @@ public interface ContextChain extends AutoCloseable {
     void registerServices(@NonNull final ClusterSingletonServiceProvider clusterSingletonServiceProvider);
 
     void makeDeviceSlave();
+
+    void closePrimaryConnection();
+
+    DeviceContext provideDeviceContext();
 }
index b7de4ccaab05fb9d5ffc8f0fb99eb2e68dcafa0e..196f7ff0d66a6b8bddaa1f6aedc8b42364752203 100644 (file)
@@ -13,11 +13,15 @@ import org.opendaylight.openflowplugin.api.openflow.OFPManager;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceConnectedHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
 
 /**
  * Generic interface for context chain holder, hold all created context chains.
  */
-public interface ContextChainHolder extends DeviceConnectedHandler, MastershipChangeListener {
+public interface ContextChainHolder extends
+        DeviceConnectedHandler,
+        MastershipChangeListener,
+        DeviceDisconnectedHandler {
 
     <T extends OFPManager> void addManager(final T manager);
     ContextChain createContextChain(final ConnectionContext connectionContext);
index fc7ddf81131a22b18fdfe90a00bdd895a21929a7..aa65b4b841c765e2c5f0cc3b08859906832d4cfb 100644 (file)
@@ -274,18 +274,11 @@ public class OpenFlowPluginProviderImpl implements OpenFlowPluginProvider, OpenF
         statisticsManager = new StatisticsManagerImpl(rpcProviderRegistry, isStatisticsPollingOn, hashedWheelTimer,
                 convertorManager,basicTimerDelay,maximumTimerDelay);
 
-        /* Initialization Phase ordering - OFP Device Context suite */
-        // CM -> DM -> SM -> RPC -> Role -> DM
         // Device connection handler moved from device manager to context holder
         connectionManager.setDeviceConnectedHandler(contextChainHolder);
-        deviceManager.setDeviceInitializationPhaseHandler(statisticsManager);
-        statisticsManager.setDeviceInitializationPhaseHandler(rpcManager);
-        rpcManager.setDeviceInitializationPhaseHandler(deviceManager);
 
         /* Termination Phase ordering - OFP Device Context suite */
-        deviceManager.setDeviceTerminationPhaseHandler(rpcManager);
-        rpcManager.setDeviceTerminationPhaseHandler(statisticsManager);
-        statisticsManager.setDeviceTerminationPhaseHandler(deviceManager);
+        connectionManager.setDeviceDisconnectedHandler(contextChainHolder);
 
         rpcManager.setStatisticsRpcEnabled(isStatisticsRpcEnabled);
 
index d954f0b24e6f4a6bf502eae420090473460731c8..8e6f3c8e264a7aba1bb2cc0a3cf8bb156da335d9 100644 (file)
@@ -16,6 +16,7 @@ import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
 import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceConnectedHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeListener;
 import org.opendaylight.openflowplugin.api.openflow.md.core.HandshakeManager;
@@ -40,6 +41,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
     private DeviceConnectedHandler deviceConnectedHandler;
     private long echoReplyTimeout;
     private final ThreadPoolExecutor threadPool;
+    private DeviceDisconnectedHandler deviceDisconnectedHandler;
 
     public ConnectionManagerImpl(long echoReplyTimeout, final ThreadPoolExecutor threadPool) {
         this.echoReplyTimeout = echoReplyTimeout;
@@ -50,6 +52,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
     public void onSwitchConnected(final ConnectionAdapter connectionAdapter) {
         LOG.trace("prepare connection context");
         final ConnectionContext connectionContext = new ConnectionContextImpl(connectionAdapter);
+        connectionContext.setDeviceDisconnectedHandler(this.deviceDisconnectedHandler);
 
         HandshakeListener handshakeListener = new HandshakeListenerImpl(connectionContext, deviceConnectedHandler);
         final HandshakeManager handshakeManager = createHandshakeManager(connectionAdapter, handshakeListener);
@@ -102,6 +105,11 @@ public class ConnectionManagerImpl implements ConnectionManager {
         this.deviceConnectedHandler = deviceConnectedHandler;
     }
 
+    @Override
+    public void setDeviceDisconnectedHandler(final DeviceDisconnectedHandler deviceDisconnectedHandler) {
+        this.deviceDisconnectedHandler = deviceDisconnectedHandler;
+    }
+
     public void setEchoReplyTimeout(long echoReplyTimeout){
         this.echoReplyTimeout = echoReplyTimeout;
     }
index ac1a084fda7a73af72ecd4839c1dbde3a2a12d15..e86a9e9897c9f6b80719105ea8577038c0008005 100644 (file)
@@ -40,13 +40,18 @@ public class OutboundQueueProviderImpl implements OutboundQueueProvider {
 
     @Override
     public synchronized void onConnectionQueueChanged(final OutboundQueue queue) {
-        LOG.debug("Replacing queue {} with {}", outboundQueue, queue);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Replacing queue {} with {}", outboundQueue, queue);
+        }
         outboundQueue = queue;
         notifyAll();
     }
 
     @Override
     public Long reserveEntry() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Reserve entry with queue: {} in this {} implementation", outboundQueue, this);
+        }
         for (;;) {
             OutboundQueue queue = outboundQueue;
             if (queue == null) {
index 44d67f18d6b061a6689ea43cfaf325dffee39229..c07b67aa91615bb8e319193d3f148a6f7ceb7d55 100644 (file)
@@ -141,11 +141,14 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     // Timeout in seconds after what we will give up on propagating role
     private static final int SET_ROLE_TIMEOUT = 10;
 
+    private static final int LOW_WATERMARK = 1000;
+    private static final int HIGH_WATERMARK = 2000;
+
     private boolean initialized;
 
     private SalRoleService salRoleService = null;
     private final HashedWheelTimer hashedWheelTimer;
-    private ConnectionContext primaryConnectionContext;
+    private volatile ConnectionContext primaryConnectionContext;
     private final DeviceState deviceState;
     private final DataBroker dataBroker;
     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
@@ -153,7 +156,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     private DeviceFlowRegistry deviceFlowRegistry;
     private DeviceGroupRegistry deviceGroupRegistry;
     private DeviceMeterRegistry deviceMeterRegistry;
-    private final PacketInRateLimiter packetInLimiter;
+    private PacketInRateLimiter packetInLimiter;
     private final MessageSpy messageSpy;
     private final ItemLifeCycleKeeper flowLifeCycleKeeper;
     private NotificationPublishService notificationPublishService;
@@ -166,7 +169,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     private ExtensionConverterProvider extensionConverterProvider;
     private boolean skipTableFeatures;
     private boolean switchFeaturesMandatory;
-    private final DeviceInfo deviceInfo;
+    private DeviceInfo deviceInfo;
     private final ConvertorExecutor convertorExecutor;
     private volatile CONTEXT_STATE state;
     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
@@ -174,6 +177,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     private final DeviceInitializerProvider deviceInitializerProvider;
     private final boolean useSingleLayerSerialization;
     private Boolean isAddNotificationSent = false;
+    private OutboundQueueProvider outboundQueueProvider;
+    private boolean wasOnceMaster;
 
     DeviceContextImpl(
         @Nonnull final ConnectionContext primaryConnectionContext,
@@ -188,6 +193,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         final DeviceInitializerProvider deviceInitializerProvider) {
 
         this.primaryConnectionContext = primaryConnectionContext;
+        this.outboundQueueProvider = (OutboundQueueProvider) primaryConnectionContext.getOutboundQueueProvider();
         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
         this.hashedWheelTimer = hashedWheelTimer;
         this.deviceInitializerProvider = deviceInitializerProvider;
@@ -198,7 +204,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         this.messageSpy = Preconditions.checkNotNull(messageSpy);
 
         this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
-                /*initial*/ 1000, /*initial*/2000, this.messageSpy, REJECTED_DRAIN_FACTOR);
+                /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
 
         this.translatorLibrary = translatorLibrary;
         this.portStatusTranslator = translatorLibrary.lookupTranslator(
@@ -216,6 +222,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         this.skipTableFeatures = skipTableFeatures;
         this.useSingleLayerSerialization = useSingleLayerSerialization;
         this.initialized = false;
+        this.wasOnceMaster = false;
     }
 
     @Override
@@ -515,7 +522,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     public void onPublished() {
         Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState()));
         this.state = CONTEXT_STATE.WORKING;
-        primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
+        synchronized (primaryConnectionContext) {
+            primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
+        }
         for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) {
             switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
         }
@@ -596,31 +605,36 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     @Override
-    public void replaceConnection(final ConnectionContext connectionContext) {
+    public synchronized void replaceConnection(final ConnectionContext connectionContext) {
+
+        primaryConnectionContext = null;
+        deviceInfo = null;
+        packetInLimiter = null;
+
+        primaryConnectionContext = connectionContext;
+        deviceInfo = primaryConnectionContext.getDeviceInfo();
 
-        connectionContext.getConnectionAdapter().setPacketInFiltering(true);
+        packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
+                /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, messageSpy, REJECTED_DRAIN_FACTOR);
 
-        final OutboundQueueProvider outboundQueueProvider
-                = new OutboundQueueProviderImpl(connectionContext.getDeviceInfo().getVersion());
+        primaryConnectionContext.setOutboundQueueProvider(outboundQueueProvider);
 
-        connectionContext.setOutboundQueueProvider(outboundQueueProvider);
         final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
-                connectionContext.getConnectionAdapter().registerOutboundQueueHandler(
+                primaryConnectionContext.getConnectionAdapter().registerOutboundQueueHandler(
                         outboundQueueProvider,
-                        this.myManager.getBarrierCountLimit(),
-                        this.myManager.getBarrierIntervalNanos());
-        connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
+                        myManager.getBarrierCountLimit(),
+                        myManager.getBarrierIntervalNanos());
 
-        this.salRoleService = new SalRoleServiceImpl(this, this);
+        primaryConnectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
 
         final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
-                connectionContext.getConnectionAdapter(), this);
+                primaryConnectionContext.getConnectionAdapter(), this);
 
-        connectionContext.getConnectionAdapter().setMessageListener(messageListener);
+        primaryConnectionContext.getConnectionAdapter().setMessageListener(messageListener);
 
         LOG.info("ConnectionEvent: Connection on device:{}, NodeId:{} switched.",
-                connectionContext.getConnectionAdapter().getRemoteAddress(),
-                connectionContext.getDeviceInfo().getNodeId());
+                primaryConnectionContext.getConnectionAdapter().getRemoteAddress(),
+                primaryConnectionContext.getDeviceInfo().getNodeId());
 
     }
 
@@ -661,15 +675,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
                 }
             });
 
-            return Futures.transform(deactivateTxManagerFuture, (AsyncFunction<Void, Void>) aVoid -> {
-                // Add fallback to remove device from operational DS if setting slave fails
-                return Futures.withFallback(makeSlaveFuture, t ->
-                        myManager.removeDeviceFromOperationalDS(deviceInfo));
-            });
-        } else {
-            return Futures.transform(deactivateTxManagerFuture, (AsyncFunction<Void, Void>) aVoid ->
-                    myManager.removeDeviceFromOperationalDS(deviceInfo));
         }
+
+        return deactivateTxManagerFuture;
     }
 
     @Override
@@ -722,13 +730,12 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     @Override
-    public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
+    public void masterSuccessful(){
+        this.wasOnceMaster = true;
+    }
 
-        if (getPrimaryConnectionContext().getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
-            LOG.warn("Connection on device {} was interrupted, will stop starting master services.",
-                    deviceInfo.getLOGValue());
-            return false;
-        }
+    @Override
+    public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
 
         LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
 
index 1c4428aca6dc51dfced1faaf71d703cad7323d12..b267d115f5a980b0047c33a24ecd135885a765e4 100644 (file)
@@ -77,7 +77,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     private final DeviceInitializerProvider deviceInitializerProvider;
     private final ConvertorExecutor convertorExecutor;
     private TranslatorLibrary translatorLibrary;
-    private DeviceInitializationPhaseHandler deviceInitPhaseHandler;
     private DeviceTerminationPhaseHandler deviceTerminPhaseHandler;
 
     private final ConcurrentMap<DeviceInfo, DeviceContext> deviceContexts = new ConcurrentHashMap<>();
@@ -140,7 +139,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
     @Override
     public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) {
-        this.deviceInitPhaseHandler = handler;
     }
 
     @Override
@@ -152,11 +150,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
         lifecycleService.registerDeviceRemovedHandler(this);
     }
 
-    @Override
-    public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception {
-        return ConnectionStatus.MAY_CONTINUE;
-    }
-
     @Override
     public TranslatorLibrary oook() {
         return translatorLibrary;
index 8fe1a89dd0809930011f6bd29a9040c8a9db7ea2..5cde6fc507519ccf7cd9e2446bca10bd0a0af78d 100644 (file)
@@ -13,8 +13,10 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.Nonnull;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
@@ -31,8 +33,6 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.ContextChainState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.openflow.provider.config.ContextChainConfig;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,15 +134,21 @@ public class ContextChainHolderImpl implements ContextChainHolder {
         ContextChain chain = contextChainMap.get(deviceInfo);
         if (Objects.nonNull(chain)) {
             chain.close();
+            try {
+                deviceManager.removeDeviceFromOperationalDS(deviceInfo).checkedGet(5L, TimeUnit.SECONDS);
+            } catch (TimeoutException | TransactionCommitFailedException e) {
+                LOG.warn("Not able to remove device {} from DS", deviceInfo.getLOGValue());
+            }
         }
     }
 
     @Override
     public void pairConnection(final ConnectionContext connectionContext) {
         DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
-        latestConnections.put(deviceInfo, connectionContext);
-        if (checkChainContext(deviceInfo)) {
-            contextChainMap.get(deviceInfo).changePrimaryConnection(connectionContext);
+        ContextChain contextChain = contextChainMap.get(deviceInfo);
+        if (Objects.nonNull(contextChain)) {
+            contextChain.changePrimaryConnection(connectionContext);
+            contextChain.makeDeviceSlave();
         }
     }
 
@@ -175,7 +181,7 @@ public class ContextChainHolderImpl implements ContextChainHolder {
         if (Objects.nonNull(contextChain)) {
             LOG.warn("Not able to set MASTER role on device {}", deviceInfo.getLOGValue());
             if (contextChain.getContextChainState().equals(ContextChainState.INITIALIZED)) {
-                contextChain.getPrimaryConnectionContext().closeConnection(false);
+                contextChain.closePrimaryConnection();
             } else {
                 contextChain.sleepTheChainAndDropConnection();
             }
@@ -196,7 +202,8 @@ public class ContextChainHolderImpl implements ContextChainHolder {
                     LOG.info("Device {} has not finish initial gathering yet.",
                             deviceInfo.getLOGValue());
                 }
-                contextChain.startChain();
+                Futures.addCallback(contextChain.startChain(),
+                        new StartStopChainCallback(contextChain.provideDeviceContext(), false));
             }
         }
     }
@@ -205,11 +212,13 @@ public class ContextChainHolderImpl implements ContextChainHolder {
     public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) {
         ContextChain contextChain = contextChainMap.get(deviceInfo);
         if (Objects.nonNull(contextChain)) {
-            if (contextChain.getContextChainState().equals(ContextChainState.INITIALIZED)) {
-                contextChain.registerServices(this.singletonServicesProvider);
-            } else {
-                contextChain.stopChain();
-            }
+//            if (contextChain.getContextChainState().equals(ContextChainState.INITIALIZED)) {
+//                contextChain.registerServices(this.singletonServicesProvider);
+//            } else {
+//                Futures.addCallback(contextChain.stopChain(false),
+//                        new StartStopChainCallback(contextChain.provideDeviceContext(), true));
+//            }
+            contextChain.registerServices(this.singletonServicesProvider);
         }
     }
 
@@ -221,6 +230,26 @@ public class ContextChainHolderImpl implements ContextChainHolder {
         }
     }
 
+    @Override
+    public void onDeviceDisconnected(final ConnectionContext connectionContext) {
+
+        if (Objects.isNull(connectionContext.getDeviceInfo())) {
+            LOG.info("Non existing device info. Cannot close context chain.");
+        } else {
+            LOG.info("Device {} disconnected.", connectionContext.getDeviceInfo().getLOGValue());
+            ContextChain chain = contextChainMap.get(connectionContext.getDeviceInfo());
+            if (Objects.isNull(chain)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("There was no context chain created yet for the disconnected device {}",
+                            connectionContext.getDeviceInfo().getLOGValue());
+                }
+            } else {
+                Futures.addCallback(chain.connectionDropped(),
+                        new StartStopChainCallback(null, true));
+            }
+        }
+    }
+
     private boolean checkAllManagers() {
         return Objects.nonNull(deviceManager) && Objects.nonNull(rpcManager) && Objects.nonNull(statisticsManager);
     }
@@ -229,4 +258,35 @@ public class ContextChainHolderImpl implements ContextChainHolder {
         return contextChainMap.containsKey(deviceInfo);
     }
 
+    private class StartStopChainCallback implements FutureCallback<Void> {
+
+        private final String deviceInfo;
+        private final String stop;
+        private final String stopped;
+        private final boolean start;
+        private final DeviceContext deviceContext;
+
+        StartStopChainCallback(final DeviceContext deviceContext, final boolean stop) {
+
+            this.deviceInfo = Objects.nonNull(deviceContext) ? deviceContext.getDeviceInfo().getLOGValue() : "null";
+            this.stop = stop ? "stop" : "start";
+            this.stopped = stop ? "stopped" : "started";
+            this.start = !stop;
+            this.deviceContext = deviceContext;
+        }
+
+        @Override
+        public void onSuccess(@Nullable Void aVoid) {
+            LOG.info("Context chain for device {} successfully {}", deviceInfo, stopped);
+//            if (start && Objects.nonNull(deviceContext)) {
+//                deviceContext.masterSuccessful();
+//            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            LOG.warn("Not able to {} the context chain for device {}", stop, deviceInfo);
+        }
+    }
+
 }
index 886e3b03f5d3e7f6f0a998a26987054990cfc86c..c43e759e77b19c064cd04cdf6fa1afaad01f591b 100644 (file)
@@ -71,12 +71,12 @@ public class ContextChainImpl implements ContextChain {
     }
 
     @Override
-    public ListenableFuture<Void> stopChain() {
+    public ListenableFuture<Void> stopChain(boolean connectionDropped) {
         //TODO: stopClusterServices change parameter
         final List<ListenableFuture<Void>> futureList = new ArrayList<>();
-        futureList.add(statisticsContext.stopClusterServices(true));
-        futureList.add(rpcContext.stopClusterServices(false));
-        futureList.add(deviceContext.stopClusterServices(false));
+        futureList.add(statisticsContext.stopClusterServices());
+        futureList.add(rpcContext.stopClusterServices());
+        futureList.add(deviceContext.stopClusterServices(connectionDropped));
 
         return Futures.transform(Futures.successfulAsList(futureList), new Function<List<Void>, Void>() {
             @Nullable
@@ -113,10 +113,11 @@ public class ContextChainImpl implements ContextChain {
 
     @Override
     public void changePrimaryConnection(final ConnectionContext connectionContext) {
+        this.primaryConnectionContext = connectionContext;
+        this.contextChainState = ContextChainState.INITIALIZED;
         for (OFPContext context : contexts) {
             context.replaceConnection(connectionContext);
         }
-        this.primaryConnectionContext = connectionContext;
     }
 
     @Override
@@ -129,7 +130,7 @@ public class ContextChainImpl implements ContextChain {
         ContextChainState oldState = this.contextChainState;
         this.contextChainState = ContextChainState.SLEEPING;
         if (oldState.equals(ContextChainState.WORKINGMASTER)) {
-            return this.stopChain();
+            return this.stopChain(true);
         }
         return Futures.immediateFuture(null);
     }
@@ -142,7 +143,7 @@ public class ContextChainImpl implements ContextChain {
     @Override
     public void sleepTheChainAndDropConnection() {
         this.contextChainState = ContextChainState.SLEEPING;
-        this.primaryConnectionContext.closeConnection(false);
+        this.primaryConnectionContext.closeConnection(true);
     }
 
     @Override
@@ -160,4 +161,13 @@ public class ContextChainImpl implements ContextChain {
         this.lifecycleService.makeDeviceSlave(this.deviceContext);
     }
 
+    @Override
+    public void closePrimaryConnection() {
+        this.primaryConnectionContext.closeConnection(true);
+    }
+
+    @Override
+    public DeviceContext provideDeviceContext() {
+        return this.deviceContext;
+    }
 }
index c188c65a8e2762ac5f49f839206b66f00d77c756..2820bd4012fa956d6b5891e2665c705ce191e5d0 100644 (file)
@@ -87,12 +87,15 @@ class RpcContextImpl implements RpcContext {
     @Override
     public <S extends RpcService> void registerRpcServiceImplementation(final Class<S> serviceClass,
                                                                         final S serviceInstance) {
-        LOG.trace("Try to register service {} for device {}.", serviceClass, nodeInstanceIdentifier);
         if (! rpcRegistrations.containsKey(serviceClass)) {
             final RoutedRpcRegistration<S> routedRpcReg = rpcProviderRegistry.addRoutedRpcImplementation(serviceClass, serviceInstance);
             routedRpcReg.registerPath(NodeContext.class, nodeInstanceIdentifier);
             rpcRegistrations.put(serviceClass, routedRpcReg);
-            LOG.debug("Registration of service {} for device {}.", serviceClass.getSimpleName(), nodeInstanceIdentifier.getKey().getId().getValue());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Registration of service {} for device {}.",
+                        serviceClass.getSimpleName(),
+                        nodeInstanceIdentifier.getKey().getId().getValue());
+            }
         }
     }
 
@@ -116,7 +119,7 @@ class RpcContextImpl implements RpcContext {
             }
         } else {
             try {
-                stopClusterServices(true).get();
+                stopClusterServices().get();
             } catch (Exception e) {
                 LOG.debug("Failed to close RpcContext for node {} with exception: ", getDeviceInfo().getLOGValue(), e);
             }
@@ -189,8 +192,8 @@ class RpcContextImpl implements RpcContext {
     }
 
     @Override
-    public ListenableFuture<Void> stopClusterServices(boolean connectionInterrupted) {
-        if (CONTEXT_STATE.TERMINATION.equals(getState())) {
+    public ListenableFuture<Void> stopClusterServices() {
+        if (CONTEXT_STATE.TERMINATION.equals(this.state)) {
             return Futures.immediateCancelledFuture();
         }
 
index 5a608d1aa150bc992c519365dda7025456d9464e..d95527476942ea2106d7cd13a22604a4f60a398a 100644 (file)
@@ -458,8 +458,8 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     }
 
     @Override
-    public ListenableFuture<Void> stopClusterServices(boolean connectionInterrupted) {
-        if (CONTEXT_STATE.TERMINATION.equals(getState())) {
+    public ListenableFuture<Void> stopClusterServices() {
+        if (CONTEXT_STATE.TERMINATION.equals(this.state)) {
             return Futures.immediateCancelledFuture();
         }
 
index 08243d9f5ab18a6793b7f8d88355bcbf9ade658d..b5364e4559a51aa92fea9d96f1c9686aab1f0941 100644 (file)
@@ -29,7 +29,6 @@ import io.netty.util.Timeout;
 import java.math.BigInteger;
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicLong;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
index e4bd7780667d37437f2ac3998ffd520e9f4cec56..c46d13946e6c3e9ade76d8e4e6c625b222970292 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.openflowplugin.impl.device;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -21,14 +20,11 @@ import com.google.common.util.concurrent.Futures;
 import io.netty.util.HashedWheelTimer;
 import java.lang.reflect.Field;
 import java.math.BigInteger;
-import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.InOrder;
 import org.mockito.Matchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
@@ -48,16 +44,13 @@ import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandle
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
-import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
-import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
 import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
-import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProviderFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
@@ -69,7 +62,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.features.reply.PhyPortBuilder;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 
 @RunWith(MockitoJUnitRunner.class)