Merge "Bug 5596 Changes when closing device"
authorJozef Bacigal <jozef.bacigal@pantheon.tech>
Mon, 15 Aug 2016 12:29:22 +0000 (12:29 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 15 Aug 2016 12:29:22 +0000 (12:29 +0000)
20 files changed:
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/OFPContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/lifecycle/LifecycleService.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/role/RoleContext.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/LifecycleServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/RoleService.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsGatheringUtils.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/DeviceInitializationUtils.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/role/RoleContextImplTest.java

index 7bd11c9be76fe8463c413cee753c61c70b67396f..0840568efb23f2d88d3b6aa015e42d9b2783adea 100644 (file)
@@ -19,9 +19,7 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
  */
 public interface OFPContext {
 
-    default void setState(CONTEXT_STATE contextState) {
-        //NOOP
-    }
+    void setState(CONTEXT_STATE contextState);
 
     /**
      * Context state
index a081ab7cabb4fb2541213ac0862e310a28443fd5..d69dc9f5b4c566fb358b05ba9f102851c0dd69b4 100644 (file)
@@ -49,6 +49,9 @@ public interface DeviceContext extends
      */
     void shutdownConnection();
 
+    /**
+     * Initial submit transaction
+     */
     void initialSubmitTransaction();
 
     /**
index 53755179d78d79169533fa7d1a7dd2f67161baa0..0eca3b7d15260ef40ba678767982005c4e8ca5db 100644 (file)
@@ -20,15 +20,35 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext
  */
 public interface LifecycleService extends ClusterSingletonService, AutoCloseable {
 
-    void registerService(ClusterSingletonServiceProvider singletonServiceProvider);
+    /**
+     * This method registers lifecycle service to the given provider
+     * @param singletonServiceProvider from md-sal binding
+     */
+    void registerService(final ClusterSingletonServiceProvider singletonServiceProvider);
 
-    void setDeviceContext(DeviceContext deviceContext);
+    /**
+     * Setter for device context
+     * @param deviceContext actual device context created per device
+     */
+    void setDeviceContext(final DeviceContext deviceContext);
 
-    void setRpcContext(RpcContext rpcContext);
+    /**
+     * Setter for rpc context
+     * @param rpcContext actual rpc context created per device
+     */
+    void setRpcContext(final RpcContext rpcContext);
 
-    void setRoleContext(RoleContext roleContext);
+    /**
+     * Setter for role context
+     * @param roleContext actual role context created per device
+     */
+    void setRoleContext(final RoleContext roleContext);
 
-    void setStatContext(StatisticsContext statContext);
+    /**
+     * Setter for statistics context
+     * @param statContext actual statistics context created per device
+     */
+    void setStatContext(final StatisticsContext statContext);
 
     /**
      * Some services, contexts etc. still need to have access to device context,
index b76b43e86f7e899973fd58d9912b244973bba699..d75d7ff8e5bf4d21095fbb1b8f5353bc42ea75df 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.openflowplugin.api.openflow.role;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
@@ -15,6 +16,8 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 
 /**
  * Role context for change role on cluster
@@ -33,4 +36,5 @@ public interface RoleContext extends  RequestContextStack, OFPContext {
      */
     SalRoleService getSalRoleService();
 
+    ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave();
 }
index 5a5de4fe9ce65849fbb73f6d9586aa652478a185..dea0a5e6818b424021bc293bdb6635e1c706301b 100644 (file)
@@ -73,6 +73,7 @@ public class ConnectionContextImpl implements ConnectionContext {
     @Override
     public void setOutboundQueueProvider(final OutboundQueueProvider outboundQueueProvider) {
         this.outboundQueueProvider = outboundQueueProvider;
+        ((DeviceInfoImpl)this.deviceInfo).setOutboundQueueProvider(this.outboundQueueProvider);
     }
 
     @Override
@@ -245,6 +246,37 @@ public class ConnectionContextImpl implements ConnectionContext {
         this.handshakeContext = handshakeContext;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ConnectionContextImpl that = (ConnectionContextImpl) o;
+
+        if (!connectionAdapter.equals(that.connectionAdapter)) {
+            return false;
+        }
+
+        if (featuresReply != null ? !featuresReply.equals(that.featuresReply) : that.featuresReply != null) {
+            return false;
+        }
+
+        return nodeId != null ? nodeId.equals(that.nodeId) : that.nodeId == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = connectionAdapter.hashCode();
+        result = 31 * result + (featuresReply != null ? featuresReply.hashCode() : 0);
+        result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
+        return result;
+    }
 
     private class DeviceInfoImpl implements DeviceInfo {
 
@@ -322,6 +354,10 @@ public class ConnectionContextImpl implements ConnectionContext {
             return result;
         }
 
+        public void setOutboundQueueProvider(final OutboundQueue outboundQueueProvider) {
+            this.outboundQueueProvider = outboundQueueProvider;
+        }
+
         @Override
         public Long reserveXidForDeviceMessage() {
             return outboundQueueProvider.reserveEntry();
index 97c791766331ee2cd058aceb9e15a378b39c7170..1bfe9e96a3fe77dc7f11b5b62c9f4a57ae5dd413 100644 (file)
@@ -540,7 +540,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         LOG.debug("Initializing transaction chain manager for node {}", getDeviceInfo().getNodeId());
         this.transactionChainManager.activateTransactionManager();
         LOG.debug("Waiting to get node {} information", getDeviceInfo().getNodeId());
-        DeviceInitializationUtils.initializeNodeInformation(this, switchFeaturesMandatory, this.convertorExecutor).get();
+        DeviceInitializationUtils.initializeNodeInformation(this, switchFeaturesMandatory, this.convertorExecutor);
     }
 
     @Override
index ef850d972f48018a4b172dc3666b430264a758cd..0614df6bd5c2447102208e9cf7c9a29a0fb4fc21 100644 (file)
@@ -18,6 +18,7 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.TimerTask;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -154,7 +155,12 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
          * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
          */
          if (deviceContexts.containsKey(deviceInfo)) {
-            LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
+             DeviceContext deviceContext = deviceContexts.get(deviceInfo);
+             if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
+                 LOG.warn("Context state for node {} is not in TERMINATION state, trying to reconnect", connectionContext.getNodeId().getValue());
+             } else {
+                 LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId().getValue());
+             }
              return false;
          }
 
@@ -184,7 +190,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
                 this,
                 convertorExecutor);
 
-        Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");
+        deviceContexts.putIfAbsent(deviceInfo, deviceContext);
 
         final LifecycleService lifecycleService = new LifecycleServiceImpl();
         lifecycleService.setDeviceContext(deviceContext);
@@ -249,14 +255,15 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
 
     @Override
     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
-        LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
         deviceContexts.remove(deviceInfo);
-        updatePacketInRateLimiters();
         LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
-        try {
-            lifecycleService.close();
-        } catch (Exception e) {
-            LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e);
+        updatePacketInRateLimiters();
+        if (Objects.nonNull(lifecycleService)) {
+            try {
+                lifecycleService.close();
+            } catch (Exception e) {
+                LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e);
+            }
         }
     }
 
@@ -291,30 +298,41 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
             return;
         }
 
+        if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
+            LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
+            return;
+        }
+
+        deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
+
         if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
+            LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getNodeId().getValue());
             /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
             deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
-        } else {
+        }
+        //TODO: Auxiliary connections supported ?
+        {
             /* Device is disconnected and so we need to close TxManager */
             final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
             Futures.addCallback(future, new FutureCallback<Void>() {
 
                 @Override
                 public void onSuccess(final Void result) {
-                    LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
+                    LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId().getValue());
                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
                 }
 
                 @Override
                 public void onFailure(final Throwable t) {
-                    LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
+                    LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId().getValue());
+                    LOG.trace("TxChainManager failed by closing. ", t);
                     deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
                 }
             });
             /* Add timer for Close TxManager because it could fain ind cluster without notification */
             final TimerTask timerTask = timeout -> {
                 if (!future.isDone()) {
-                    LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
+                    LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId().getValue());
                     future.cancel(false);
                 }
             };
@@ -327,11 +345,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
         deviceContexts.put(deviceInfo, deviceContext);
     }
 
-    @VisibleForTesting
-    void removeDeviceContextFromMap(final DeviceInfo deviceInfo){
-        deviceContexts.remove(deviceInfo);
-    }
-
     @Override
     public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
         return (T) deviceContexts.get(deviceInfo);
index 897d87637887a09d6424eed7450ab05a0a9478df..4f28731606934e1778a5d148e71aa869c4a71d32 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.CancellationException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -27,6 +28,7 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosed
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
@@ -175,12 +177,18 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
                 @Override
                 public void onFailure(final Throwable t) {
                     if (t instanceof TransactionCommitFailedException) {
-                        LOG.error("Transaction commit failed. {}", t);
+                        LOG.error("Transaction commit failed. ", t);
                     } else {
-                        LOG.error("Exception during transaction submitting. {}", t);
+                        if (t instanceof CancellationException) {
+                            LOG.warn("Submit task was canceled");
+                            LOG.trace("Submit exception: ", t);
+                        } else {
+                            LOG.error("Exception during transaction submitting. ", t);
+                        }
                     }
                     if (initCommit) {
-                        LOG.error("Initial commit failed. {}", t);
+                        LOG.warn("Initial commit failed. ", t);
+                        wTx = null;
                     }
                 }
             });
index 53cc263eb3e16103762f99a9e3327d1604edc18e..dbac44a2705b2c62020e6d812c847d5561feffe4 100644 (file)
@@ -18,6 +18,7 @@ import javax.annotation.Nullable;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
@@ -42,31 +43,70 @@ public class LifecycleServiceImpl implements LifecycleService {
     public void instantiateServiceInstance() {
         try {
 
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Starting clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getNodeId().getValue());
+                LOG.debug("===============================================");
+            }
+
+            if (connectionInterrupted()) {
+                return;
+            }
+
             LOG.info("Starting device context cluster services for node {}", getIdentifier());
             this.deviceContext.startupClusterServices();
 
+            if (connectionInterrupted()) {
+                return;
+            }
+
             LOG.info("Starting statistics context cluster services for node {}", getIdentifier());
             this.statContext.startupClusterServices();
 
+            if (connectionInterrupted()) {
+                return;
+            }
+
             LOG.info("Statistics initial gathering OK, submitting data for node {}", getIdentifier());
             this.deviceContext.initialSubmitTransaction();
 
+            if (connectionInterrupted()) {
+                return;
+            }
+
             LOG.info("Starting rpc context cluster services for node {}", getIdentifier());
             this.rpcContext.startupClusterServices();
 
+            if (connectionInterrupted()) {
+                return;
+            }
+
             LOG.info("Starting role context cluster services for node {}", getIdentifier());
             this.roleContext.startupClusterServices();
 
+            if (connectionInterrupted()) {
+                return;
+            }
+
             LOG.info("Caching flows IDs ...");
             fillDeviceFlowRegistry();
 
         } catch (ExecutionException | InterruptedException e) {
             LOG.warn("Cluster service {} was unable to start.", this.getIdentifier());
+            this.deviceContext.shutdownConnection();
+        }
+    }
+
+    private boolean connectionInterrupted() {
+        if (this.deviceContext.getPrimaryConnectionContext().getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
+            LOG.warn("Node {} was disconnected, will stop starting MASTER services.", this.deviceContext.getDeviceInfo().getNodeId().getValue());
+            return true;
         }
+        return false;
     }
 
     @Override
     public ListenableFuture<Void> closeServiceInstance() {
+        roleContext.stopClusterServices();
         statContext.stopClusterServices();
         rpcContext.stopClusterServices();
         return deviceContext.stopClusterServices();
index ae072563ae28807d392f2015b2def15d0f0af403..c12a8ac80dd3c8b253ad2cf0472f6d4595d4196b 100644 (file)
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -129,8 +130,12 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
             @Override
             public void onSuccess(Optional<FlowCapableNode> result) {
                 result.asSet().stream()
+                        .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+                        .filter(table -> Objects.nonNull(table.getFlow()))
                         .flatMap(table -> table.getFlow().stream())
+                        .filter(Objects::nonNull)
+                        .filter(flow -> Objects.nonNull(flow.getId()))
                         .forEach(flowConsumer);
 
                 // After we are done with reading from datastore, close the transaction
index 3e33a80845768c0a59b8f9fe9d67e162dfe7cf7f..483b8c7fafbbc219395c875c46f4c09fcfbb8851 100644 (file)
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.openflowplugin.impl.role;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -22,6 +24,7 @@ import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
 import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
@@ -51,11 +54,14 @@ class RoleContextImpl implements RoleContext {
     private final DeviceInfo deviceInfo;
     private CONTEXT_STATE state;
     private final RoleManager myManager;
+    private final LifecycleService lifecycleService;
 
     RoleContextImpl(final DeviceInfo deviceInfo,
                     final HashedWheelTimer hashedWheelTimer,
-                    final RoleManager myManager) {
+                    final RoleManager myManager,
+                    final LifecycleService lifecycleService) {
         this.deviceInfo = deviceInfo;
+        this.lifecycleService = lifecycleService;
         state = CONTEXT_STATE.WORKING;
         this.myManager = myManager;
         this.hashedWheelTimer = hashedWheelTimer;
@@ -103,25 +109,57 @@ class RoleContextImpl implements RoleContext {
     }
 
     public void startupClusterServices() throws ExecutionException, InterruptedException {
-        //TODO: Add callback ?
-        sendRoleChangeToDevice(OfpRole.BECOMEMASTER).get();
+        Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new FutureCallback<RpcResult<SetRoleOutput>>() {
+            @Override
+            public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getNodeId().getValue());
+                }
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getNodeId().getValue());
+                lifecycleService.closeConnection();
+            }
+        });
     }
 
     @Override
     public ListenableFuture<Void> stopClusterServices() {
-        ListenableFuture<Void> future;
-        try {
-            //TODO: Add callback
-            sendRoleChangeToDevice(OfpRole.BECOMESLAVE).get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.warn("Send role to device failed ", e);
-        } finally {
-            myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
-            future = Futures.immediateFuture(null);
-        }
+        ListenableFuture<Void> future = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
+                    @Nullable
+                    @Override
+                    public Void apply(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+                        return null;
+                    }
+                });
+
+                Futures.addCallback(future, new FutureCallback<Void>() {
+                    @Override
+                    public void onSuccess(@Nullable Void aVoid) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getNodeId().getValue());
+                        }
+                        myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable throwable) {
+                        LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getNodeId().getValue());
+                        LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable);
+                        myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
+
+                    }
+                });
         return future;
     }
 
+    @Override
+    public ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave(){
+        return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
+    }
+
     private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
         LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
         final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
index fbebd1d41f9f04ebb87a4f6d64b2ebb142671ce0..263d8e7098319f877fb7716845ce7dce5a50e2b4 100644 (file)
@@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -34,6 +35,8 @@ import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
 import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,9 +71,23 @@ public class RoleManagerImpl implements RoleManager {
     @Override
     public void onDeviceContextLevelUp(@CheckForNull final DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
         final DeviceContext deviceContext = Preconditions.checkNotNull(lifecycleService.getDeviceContext());
-        final RoleContext roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, this);
+        final RoleContext roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, this, lifecycleService);
         roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
         Verify.verify(contexts.putIfAbsent(deviceInfo, roleContext) == null, "Role context for master Node %s is still not closed.", deviceInfo.getNodeId());
+        Futures.addCallback(roleContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
+                    @Override
+                    public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getNodeId().getValue());
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable throwable) {
+                        LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getNodeId().getValue());
+                        lifecycleService.closeConnection();
+                    }
+                });
         lifecycleService.setRoleContext(roleContext);
         deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceInfo, lifecycleService);
     }
@@ -88,12 +105,7 @@ public class RoleManagerImpl implements RoleManager {
 
     @Override
     public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
-        LOG.trace("onDeviceContextLevelDown for node {}", deviceInfo.getNodeId());
-        final RoleContext roleContext = contexts.remove(deviceInfo);
-        if (roleContext != null) {
-            LOG.debug("Found roleContext associated to deviceContext: {}, now trying close the roleContext", deviceInfo.getNodeId());
-            contexts.remove(deviceInfo.getNodeId(), roleContext);
-        }
+        contexts.remove(deviceInfo);
         deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo);
     }
 
index fa3c643074a00fb42ee8041e6705ad5fe758dda4..b5563d6e86b1f95d160b8473e3ab134502ba1f1f 100644 (file)
@@ -119,8 +119,10 @@ class RpcContextImpl implements RpcContext {
                 final RoutedRpcRegistration<?> rpcRegistration = iterator.next().getValue();
                 rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier);
                 rpcRegistration.close();
-                LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType(),
-                        nodeInstanceIdentifier);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType(),
+                            nodeInstanceIdentifier);
+                }
             }
         }
     }
index c0c0a6544f190c69cae2bb3fc0dd157a95b5b6b6..de1ad3efea2b1e887d0f349537aa839348573fdf 100644 (file)
@@ -53,7 +53,7 @@ public class RoleService extends AbstractSimpleService<RoleRequestInputBuilder,
     }
 
     public Future<BigInteger> getGenerationIdFromDevice(final Short version) {
-        LOG.info("getGenerationIdFromDevice called for device:{}", getDeviceInfo().getNodeId().getValue());
+        LOG.info("getGenerationIdFromDevice called for device: {}", getDeviceInfo().getNodeId().getValue());
 
         // send a dummy no-change role request to get the generation-id of the switch
         final RoleRequestInputBuilder roleRequestInputBuilder = new RoleRequestInputBuilder();
index bf31c1a65491463e81f053bb75730cc2575ec69f..144d664c2b631c5203b4b207a66e87de749576ec 100644 (file)
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
@@ -158,7 +159,9 @@ class StatisticsContextImpl implements StatisticsContext {
                 }
                 @Override
                 public void onFailure(final Throwable t) {
-                    StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
+                    if (!(t instanceof TransactionChainClosedException)) {
+                        StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
+                    }
                 }
             });
             return settableStatResultFuture;
index b47dfee91359d53c4d2708f326e923585be30420..22ea5b560fa0f80f0d93ba2dd7b09a677c80c6ba 100644 (file)
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
@@ -501,8 +502,9 @@ public final class StatisticsGatheringUtils {
                 .build();
         try {
             deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
-        } catch (final Exception e) {
-            LOG.warn("Can't write to transaction: {}", e);
+        } catch (final TransactionChainClosedException e) {
+            LOG.warn("Can't write to transaction, transaction chain probably closed.");
+            LOG.trace("Write to transaction exception: ", e);
         }
 
         deviceContext.submitTransaction();
@@ -526,8 +528,9 @@ public final class StatisticsGatheringUtils {
                 .build();
         try {
             deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
-        } catch (Exception e) {
-            LOG.warn("Can't write to transaction: {}", e);
+        } catch (TransactionChainClosedException e) {
+            LOG.warn("Can't write to transaction, transaction chain probably closed.");
+            LOG.trace("Write to transaction exception: ", e);
         }
 
         deviceContext.submitTransaction();
index 680e0e731a7973b5b119c1be880511aea43c9fa0..bd40f4c9f6e9a436549be66c89cbec0be738f3a4 100644 (file)
@@ -96,9 +96,8 @@ public class DeviceInitializationUtils {
      * @param deviceContext
      * @param switchFeaturesMandatory
      * @param convertorExecutor
-     * @return future - recommended to have blocking call for this future
      */
-    public static ListenableFuture<Void> initializeNodeInformation(final DeviceContext deviceContext, final boolean switchFeaturesMandatory, final ConvertorExecutor convertorExecutor) {
+    public static void initializeNodeInformation(final DeviceContext deviceContext, final boolean switchFeaturesMandatory, final ConvertorExecutor convertorExecutor) throws ExecutionException, InterruptedException {
         Preconditions.checkArgument(deviceContext != null);
         final DeviceState deviceState = Preconditions.checkNotNull(deviceContext.getDeviceState());
         final DeviceInfo deviceInfo = deviceContext.getDeviceInfo();
@@ -147,29 +146,11 @@ public class DeviceInitializationUtils {
             final Capabilities capabilities = connectionContext.getFeatures().getCapabilities();
             LOG.debug("Setting capabilities for device {}", deviceInfo.getNodeId());
             DeviceStateUtil.setDeviceStateBasedOnV13Capabilities(deviceState, capabilities);
-            deviceFeaturesFuture = createDeviceFeaturesForOF13(deviceContext, switchFeaturesMandatory, convertorExecutor);
+            createDeviceFeaturesForOF13(deviceContext, switchFeaturesMandatory, convertorExecutor).get();
         } else {
-            deviceFeaturesFuture = Futures.immediateFailedFuture(new ConnectionException("Unsupported version "
-                    + version));
+            throw new ExecutionException(new ConnectionException("Unsupported version " + version));
         }
 
-        Futures.addCallback(deviceFeaturesFuture, new FutureCallback<List<RpcResult<List<MultipartReply>>>>() {
-            @Override
-            public void onSuccess(final List<RpcResult<List<MultipartReply>>> result) {
-                LOG.debug("All init data for node {} is in submitted.", deviceInfo.getNodeId());
-                returnFuture.set(null);
-            }
-
-            @Override
-            public void onFailure(final Throwable t) {
-                // FIXME : remove session
-                LOG.trace("Device capabilities gathering future failed.");
-                LOG.trace("more info in exploration failure..", t);
-                LOG.debug("All init data for node {} was not submited correctly - connection has to go down.", deviceInfo.getNodeId());
-                returnFuture.setException(t);
-            }
-        });
-        return returnFuture;
     }
 
     private static void addNodeToOperDS(final DeviceContext deviceContext, final SettableFuture<Void> future) {
index cef89f4e45091c8e4b60bc3185626ffb422b946a..97417436e2cf81591ab6137aa9a6622f0b1b12fa 100644 (file)
@@ -45,6 +45,7 @@ import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.OFPContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
@@ -233,6 +234,7 @@ public class DeviceManagerImplTest {
         when(deviceContext.shuttingDownDataStoreTransactions()).thenReturn(Futures.immediateCheckedFuture(null));
         when(deviceContext.getPrimaryConnectionContext()).thenReturn(connectionContext);
         when(deviceContext.getDeviceState()).thenReturn(deviceState);
+        when(deviceContext.getState()).thenReturn(OFPContext.CONTEXT_STATE.WORKING);
 
         final ConcurrentHashMap<DeviceInfo, DeviceContext> deviceContexts = getContextsCollection(deviceManager);
         deviceContexts.put(deviceInfo, deviceContext);
index 43544aff61ddbab1d37510be01b095eb378e0e41..79554b8f8f868f8d3a4812b2b14551a049fe9205 100644 (file)
@@ -17,6 +17,9 @@ import static org.mockito.Mockito.when;
 
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.junit.Assert;
@@ -31,8 +34,14 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
@@ -62,7 +71,6 @@ public class DeviceFlowRegistryImplTest {
     public void setUp() throws Exception {
         nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(new NodeId(NODE_ID)));
         when(dataBroker.newReadOnlyTransaction()).thenReturn(readOnlyTransaction);
-        when(readOnlyTransaction.read(any(), any())).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
         deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, nodeInstanceIdentifier);
         final FlowAndStatisticsMapList flowStats = TestFlowHelper.createFlowAndStatisticsMapListBuilder(1).build();
         key = FlowRegistryKeyFactory.create(flowStats);
@@ -77,11 +85,59 @@ public class DeviceFlowRegistryImplTest {
     public void testFill() throws Exception {
         final InstanceIdentifier<FlowCapableNode> path = nodeInstanceIdentifier.augmentation(FlowCapableNode.class);
 
+        final Flow flow = new FlowBuilder()
+                .setTableId((short)1)
+                .setPriority(10)
+                .setCookie(new FlowCookie(BigInteger.TEN))
+                .setId(new FlowId("HELLO"))
+                .build();
+
+        final Table table = new TableBuilder()
+                .setFlow(Collections.singletonList(flow))
+                .build();
+
+        final FlowCapableNode flowCapableNode = new FlowCapableNodeBuilder()
+                .setTable(Collections.singletonList(table))
+                .build();
+
+        when(readOnlyTransaction.read(any(), any())).thenReturn(Futures.immediateCheckedFuture(Optional.of(flowCapableNode)));
+
         deviceFlowRegistry.fill().get();
+        verify(dataBroker, times(2)).newReadOnlyTransaction();
+        verify(readOnlyTransaction).read(LogicalDatastoreType.CONFIGURATION, path);
+        verify(readOnlyTransaction).read(LogicalDatastoreType.OPERATIONAL, path);
+
+        final Map<FlowRegistryKey, FlowDescriptor> allFlowDescriptors = deviceFlowRegistry.getAllFlowDescriptors();
+        final FlowRegistryKey key = FlowRegistryKeyFactory.create(flow);
+
+        assertTrue(allFlowDescriptors.containsKey(key));
+
+        deviceFlowRegistry.markToBeremoved(key);
+        deviceFlowRegistry.removeMarked();
+    }
+
+    @Test
+    public void testFailedFill() throws Exception {
+        final InstanceIdentifier<FlowCapableNode> path = nodeInstanceIdentifier.augmentation(FlowCapableNode.class);
+
+        final Table table = new TableBuilder()
+                .setFlow(null)
+                .build();
 
+        final FlowCapableNode flowCapableNode = new FlowCapableNodeBuilder()
+                .setTable(Collections.singletonList(table))
+                .build();
+
+        when(readOnlyTransaction.read(any(), any())).thenReturn(Futures.immediateCheckedFuture(Optional.of(flowCapableNode)));
+
+        deviceFlowRegistry.fill().get();
         verify(dataBroker, times(2)).newReadOnlyTransaction();
         verify(readOnlyTransaction).read(LogicalDatastoreType.CONFIGURATION, path);
         verify(readOnlyTransaction).read(LogicalDatastoreType.OPERATIONAL, path);
+
+        final Map<FlowRegistryKey, FlowDescriptor> allFlowDescriptors = deviceFlowRegistry.getAllFlowDescriptors();
+
+        Assert.assertEquals(1, allFlowDescriptors.size());
     }
 
     @Test
index c45a9d50880b5eee2f64c0e15c3e74bb1e014691..3e8b905f10bb50a9fa0d3e4e74c295296a32c04c 100644 (file)
@@ -18,6 +18,7 @@ import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
 import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
@@ -35,13 +36,15 @@ public class RoleContextImplTest {
     private DeviceInfo deviceInfo;
     @Mock
     private RoleManager roleManager;
+    @Mock
+    private LifecycleService lifecycleService;
 
     private final NodeId nodeId = NodeId.getDefaultInstance("openflow:1");
     private RoleContext roleContext;
 
     @Before
     public void setup() throws CandidateAlreadyRegisteredException {
-        roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, roleManager);
+        roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, roleManager, lifecycleService);
         Mockito.when(deviceInfo.getNodeId()).thenReturn(nodeId);
     }